This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new ca797e33 [ISSUE #290] activemq source connector adapt to new api (#291)
ca797e33 is described below

commit ca797e330ffa8caed2ee7d397180bbf2df2c1a86
Author: Oliver <[email protected]>
AuthorDate: Tue Sep 27 13:41:08 2022 +0800

    [ISSUE #290] activemq source connector adapt to new api (#291)
    
    * [ISSUE #290] activemq source connector adapt to new api
    
    * set extensions
    
    * set position
---
 connectors/rocketmq-connect-activemq/README-CN.md  |  31 ++++--
 connectors/rocketmq-connect-activemq/README.md     |  31 ++++--
 connectors/rocketmq-connect-activemq/pom.xml       |  22 ++++-
 .../apache/rocketmq/connect/activemq/Config.java   |   8 ++
 .../rocketmq/connect/activemq/Replicator.java      |   4 +-
 .../connector/ActivemqSourceConnector.java         |  33 ++-----
 .../activemq/connector/ActivemqSourceTask.java     | 109 ++++++++++++++-------
 .../connect/activemq/pattern/PatternProcessor.java |  10 +-
 .../rocketmq/connect/activemq/ReplicatorTest.java  |   2 +-
 .../activemq/connector/ActivemqConnectorTest.java  |  17 +---
 .../activemq/connector/ActivemqSourceTaskTest.java |  30 +++---
 11 files changed, 189 insertions(+), 108 deletions(-)

diff --git a/connectors/rocketmq-connect-activemq/README-CN.md 
b/connectors/rocketmq-connect-activemq/README-CN.md
index be036839..21cf9f74 100644
--- a/connectors/rocketmq-connect-activemq/README-CN.md
+++ b/connectors/rocketmq-connect-activemq/README-CN.md
@@ -1,16 +1,31 @@
 ##### ActiveConnector完全限定名
 org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
 
+**activemq-source-connector** 启动
+
+```
+POST  http://${runtime-ip}:${runtime-port}/connectors/activeSourceConnector
+{
+    
"connector.class":"org.apache.rocketmq.connect.activemq.connector.ActivemqSourceConnector",
+    "max-task":"2",
+    "activemqUrl":"tcp://localhost:61616",
+    "destinationType":"queue",
+    "destinationName":"testQueue",
+    "connect.topicname":"targetTopic",
+    
"value-converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+    
"key-converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
 
 ##### 配置参数
 
 参数 | 作用 | 是否必填 | 默认值
 ---|--- |--- | ---
-activemq.url | activemq ip与端口号 | 是 | 无
-activemq.username | 用户名 | 否 |  无
-activemq.password|  密码    | 否  | 无
-jms.destination.name | 读取的队列或者主题名   |  是 | 无
-jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无
-jms.message.selector | 过滤器    |  否  |无
-jms.session.acknowledge.mode | 消息确认  | 否 | Session.AUTO_ACKNOWLEDGE
-jms.session.transacted | 是否是事务会话      | 否 | false
+activemqUrl | activemq ip与端口号 | 是 | 无
+activemqUsername | 用户名 | 否 |  无
+activemqPassword|  密码    | 否  | 无
+destinationName | 读取的队列或者主题名   |  是 | 无
+destinationType | 读取的类型:queue(队列)或者topic(主题) | 是 | 无
+messageSelector | 过滤器    |  否  |无
+sessionAcknowledgeMode | 消息确认  | 否 | Session.AUTO_ACKNOWLEDGE
+sessionTransacted | 是否是事务会话      | 否 | false
diff --git a/connectors/rocketmq-connect-activemq/README.md 
b/connectors/rocketmq-connect-activemq/README.md
index e15149e1..52aea0f2 100644
--- a/connectors/rocketmq-connect-activemq/README.md
+++ b/connectors/rocketmq-connect-activemq/README.md
@@ -1,16 +1,31 @@
 ##### ActiveConnector fully-qualified name
 org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
 
+**activemq-source-connector** start
+
+```
+POST  http://${runtime-ip}:${runtime-port}/connectors/activeSourceConnector
+{
+    
"connector.class":"org.apache.rocketmq.connect.activemq.connector.ActivemqSourceConnector",
+    "max-task":"3",
+    "activemqUrl":"tcp://localhost:61616",
+    "destinationType":"queue",
+    "destinationName":"testQueue",
+    "connect.topicname":"targetTopic",
+    
"value-converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+    
"key-converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
 
 ##### parameter configuration
 
 parameter | effect | required |default
 ---|--- |--- | ---
-activemq.url | The URL of the ActiveMQ broker | yes | null
-activemq.username | The username to use when connecting to ActiveMQ | no |  
null
-activemq.password|  The password to use when connecting to ActiveMQ    | no  | 
null
-jms.destination.name | The name of the JMS destination (queue or topic) to 
read from   |  yes | null
-jms.destination.type | The type of JMS destination, which is either queue or 
topic | yes | null
-jms.message.selector | The message selector that should be applied to messages 
in the destination    |  no  | null 
-jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session  | 
null | Session.AUTO_ACKNOWLEDGE
-jms.session.transacted | Flag to determine if the session is transacted and 
the session completely controls. the message delivery by either committing or 
rolling back the session      | null | false
+activemqUrl | The URL of the ActiveMQ broker | yes | null
+activemqUsername | The username to use when connecting to ActiveMQ | no |  null
+activemqPassword|  The password to use when connecting to ActiveMQ    | no  | 
null
+destinationName | The name of the JMS destination (queue or topic) to read 
from   |  yes | null
+destinationType | The type of JMS destination, which is either queue or topic 
| yes | null
+messageSelector | The message selector that should be applied to messages in 
the destination    |  no  | null 
+sessionAcknowledgeMode | The acknowledgement mode for the JMS Session  | null 
| Session.AUTO_ACKNOWLEDGE
+sessionTransacted | Flag to determine if the session is transacted and the 
session completely controls. the message delivery by either committing or 
rolling back the session      | null | false
diff --git a/connectors/rocketmq-connect-activemq/pom.xml 
b/connectors/rocketmq-connect-activemq/pom.xml
index b9b7bb92..738bf7cf 100644
--- a/connectors/rocketmq-connect-activemq/pom.xml
+++ b/connectors/rocketmq-connect-activemq/pom.xml
@@ -139,6 +139,24 @@
                     </excludes>
                 </configuration>
             </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 
@@ -164,7 +182,7 @@
         <dependency>
             <groupId>io.openmessaging</groupId>
             <artifactId>openmessaging-connector</artifactId>
-            <version>0.1.0-beta</version>
+            <version>0.1.4</version>
         </dependency>
         <dependency>
             <groupId>com.alibaba</groupId>
@@ -199,7 +217,7 @@
         <dependency>
             <groupId>org.apache.activemq</groupId>
             <artifactId>activemq-all</artifactId>
-            <version>5.9.0</version>
+            <version>5.16.5</version>
         </dependency>
         <dependency>
             <groupId>javax.jms</groupId>
diff --git 
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
 
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
index 30f898df..bc42576e 100644
--- 
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
+++ 
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
@@ -34,6 +34,14 @@ public class Config {
         }
     };
 
+    public static final String DESTINATION_TYPE = "destinationType";
+
+    public static final String DESTINATION_NAME = "destinationName";
+
+    public static final String MESSAGE = "message";
+
+    public static final String POSITION =  "position";
+
     public String activemqUrl;
 
     public String activemqUsername;
diff --git 
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
 
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
index e0ebe125..2959f646 100644
--- 
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
+++ 
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
@@ -39,9 +39,9 @@ public class Replicator {
         this.config = config;
     }
 
-    public void start() throws Exception {
+    public void start(long offset) throws Exception {
         processor = new PatternProcessor(this);
-        processor.start();
+        processor.start(offset);
         LOGGER.info("Replicator start succeed");
     }
 
diff --git 
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
 
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
index 7e6290bc..134197b0 100644
--- 
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
+++ 
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
@@ -18,8 +18,8 @@
 package org.apache.rocketmq.connect.activemq.connector;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.Task;
-import io.openmessaging.connector.api.source.SourceConnector;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.rocketmq.connect.activemq.Config;
@@ -28,34 +28,27 @@ public class ActivemqSourceConnector extends 
SourceConnector {
 
     private KeyValue config;
 
-    @Override
-    public String verifyAndSetConfig(KeyValue config) {
 
+
+    @Override public void start(KeyValue config) {
         for (String requestKey : Config.REQUEST_CONFIG) {
             if (!config.containsKey(requestKey)) {
-                return "Request config key: " + requestKey;
+                throw new RuntimeException("Request config key: " + 
requestKey);
             }
         }
         this.config = config;
-        return "";
-    }
-
-    @Override
-    public void start() {
-
     }
 
     @Override
     public void stop() {
-
-    }
-
-    @Override public void pause() {
-
+        this.config = null;
     }
 
-    @Override public void resume() {
 
+    @Override public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> config = new ArrayList<>();
+        config.add(this.config);
+        return config;
     }
 
     @Override
@@ -63,10 +56,4 @@ public class ActivemqSourceConnector extends SourceConnector 
{
         return ActivemqSourceTask.class;
     }
 
-    @Override
-    public List<KeyValue> taskConfigs() {
-        List<KeyValue> config = new ArrayList<>();
-        config.add(this.config);
-        return config;
-    }
 }
diff --git 
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
 
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
index c0092741..600e7b97 100644
--- 
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
+++ 
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
@@ -19,14 +19,17 @@ package org.apache.rocketmq.connect.activemq.connector;
 
 import com.alibaba.fastjson.JSON;
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.EntryType;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.connector.api.exception.DataConnectException;
-import io.openmessaging.connector.api.source.SourceTask;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
 import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
@@ -40,7 +43,6 @@ import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 import org.apache.rocketmq.connect.activemq.Config;
-import org.apache.rocketmq.connect.activemq.ErrorCode;
 import org.apache.rocketmq.connect.activemq.Replicator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,17 +55,13 @@ public class ActivemqSourceTask extends SourceTask {
 
     private Config config;
 
-    private ByteBuffer sourcePartition;
-
     @Override
-    public Collection<SourceDataEntry> poll() {
-        List<SourceDataEntry> res = new ArrayList<>();
+    public List<ConnectRecord> poll() {
+        List<ConnectRecord> res = new ArrayList<>();
         try {
             Message message = replicator.getQueue().poll(1000, 
TimeUnit.MILLISECONDS);
             if (message != null) {
-                Object[] payload = new Object[] {config.getDestinationType(), 
config.getDestinationName(), getMessageContent(message)};
-                SourceDataEntry sourceDataEntry = new 
SourceDataEntry(sourcePartition, null, System.currentTimeMillis(), 
EntryType.CREATE, null, null, payload);
-                res.add(sourceDataEntry);
+                res.add(message2ConnectRecord(message));
             }
         } catch (Exception e) {
             log.error("activemq task poll error, current config:" + 
JSON.toJSONString(config), e);
@@ -76,12 +74,18 @@ public class ActivemqSourceTask extends SourceTask {
         try {
             this.config = new Config();
             this.config.load(props);
-            this.sourcePartition = 
ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8"));
             this.replicator = new Replicator(config);
-            this.replicator.start();
+            final RecordOffset recordOffset = 
this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition());
+            long offset = 0L;
+            if (recordOffset != null) {
+                final Object position = 
recordOffset.getOffset().get(Config.POSITION);
+                if (position != null) {
+                    offset = Long.valueOf(position.toString());
+                }
+            }
+            this.replicator.start(offset);
         } catch (Exception e) {
             log.error("activemq task start failed.", e);
-            throw new DataConnectException(ErrorCode.START_ERROR_CODE, 
e.getMessage(), e);
         }
     }
 
@@ -91,29 +95,21 @@ public class ActivemqSourceTask extends SourceTask {
             replicator.stop();
         } catch (Exception e) {
             log.error("activemq task stop failed.", e);
-            throw new DataConnectException(ErrorCode.STOP_ERROR_CODE, 
e.getMessage(), e);
         }
     }
 
-    @Override public void pause() {
 
-    }
-
-    @Override public void resume() {
-
-    }
 
     @SuppressWarnings("unchecked")
-    public ByteBuffer getMessageContent(Message message) throws JMSException {
-        byte[] data = null;
+    public String getMessageContent(Message message) throws JMSException {
+        String data = null;
         if (message instanceof TextMessage) {
-            data = ((TextMessage) message).getText().getBytes();
+            data = ((TextMessage) message).getText();
         } else if (message instanceof ObjectMessage) {
-            data = JSON.toJSONBytes(((ObjectMessage) message).getObject());
+            data = JSON.toJSONString(((ObjectMessage) message).getObject());
         } else if (message instanceof BytesMessage) {
             BytesMessage bytesMessage = (BytesMessage) message;
-            data = new byte[(int) bytesMessage.getBodyLength()];
-            bytesMessage.readBytes(data);
+            data = bytesMessage.toString();
         } else if (message instanceof MapMessage) {
             MapMessage mapMessage = (MapMessage) message;
             Map<String, Object> map = new HashMap<>();
@@ -122,7 +118,7 @@ public class ActivemqSourceTask extends SourceTask {
                 String name = names.nextElement().toString();
                 map.put(name, mapMessage.getObject(name));
             }
-            data = JSON.toJSONBytes(map);
+            data = JSON.toJSONString(map);
         } else if (message instanceof StreamMessage) {
             StreamMessage streamMessage = (StreamMessage) message;
             ByteArrayOutputStream bis = new ByteArrayOutputStream();
@@ -131,11 +127,58 @@ public class ActivemqSourceTask extends SourceTask {
             while ((i = streamMessage.readBytes(by)) != -1) {
                 bis.write(by, 0, i);
             }
-            data = bis.toByteArray();
+            data = bis.toString();
         } else {
             // The exception is printed and does not need to be written as a 
DataConnectException
             throw new RuntimeException("message type exception");
         }
-        return ByteBuffer.wrap(data);
+        return data;
+    }
+
+    private ConnectRecord message2ConnectRecord(Message message) throws 
JMSException {
+        Schema schema = SchemaBuilder.struct().name("activemq").build();
+        final List<Field> fields = buildFields();
+        schema.setFields(fields);
+        final ConnectRecord connectRecord = new 
ConnectRecord(buildRecordPartition(),
+            buildRecordOffset(message),
+            System.currentTimeMillis(),
+            schema,
+            buildPayLoad(fields, message, schema));
+        connectRecord.setExtensions(buildExtendFiled());
+        return connectRecord;
+    }
+
+    private RecordOffset buildRecordOffset(Message message) throws 
JMSException {
+        Map<String, Long> offsetMap = new HashMap<>();
+        offsetMap.put(Config.POSITION, 
Long.parseLong(message.getJMSMessageID().split(":")[5]));
+        RecordOffset recordOffset = new RecordOffset(offsetMap);
+        return recordOffset;
+    }
+
+    private RecordPartition buildRecordPartition() {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put("partition", "defaultPartition");
+        RecordPartition  recordPartition = new RecordPartition(partitionMap);
+        return recordPartition;
+    }
+
+    private List<Field> buildFields() {
+        final Schema stringSchema = SchemaBuilder.string().build();
+        List<Field> fields = new ArrayList<>();
+        fields.add(new Field(0, Config.MESSAGE, stringSchema));
+        return fields;
+    }
+
+    private Struct buildPayLoad(List<Field> fields, Message message, Schema 
schema) throws JMSException {
+        Struct payLoad = new Struct(schema);
+        payLoad.put(fields.get(0), getMessageContent(message));
+        return payLoad;
+    }
+
+    private KeyValue buildExtendFiled() {
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(Config.DESTINATION_NAME,  config.getDestinationName());
+        keyValue.put(Config.DESTINATION_TYPE, config.getDestinationType());
+        return keyValue;
     }
 }
diff --git 
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
 
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
index 6e39a7ec..71f744b0 100644
--- 
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
+++ 
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.connect.activemq.pattern;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -47,7 +48,7 @@ public class PatternProcessor {
         this.config = replicator.getConfig();
     }
 
-    public void start() throws Exception {
+    public void start(long offset) throws Exception {
         if (!StringUtils.equals("topic", config.getDestinationType())
             && !StringUtils.equals("queue", config.getDestinationType())) {
             // RuntimeException is caught by DataConnectException
@@ -74,6 +75,13 @@ public class PatternProcessor {
         consumer.setMessageListener(new MessageListener() {
             @Override
             public void onMessage(Message message) {
+                try {
+                    if (Long.valueOf(message.getJMSMessageID().split(":")[5]) 
< offset) {
+                        return;
+                    }
+                } catch (JMSException e) {
+                    throw new RuntimeException("parse JMSMessageId failed", e);
+                }
                 replicator.commit(message, true);
             }
         });
diff --git 
a/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
 
b/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
index b94237ea..0573bf79 100644
--- 
a/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
+++ 
b/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
@@ -51,7 +51,7 @@ public class ReplicatorTest {
 
     @Test(expected = RuntimeException.class)
     public void startTest() throws Exception {
-        replicator.start();
+        replicator.start(0L);
     }
 
     @Test
diff --git 
a/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
 
b/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
index 22b301c5..115f668f 100644
--- 
a/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
+++ 
b/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
@@ -29,17 +29,6 @@ public class ActivemqConnectorTest {
 
     ActivemqSourceConnector connector = new ActivemqSourceConnector();
 
-    @Test
-    public void verifyAndSetConfigTest() {
-        KeyValue keyValue = new DefaultKeyValue();
-
-        for (String requestKey : Config.REQUEST_CONFIG) {
-            assertEquals(connector.verifyAndSetConfig(keyValue), "Request 
config key: " + requestKey);
-            keyValue.put(requestKey, requestKey);
-        }
-        assertEquals(connector.verifyAndSetConfig(keyValue), "");
-    }
-
     @Test
     public void taskClassTest() {
         assertEquals(connector.taskClass(), ActivemqSourceTask.class);
@@ -47,12 +36,12 @@ public class ActivemqConnectorTest {
 
     @Test
     public void taskConfigsTest() {
-        assertEquals(connector.taskConfigs().get(0), null);
+        assertEquals(connector.taskConfigs(2).get(0), null);
         KeyValue keyValue = new DefaultKeyValue();
         for (String requestKey : Config.REQUEST_CONFIG) {
             keyValue.put(requestKey, requestKey);
         }
-        connector.verifyAndSetConfig(keyValue);
-        assertEquals(connector.taskConfigs().get(0), keyValue);
+        connector.start(keyValue);
+        assertEquals(connector.taskConfigs(2), keyValue);
     }
 }
diff --git 
a/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
 
b/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
index 049496da..dbecbe43 100644
--- 
a/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
+++ 
b/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
@@ -17,13 +17,12 @@
 
 package org.apache.rocketmq.connect.activemq.connector;
 
+import io.openmessaging.connector.api.data.ConnectRecord;
 import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -54,7 +53,6 @@ import org.mockito.Mockito;
 import com.alibaba.fastjson.JSON;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.SourceDataEntry;
 import io.openmessaging.internal.DefaultKeyValue;
 
 public class ActivemqSourceTaskTest {
@@ -80,7 +78,7 @@ public class ActivemqSourceTaskTest {
         connection.close();
     }
 
-    //@Test
+    @Test
     public void test() throws InterruptedException {
         KeyValue kv = new DefaultKeyValue();
         kv.put("activemqUrl", "tcp://112.74.48.251:6166");
@@ -89,7 +87,7 @@ public class ActivemqSourceTaskTest {
         ActivemqSourceTask task = new ActivemqSourceTask();
         task.start(kv);
         for (int i = 0; i < 20; ) {
-            Collection<SourceDataEntry> sourceDataEntry = task.poll();
+            Collection<ConnectRecord> sourceDataEntry = task.poll();
             i = i + sourceDataEntry.size();
             System.out.println(sourceDataEntry);
         }
@@ -115,7 +113,7 @@ public class ActivemqSourceTaskTest {
         config.set(task, new Config());
 
         queue.put(textMessage);
-        Collection<SourceDataEntry> list = task.poll();
+        Collection<ConnectRecord> list = task.poll();
         Assert.assertEquals(list.size(), 1);
 
         list = task.poll();
@@ -129,24 +127,24 @@ public class ActivemqSourceTaskTest {
         ActivemqSourceTask task = new ActivemqSourceTask();
         TextMessage textMessage = new ActiveMQTextMessage();
         textMessage.setText(value);
-        ByteBuffer buffer = task.getMessageContent(textMessage);
-        Assert.assertEquals(new String(buffer.array()), textMessage.getText());
+        String content = task.getMessageContent(textMessage);
+        Assert.assertEquals(content, textMessage.getText());
 
         ObjectMessage objectMessage = new ActiveMQObjectMessage();
         objectMessage.setObject(value);
-        buffer = task.getMessageContent(objectMessage);
-        Assert.assertEquals(new String(buffer.array()), "\"" + 
objectMessage.getObject().toString() + "\"");
+        content = task.getMessageContent(objectMessage);
+        Assert.assertEquals(content, "\"" + 
objectMessage.getObject().toString() + "\"");
 
         BytesMessage bytes = new ActiveMQBytesMessage();
         bytes.writeBytes(value.getBytes());
         bytes.reset();
-        buffer = task.getMessageContent(bytes);
-        Assert.assertEquals(new String(buffer.array()), value);
+        content = task.getMessageContent(bytes);
+        Assert.assertEquals(content, value);
 
         MapMessage mapMessage = new ActiveMQMapMessage();
         mapMessage.setString("hello", "rocketmq");
-        buffer = task.getMessageContent(mapMessage);
-        Map<String, String> map = JSON.parseObject(buffer.array(), Map.class);
+        content = task.getMessageContent(mapMessage);
+        Map<String, String> map = JSON.parseObject(content, Map.class);
         Assert.assertEquals(map.get("hello"), "rocketmq");
         Assert.assertEquals(map.size(), 1);
 
@@ -157,8 +155,8 @@ public class ActivemqSourceTaskTest {
         }
         streamMessage.writeBytes(valueTwo.getBytes());
         streamMessage.reset();
-        buffer = task.getMessageContent(streamMessage);
-        Assert.assertEquals(new String(buffer.array()), valueTwo);
+        content = task.getMessageContent(streamMessage);
+        Assert.assertEquals(content, valueTwo);
 
         task.getMessageContent(null);
     }

Reply via email to