This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new ca797e33 [ISSUE #290] activemq source connector adapt to new api (#291)
ca797e33 is described below
commit ca797e330ffa8caed2ee7d397180bbf2df2c1a86
Author: Oliver <[email protected]>
AuthorDate: Tue Sep 27 13:41:08 2022 +0800
[ISSUE #290] activemq source connector adapt to new api (#291)
* [ISSUE #290] activemq source connector adapt to new api
* set extensions
* set position
---
connectors/rocketmq-connect-activemq/README-CN.md | 31 ++++--
connectors/rocketmq-connect-activemq/README.md | 31 ++++--
connectors/rocketmq-connect-activemq/pom.xml | 22 ++++-
.../apache/rocketmq/connect/activemq/Config.java | 8 ++
.../rocketmq/connect/activemq/Replicator.java | 4 +-
.../connector/ActivemqSourceConnector.java | 33 ++-----
.../activemq/connector/ActivemqSourceTask.java | 109 ++++++++++++++-------
.../connect/activemq/pattern/PatternProcessor.java | 10 +-
.../rocketmq/connect/activemq/ReplicatorTest.java | 2 +-
.../activemq/connector/ActivemqConnectorTest.java | 17 +---
.../activemq/connector/ActivemqSourceTaskTest.java | 30 +++---
11 files changed, 189 insertions(+), 108 deletions(-)
diff --git a/connectors/rocketmq-connect-activemq/README-CN.md
b/connectors/rocketmq-connect-activemq/README-CN.md
index be036839..21cf9f74 100644
--- a/connectors/rocketmq-connect-activemq/README-CN.md
+++ b/connectors/rocketmq-connect-activemq/README-CN.md
@@ -1,16 +1,31 @@
##### ActiveConnector完全限定名
org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
+**activemq-source-connector** 启动
+
+```
+POST http://${runtime-ip}:${runtime-port}/connectors/activeSourceConnector
+{
+
"connector.class":"org.apache.rocketmq.connect.activemq.connector.ActivemqSourceConnector",
+ "max-task":"2",
+ "activemqUrl":"tcp://localhost:61616",
+ "destinationType":"queue",
+ "destinationName":"testQueue",
+ "connect.topicname":"targetTopic",
+
"value-converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+
"key-converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
##### 配置参数
参数 | 作用 | 是否必填 | 默认值
---|--- |--- | ---
-activemq.url | activemq ip与端口号 | 是 | 无
-activemq.username | 用户名 | 否 | 无
-activemq.password| 密码 | 否 | 无
-jms.destination.name | 读取的队列或者主题名 | 是 | 无
-jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无
-jms.message.selector | 过滤器 | 否 |无
-jms.session.acknowledge.mode | 消息确认 | 否 | Session.AUTO_ACKNOWLEDGE
-jms.session.transacted | 是否是事务会话 | 否 | false
+activemqUrl | activemq ip与端口号 | 是 | 无
+activemqUsername | 用户名 | 否 | 无
+activemqPassword| 密码 | 否 | 无
+destinationName | 读取的队列或者主题名 | 是 | 无
+destinationType | 读取的类型:queue(队列)或者topic(主题) | 是 | 无
+messageSelector | 过滤器 | 否 |无
+sessionAcknowledgeMode | 消息确认 | 否 | Session.AUTO_ACKNOWLEDGE
+sessionTransacted | 是否是事务会话 | 否 | false
diff --git a/connectors/rocketmq-connect-activemq/README.md
b/connectors/rocketmq-connect-activemq/README.md
index e15149e1..52aea0f2 100644
--- a/connectors/rocketmq-connect-activemq/README.md
+++ b/connectors/rocketmq-connect-activemq/README.md
@@ -1,16 +1,31 @@
##### ActiveConnector fully-qualified name
org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
+**activemq-source-connector** start
+
+```
+POST http://${runtime-ip}:${runtime-port}/connectors/activeSourceConnector
+{
+
"connector.class":"org.apache.rocketmq.connect.activemq.connector.ActivemqSourceConnector",
+ "max-task":"3",
+ "activemqUrl":"tcp://localhost:61616",
+ "destinationType":"queue",
+ "destinationName":"testQueue",
+ "connect.topicname":"targetTopic",
+
"value-converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+
"key-converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
##### parameter configuration
parameter | effect | required |default
---|--- |--- | ---
-activemq.url | The URL of the ActiveMQ broker | yes | null
-activemq.username | The username to use when connecting to ActiveMQ | no |
null
-activemq.password| The password to use when connecting to ActiveMQ | no |
null
-jms.destination.name | The name of the JMS destination (queue or topic) to
read from | yes | null
-jms.destination.type | The type of JMS destination, which is either queue or
topic | yes | null
-jms.message.selector | The message selector that should be applied to messages
in the destination | no | null
-jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session |
null | Session.AUTO_ACKNOWLEDGE
-jms.session.transacted | Flag to determine if the session is transacted and
the session completely controls. the message delivery by either committing or
rolling back the session | null | false
+activemqUrl | The URL of the ActiveMQ broker | yes | null
+activemqUsername | The username to use when connecting to ActiveMQ | no | null
+activemqPassword| The password to use when connecting to ActiveMQ | no |
null
+destinationName | The name of the JMS destination (queue or topic) to read
from | yes | null
+destinationType | The type of JMS destination, which is either queue or topic
| yes | null
+messageSelector | The message selector that should be applied to messages in
the destination | no | null
+sessionAcknowledgeMode | The acknowledgement mode for the JMS Session | null
| Session.AUTO_ACKNOWLEDGE
+sessionTransacted | Flag to determine if the session is transacted and the
session completely controls. the message delivery by either committing or
rolling back the session | null | false
diff --git a/connectors/rocketmq-connect-activemq/pom.xml
b/connectors/rocketmq-connect-activemq/pom.xml
index b9b7bb92..738bf7cf 100644
--- a/connectors/rocketmq-connect-activemq/pom.xml
+++ b/connectors/rocketmq-connect-activemq/pom.xml
@@ -139,6 +139,24 @@
</excludes>
</configuration>
</plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
@@ -164,7 +182,7 @@
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
- <version>0.1.0-beta</version>
+ <version>0.1.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
@@ -199,7 +217,7 @@
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
- <version>5.9.0</version>
+ <version>5.16.5</version>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
diff --git
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
index 30f898df..bc42576e 100644
---
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
+++
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
@@ -34,6 +34,14 @@ public class Config {
}
};
+ public static final String DESTINATION_TYPE = "destinationType";
+
+ public static final String DESTINATION_NAME = "destinationName";
+
+ public static final String MESSAGE = "message";
+
+ public static final String POSITION = "position";
+
public String activemqUrl;
public String activemqUsername;
diff --git
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
index e0ebe125..2959f646 100644
---
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
+++
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
@@ -39,9 +39,9 @@ public class Replicator {
this.config = config;
}
- public void start() throws Exception {
+ public void start(long offset) throws Exception {
processor = new PatternProcessor(this);
- processor.start();
+ processor.start(offset);
LOGGER.info("Replicator start succeed");
}
diff --git
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
index 7e6290bc..134197b0 100644
---
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
+++
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
@@ -18,8 +18,8 @@
package org.apache.rocketmq.connect.activemq.connector;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.Task;
-import io.openmessaging.connector.api.source.SourceConnector;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.connect.activemq.Config;
@@ -28,34 +28,27 @@ public class ActivemqSourceConnector extends
SourceConnector {
private KeyValue config;
- @Override
- public String verifyAndSetConfig(KeyValue config) {
+
+ @Override public void start(KeyValue config) {
for (String requestKey : Config.REQUEST_CONFIG) {
if (!config.containsKey(requestKey)) {
- return "Request config key: " + requestKey;
+ throw new RuntimeException("Request config key: " +
requestKey);
}
}
this.config = config;
- return "";
- }
-
- @Override
- public void start() {
-
}
@Override
public void stop() {
-
- }
-
- @Override public void pause() {
-
+ this.config = null;
}
- @Override public void resume() {
+ @Override public List<KeyValue> taskConfigs(int maxTasks) {
+ List<KeyValue> config = new ArrayList<>();
+ config.add(this.config);
+ return config;
}
@Override
@@ -63,10 +56,4 @@ public class ActivemqSourceConnector extends SourceConnector
{
return ActivemqSourceTask.class;
}
- @Override
- public List<KeyValue> taskConfigs() {
- List<KeyValue> config = new ArrayList<>();
- config.add(this.config);
- return config;
- }
}
diff --git
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
index c0092741..600e7b97 100644
---
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
+++
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
@@ -19,14 +19,17 @@ package org.apache.rocketmq.connect.activemq.connector;
import com.alibaba.fastjson.JSON;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.EntryType;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.connector.api.exception.DataConnectException;
-import io.openmessaging.connector.api.source.SourceTask;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
@@ -40,7 +43,6 @@ import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.rocketmq.connect.activemq.Config;
-import org.apache.rocketmq.connect.activemq.ErrorCode;
import org.apache.rocketmq.connect.activemq.Replicator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,17 +55,13 @@ public class ActivemqSourceTask extends SourceTask {
private Config config;
- private ByteBuffer sourcePartition;
-
@Override
- public Collection<SourceDataEntry> poll() {
- List<SourceDataEntry> res = new ArrayList<>();
+ public List<ConnectRecord> poll() {
+ List<ConnectRecord> res = new ArrayList<>();
try {
Message message = replicator.getQueue().poll(1000,
TimeUnit.MILLISECONDS);
if (message != null) {
- Object[] payload = new Object[] {config.getDestinationType(),
config.getDestinationName(), getMessageContent(message)};
- SourceDataEntry sourceDataEntry = new
SourceDataEntry(sourcePartition, null, System.currentTimeMillis(),
EntryType.CREATE, null, null, payload);
- res.add(sourceDataEntry);
+ res.add(message2ConnectRecord(message));
}
} catch (Exception e) {
log.error("activemq task poll error, current config:" +
JSON.toJSONString(config), e);
@@ -76,12 +74,18 @@ public class ActivemqSourceTask extends SourceTask {
try {
this.config = new Config();
this.config.load(props);
- this.sourcePartition =
ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8"));
this.replicator = new Replicator(config);
- this.replicator.start();
+ final RecordOffset recordOffset =
this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition());
+ long offset = 0L;
+ if (recordOffset != null) {
+ final Object position =
recordOffset.getOffset().get(Config.POSITION);
+ if (position != null) {
+ offset = Long.valueOf(position.toString());
+ }
+ }
+ this.replicator.start(offset);
} catch (Exception e) {
log.error("activemq task start failed.", e);
- throw new DataConnectException(ErrorCode.START_ERROR_CODE,
e.getMessage(), e);
}
}
@@ -91,29 +95,21 @@ public class ActivemqSourceTask extends SourceTask {
replicator.stop();
} catch (Exception e) {
log.error("activemq task stop failed.", e);
- throw new DataConnectException(ErrorCode.STOP_ERROR_CODE,
e.getMessage(), e);
}
}
- @Override public void pause() {
- }
-
- @Override public void resume() {
-
- }
@SuppressWarnings("unchecked")
- public ByteBuffer getMessageContent(Message message) throws JMSException {
- byte[] data = null;
+ public String getMessageContent(Message message) throws JMSException {
+ String data = null;
if (message instanceof TextMessage) {
- data = ((TextMessage) message).getText().getBytes();
+ data = ((TextMessage) message).getText();
} else if (message instanceof ObjectMessage) {
- data = JSON.toJSONBytes(((ObjectMessage) message).getObject());
+ data = JSON.toJSONString(((ObjectMessage) message).getObject());
} else if (message instanceof BytesMessage) {
BytesMessage bytesMessage = (BytesMessage) message;
- data = new byte[(int) bytesMessage.getBodyLength()];
- bytesMessage.readBytes(data);
+ data = bytesMessage.toString();
} else if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
Map<String, Object> map = new HashMap<>();
@@ -122,7 +118,7 @@ public class ActivemqSourceTask extends SourceTask {
String name = names.nextElement().toString();
map.put(name, mapMessage.getObject(name));
}
- data = JSON.toJSONBytes(map);
+ data = JSON.toJSONString(map);
} else if (message instanceof StreamMessage) {
StreamMessage streamMessage = (StreamMessage) message;
ByteArrayOutputStream bis = new ByteArrayOutputStream();
@@ -131,11 +127,58 @@ public class ActivemqSourceTask extends SourceTask {
while ((i = streamMessage.readBytes(by)) != -1) {
bis.write(by, 0, i);
}
- data = bis.toByteArray();
+ data = bis.toString();
} else {
// The exception is printed and does not need to be written as a
DataConnectException
throw new RuntimeException("message type exception");
}
- return ByteBuffer.wrap(data);
+ return data;
+ }
+
+ private ConnectRecord message2ConnectRecord(Message message) throws
JMSException {
+ Schema schema = SchemaBuilder.struct().name("activemq").build();
+ final List<Field> fields = buildFields();
+ schema.setFields(fields);
+ final ConnectRecord connectRecord = new
ConnectRecord(buildRecordPartition(),
+ buildRecordOffset(message),
+ System.currentTimeMillis(),
+ schema,
+ buildPayLoad(fields, message, schema));
+ connectRecord.setExtensions(buildExtendFiled());
+ return connectRecord;
+ }
+
+ private RecordOffset buildRecordOffset(Message message) throws
JMSException {
+ Map<String, Long> offsetMap = new HashMap<>();
+ offsetMap.put(Config.POSITION,
Long.parseLong(message.getJMSMessageID().split(":")[5]));
+ RecordOffset recordOffset = new RecordOffset(offsetMap);
+ return recordOffset;
+ }
+
+ private RecordPartition buildRecordPartition() {
+ Map<String, String> partitionMap = new HashMap<>();
+ partitionMap.put("partition", "defaultPartition");
+ RecordPartition recordPartition = new RecordPartition(partitionMap);
+ return recordPartition;
+ }
+
+ private List<Field> buildFields() {
+ final Schema stringSchema = SchemaBuilder.string().build();
+ List<Field> fields = new ArrayList<>();
+ fields.add(new Field(0, Config.MESSAGE, stringSchema));
+ return fields;
+ }
+
+ private Struct buildPayLoad(List<Field> fields, Message message, Schema
schema) throws JMSException {
+ Struct payLoad = new Struct(schema);
+ payLoad.put(fields.get(0), getMessageContent(message));
+ return payLoad;
+ }
+
+ private KeyValue buildExtendFiled() {
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(Config.DESTINATION_NAME, config.getDestinationName());
+ keyValue.put(Config.DESTINATION_TYPE, config.getDestinationType());
+ return keyValue;
}
}
diff --git
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
index 6e39a7ec..71f744b0 100644
---
a/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
+++
b/connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.connect.activemq.pattern;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -47,7 +48,7 @@ public class PatternProcessor {
this.config = replicator.getConfig();
}
- public void start() throws Exception {
+ public void start(long offset) throws Exception {
if (!StringUtils.equals("topic", config.getDestinationType())
&& !StringUtils.equals("queue", config.getDestinationType())) {
// RuntimeException is caught by DataConnectException
@@ -74,6 +75,13 @@ public class PatternProcessor {
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
+ try {
+ if (Long.valueOf(message.getJMSMessageID().split(":")[5])
< offset) {
+ return;
+ }
+ } catch (JMSException e) {
+ throw new RuntimeException("parse JMSMessageId failed", e);
+ }
replicator.commit(message, true);
}
});
diff --git
a/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
b/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
index b94237ea..0573bf79 100644
---
a/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
+++
b/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
@@ -51,7 +51,7 @@ public class ReplicatorTest {
@Test(expected = RuntimeException.class)
public void startTest() throws Exception {
- replicator.start();
+ replicator.start(0L);
}
@Test
diff --git
a/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
b/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
index 22b301c5..115f668f 100644
---
a/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
+++
b/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
@@ -29,17 +29,6 @@ public class ActivemqConnectorTest {
ActivemqSourceConnector connector = new ActivemqSourceConnector();
- @Test
- public void verifyAndSetConfigTest() {
- KeyValue keyValue = new DefaultKeyValue();
-
- for (String requestKey : Config.REQUEST_CONFIG) {
- assertEquals(connector.verifyAndSetConfig(keyValue), "Request
config key: " + requestKey);
- keyValue.put(requestKey, requestKey);
- }
- assertEquals(connector.verifyAndSetConfig(keyValue), "");
- }
-
@Test
public void taskClassTest() {
assertEquals(connector.taskClass(), ActivemqSourceTask.class);
@@ -47,12 +36,12 @@ public class ActivemqConnectorTest {
@Test
public void taskConfigsTest() {
- assertEquals(connector.taskConfigs().get(0), null);
+ assertEquals(connector.taskConfigs(2).get(0), null);
KeyValue keyValue = new DefaultKeyValue();
for (String requestKey : Config.REQUEST_CONFIG) {
keyValue.put(requestKey, requestKey);
}
- connector.verifyAndSetConfig(keyValue);
- assertEquals(connector.taskConfigs().get(0), keyValue);
+ connector.start(keyValue);
+ assertEquals(connector.taskConfigs(2), keyValue);
}
}
diff --git
a/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
b/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
index 049496da..dbecbe43 100644
---
a/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
+++
b/connectors/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
@@ -17,13 +17,12 @@
package org.apache.rocketmq.connect.activemq.connector;
+import io.openmessaging.connector.api.data.ConnectRecord;
import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -54,7 +53,6 @@ import org.mockito.Mockito;
import com.alibaba.fastjson.JSON;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.SourceDataEntry;
import io.openmessaging.internal.DefaultKeyValue;
public class ActivemqSourceTaskTest {
@@ -80,7 +78,7 @@ public class ActivemqSourceTaskTest {
connection.close();
}
- //@Test
+ @Test
public void test() throws InterruptedException {
KeyValue kv = new DefaultKeyValue();
kv.put("activemqUrl", "tcp://112.74.48.251:6166");
@@ -89,7 +87,7 @@ public class ActivemqSourceTaskTest {
ActivemqSourceTask task = new ActivemqSourceTask();
task.start(kv);
for (int i = 0; i < 20; ) {
- Collection<SourceDataEntry> sourceDataEntry = task.poll();
+ Collection<ConnectRecord> sourceDataEntry = task.poll();
i = i + sourceDataEntry.size();
System.out.println(sourceDataEntry);
}
@@ -115,7 +113,7 @@ public class ActivemqSourceTaskTest {
config.set(task, new Config());
queue.put(textMessage);
- Collection<SourceDataEntry> list = task.poll();
+ Collection<ConnectRecord> list = task.poll();
Assert.assertEquals(list.size(), 1);
list = task.poll();
@@ -129,24 +127,24 @@ public class ActivemqSourceTaskTest {
ActivemqSourceTask task = new ActivemqSourceTask();
TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText(value);
- ByteBuffer buffer = task.getMessageContent(textMessage);
- Assert.assertEquals(new String(buffer.array()), textMessage.getText());
+ String content = task.getMessageContent(textMessage);
+ Assert.assertEquals(content, textMessage.getText());
ObjectMessage objectMessage = new ActiveMQObjectMessage();
objectMessage.setObject(value);
- buffer = task.getMessageContent(objectMessage);
- Assert.assertEquals(new String(buffer.array()), "\"" +
objectMessage.getObject().toString() + "\"");
+ content = task.getMessageContent(objectMessage);
+ Assert.assertEquals(content, "\"" +
objectMessage.getObject().toString() + "\"");
BytesMessage bytes = new ActiveMQBytesMessage();
bytes.writeBytes(value.getBytes());
bytes.reset();
- buffer = task.getMessageContent(bytes);
- Assert.assertEquals(new String(buffer.array()), value);
+ content = task.getMessageContent(bytes);
+ Assert.assertEquals(content, value);
MapMessage mapMessage = new ActiveMQMapMessage();
mapMessage.setString("hello", "rocketmq");
- buffer = task.getMessageContent(mapMessage);
- Map<String, String> map = JSON.parseObject(buffer.array(), Map.class);
+ content = task.getMessageContent(mapMessage);
+ Map<String, String> map = JSON.parseObject(content, Map.class);
Assert.assertEquals(map.get("hello"), "rocketmq");
Assert.assertEquals(map.size(), 1);
@@ -157,8 +155,8 @@ public class ActivemqSourceTaskTest {
}
streamMessage.writeBytes(valueTwo.getBytes());
streamMessage.reset();
- buffer = task.getMessageContent(streamMessage);
- Assert.assertEquals(new String(buffer.array()), valueTwo);
+ content = task.getMessageContent(streamMessage);
+ Assert.assertEquals(content, valueTwo);
task.getMessageContent(null);
}