This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit d50ffb8ed6ddef82e24a4071c03a96ee3e4f1c84 Author: laohu <[email protected]> AuthorDate: Tue Jun 11 08:36:05 2019 +0800 message type --- pom.xml | 5 -- .../connect/activemq/connector/ActivemqTask.java | 63 +++++++++++++++++++--- .../activemq/connector/ActivemqTaskTest.java | 5 ++ 3 files changed, 60 insertions(+), 13 deletions(-) diff --git a/pom.xml b/pom.xml index f22c291..6e933bd 100644 --- a/pom.xml +++ b/pom.xml @@ -152,11 +152,6 @@ </dependency> <dependency> <groupId>io.openmessaging</groupId> - <artifactId>openmessaging-connect-runtime</artifactId> - <version>0.0.1-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>io.openmessaging</groupId> <artifactId>openmessaging-connector</artifactId> <version>0.1.0-beta</version> </dependency> diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java index c2950fa..7dead7b 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java @@ -17,13 +17,24 @@ package org.apache.rocketmq.connect.activemq.connector; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.Enumeration; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MapMessage; import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; import org.apache.rocketmq.connect.activemq.Config; import org.apache.rocketmq.connect.activemq.Replicator; @@ -33,7 +44,6 @@ import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; import io.openmessaging.KeyValue; -import io.openmessaging.connector.api.data.DataEntryBuilder; import io.openmessaging.connector.api.data.SourceDataEntry; import io.openmessaging.connector.api.source.SourceTask; @@ -44,17 +54,19 @@ public class ActivemqTask extends SourceTask { private Replicator replicator; private Config config; + + private ByteBuffer sourcePartition; - @Override + + @Override public Collection<SourceDataEntry> poll() { - List<SourceDataEntry> res = new ArrayList<>(); - try { Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS); - SourceDataEntry sourceDataEntry = new SourceDataEntry(ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")), ByteBuffer.wrap(JSON.toJSONBytes(message)), System.currentTimeMillis(), null, config.getDestinationName(), null, null); - - res.add(sourceDataEntry); + if(message != null) { + SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), null, config.getDestinationName(), null, null); + res.add(sourceDataEntry); + } } catch (Exception e) { log.error("Mysql task poll error, current config:" + JSON.toJSONString(config), e); } @@ -63,10 +75,10 @@ public class ActivemqTask extends SourceTask { @Override public void start(KeyValue props) { - try { this.config = new Config(); this.config.load(props); + this.sourcePartition = ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")); this.replicator = new Replicator(config); } catch (Exception e) { log.error("Mysql task start failed.", e); @@ -86,4 +98,39 @@ public class ActivemqTask extends SourceTask { @Override public void resume() { } + + @SuppressWarnings("unchecked") + public ByteBuffer getMessageConnent(Message message ) throws JMSException { + byte[] data = null; + if(message instanceof TextMessage) { + data = ((TextMessage) message).getText().getBytes(); + }else if(message instanceof ObjectMessage) { + data = JSON.toJSONBytes( ((ObjectMessage) message).getObject()); + }else if(message instanceof BytesMessage) { + BytesMessage bytesMessage = (BytesMessage)message; + data = new byte[(int) bytesMessage.getBodyLength()]; + bytesMessage.readBytes(data); + }else if(message instanceof MapMessage) { + MapMessage mapMessage = (MapMessage)message; + Map<String,Object> map = new HashMap<>(); + Enumeration<Object> names = mapMessage.getMapNames(); + while(names.hasMoreElements()) { + String name = names.nextElement().toString(); + map.put(name, mapMessage.getObject(name)); + } + data = JSON.toJSONBytes(map); + }else if(message instanceof StreamMessage) { + StreamMessage streamMessage = (StreamMessage)message; + ByteArrayOutputStream bis = new ByteArrayOutputStream(); + byte[] by = new byte[1024]; + int i = 0; + while( (i = streamMessage.readBytes(by)) != 0) { + bis.write(by, 0, i); + } + data = bis.toByteArray(); + }else { + throw new RuntimeException("message type exception"); + } + return data!=null ? ByteBuffer.wrap( data ) : null; + } } diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java index 5dcb6a7..780cbc9 100644 --- a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java +++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java @@ -45,6 +45,11 @@ public class ActivemqTaskTest { } @Test + public void nullTest() { + + } + + @Test public void test() throws InterruptedException { KeyValue kv = new DefaultKeyValue(); kv.put("activemqUrl", "tcp://112.74.48.251:6166");
