This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit d50ffb8ed6ddef82e24a4071c03a96ee3e4f1c84
Author: laohu <[email protected]>
AuthorDate: Tue Jun 11 08:36:05 2019 +0800

    message type
---
 pom.xml                                            |  5 --
 .../connect/activemq/connector/ActivemqTask.java   | 63 +++++++++++++++++++---
 .../activemq/connector/ActivemqTaskTest.java       |  5 ++
 3 files changed, 60 insertions(+), 13 deletions(-)

diff --git a/pom.xml b/pom.xml
index f22c291..6e933bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,11 +152,6 @@
                </dependency>
                <dependency>
                        <groupId>io.openmessaging</groupId>
-                       <artifactId>openmessaging-connect-runtime</artifactId>
-                       <version>0.0.1-SNAPSHOT</version>
-               </dependency>
-               <dependency>
-                       <groupId>io.openmessaging</groupId>
                        <artifactId>openmessaging-connector</artifactId>
                        <version>0.1.0-beta</version>
                </dependency>
diff --git 
a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
 
b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
index c2950fa..7dead7b 100644
--- 
a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
+++ 
b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
@@ -17,13 +17,24 @@
 
 package org.apache.rocketmq.connect.activemq.connector;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
 import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
 
 import org.apache.rocketmq.connect.activemq.Config;
 import org.apache.rocketmq.connect.activemq.Replicator;
@@ -33,7 +44,6 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.fastjson.JSON;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
 import io.openmessaging.connector.api.data.SourceDataEntry;
 import io.openmessaging.connector.api.source.SourceTask;
 
@@ -44,17 +54,19 @@ public class ActivemqTask extends SourceTask {
     private Replicator replicator;
 
     private Config config;
+    
+    private ByteBuffer sourcePartition;
 
-    @Override
+    
+       @Override
     public Collection<SourceDataEntry> poll() {
-
         List<SourceDataEntry> res = new ArrayList<>();
-
         try {
                Message message = replicator.getQueue().poll(1000, 
TimeUnit.MILLISECONDS);
-            SourceDataEntry sourceDataEntry = new 
SourceDataEntry(ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")), 
ByteBuffer.wrap(JSON.toJSONBytes(message)), System.currentTimeMillis(), null, 
config.getDestinationName(), null, null);
-            
-            res.add(sourceDataEntry);
+               if(message != null) {                   
+                       SourceDataEntry sourceDataEntry = new 
SourceDataEntry(sourcePartition, getMessageConnent(message), 
System.currentTimeMillis(), null, config.getDestinationName(), null, null);
+                       res.add(sourceDataEntry);
+               }
         } catch (Exception e) {
             log.error("Mysql task poll error, current config:" + 
JSON.toJSONString(config), e);
         }
@@ -63,10 +75,10 @@ public class ActivemqTask extends SourceTask {
 
     @Override
     public void start(KeyValue props) {
-
         try {
             this.config = new Config();
             this.config.load(props);
+            this.sourcePartition = 
ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8"));
             this.replicator = new Replicator(config);
         } catch (Exception e) {
             log.error("Mysql task start failed.", e);
@@ -86,4 +98,39 @@ public class ActivemqTask extends SourceTask {
     @Override public void resume() {
 
     }
+    
+    @SuppressWarnings("unchecked")
+    public ByteBuffer getMessageConnent(Message message ) throws JMSException {
+       byte[] data = null;
+               if(message instanceof TextMessage) {
+                       data = ((TextMessage) message).getText().getBytes();
+               }else if(message instanceof ObjectMessage) {
+                       data = JSON.toJSONBytes( ((ObjectMessage) 
message).getObject());
+               }else if(message instanceof BytesMessage) {
+                       BytesMessage bytesMessage = (BytesMessage)message;
+                       data = new byte[(int) bytesMessage.getBodyLength()];
+                       bytesMessage.readBytes(data);
+               }else if(message instanceof MapMessage) {
+                       MapMessage mapMessage = (MapMessage)message;
+                       Map<String,Object> map = new HashMap<>();
+                       Enumeration<Object> names = mapMessage.getMapNames();
+                       while(names.hasMoreElements()) {
+                               String name = names.nextElement().toString();
+                               map.put(name, mapMessage.getObject(name));
+                       }
+                       data = JSON.toJSONBytes(map);
+               }else if(message instanceof StreamMessage) {
+                       StreamMessage streamMessage = (StreamMessage)message;
+                       ByteArrayOutputStream bis = new ByteArrayOutputStream();
+                       byte[] by = new byte[1024];
+                       int i = 0;
+                       while( (i = streamMessage.readBytes(by)) != 0) {
+                               bis.write(by, 0, i);
+                       }
+                       data = bis.toByteArray();
+               }else {
+                       throw new RuntimeException("message type exception");
+               }
+               return data!=null ? ByteBuffer.wrap( data ) : null;
+    }
 }
diff --git 
a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
 
b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
index 5dcb6a7..780cbc9 100644
--- 
a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
+++ 
b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
@@ -45,6 +45,11 @@ public class ActivemqTaskTest {
        }
 
        @Test
+       public void nullTest() {
+               
+       }
+       
+       @Test
        public void test() throws InterruptedException {
                KeyValue kv = new DefaultKeyValue();
                kv.put("activemqUrl", "tcp://112.74.48.251:6166");

Reply via email to