This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 666376814f146732aec5be3251e4f75f170654cb Author: laohu <[email protected]> AuthorDate: Thu Jun 6 19:36:31 2019 +0800 change --- .../connect/activemq/connector/ActivemqTask.java | 6 +-- .../activemq/connector/ActivemqTaskTest.java | 63 ++++++++++++++++++++++ 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java index 9743a19..c2950fa 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java @@ -52,11 +52,7 @@ public class ActivemqTask extends SourceTask { try { Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS); - DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(null); - dataEntryBuilder.timestamp(System.currentTimeMillis()); - SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry( - ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")), - ByteBuffer.wrap(JSON.toJSONBytes(message))); + SourceDataEntry sourceDataEntry = new SourceDataEntry(ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")), ByteBuffer.wrap(JSON.toJSONBytes(message)), System.currentTimeMillis(), null, config.getDestinationName(), null, null); res.add(sourceDataEntry); } catch (Exception e) { diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java new file mode 100644 index 0000000..5dcb6a7 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java @@ -0,0 +1,63 @@ +package org.apache.rocketmq.connect.activemq.connector; + +import java.util.Collection; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.rocketmq.connect.activemq.Config; +import org.junit.Before; +import org.junit.Test; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.internal.DefaultKeyValue; + +public class ActivemqTaskTest { + + @Before + public void befores() throws JMSException, InterruptedException { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166"); + Connection connection = connectionFactory.createConnection(); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("test-queue"); + + MessageProducer producer = session.createProducer(destination); + + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + for (int i = 0; i < 20; i++) { + TextMessage message = session.createTextMessage("hello 我是消息:" + i); + producer.send(message); + } + + session.commit(); + session.close(); + connection.close(); + } + + @Test + public void test() throws InterruptedException { + KeyValue kv = new DefaultKeyValue(); + kv.put("activemqUrl", "tcp://112.74.48.251:6166"); + kv.put("destinationType", "queue"); + kv.put("destinationName", "test-queue"); + ActivemqTask task = new ActivemqTask(); + task.start(kv); + for(int i = 0 ; i < 20;) { + Collection<SourceDataEntry> sourceDataEntry = task.poll(); + i = i+sourceDataEntry.size(); + System.out.println(sourceDataEntry); + } + Thread.sleep(20000); + + } +}
