This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 508eea1dc30c0b64abcadd9eb6083cb06ee57190
Author: laohu <[email protected]>
AuthorDate: Wed Jun 12 00:53:24 2019 +0800

    add unit test
---
 .../apache/rocketmq/connect/activemq/Config.java   | 276 ++++++++++-----------
 .../rocketmq/connect/activemq/Replicator.java      |  13 +-
 ...Connector.java => ActivemqSourceConnector.java} |   9 +-
 .../{ActivemqTask.java => ActivemqSourceTask.java} | 100 ++++----
 .../connect/activemq/pattern/PatternProcessor.java | 116 +++++----
 .../rocketmq/connect/activemq/ReplicatorTest.java  |  57 +++++
 .../activemq/connector/ActivemqConnectorTest.java  |  41 +++
 .../activemq/connector/ActivemqSourceTaskTest.java | 148 +++++++++++
 .../activemq/connector/ActivemqTaskTest.java       |  68 -----
 9 files changed, 496 insertions(+), 332 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
index af218c0..30f898d 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
@@ -17,151 +17,147 @@
 
 package org.apache.rocketmq.connect.activemq;
 
+import io.openmessaging.KeyValue;
 import java.lang.reflect.Method;
 import java.util.HashSet;
 import java.util.Set;
-
 import javax.jms.Session;
 
-import io.openmessaging.KeyValue;
-
 public class Config {
 
-       @SuppressWarnings("serial")
-       public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
-               {
-                       add("activemqUrl");
-                       add("destinationType");
-                       add("destinationName");
-               }
-       };
-
-       public String activemqUrl;
-
-       public String activemqUsername;
-
-       public String activemqPassword;
-
-       public String destinationType;
-
-       public String destinationName;
-       
-       public String messageSelector;
-       
-       private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-
-       private Boolean sessionTransacted = Boolean.FALSE;
-       
-       public void load(KeyValue props) {
-
-               properties2Object(props, this);
-       }
-
-       private void properties2Object(final KeyValue p, final Object object) {
-
-               Method[] methods = object.getClass().getMethods();
-               for (Method method : methods) {
-                       String mn = method.getName();
-                       if (mn.startsWith("set")) {
-                               try {
-                                       String tmp = mn.substring(4);
-                                       String first = mn.substring(3, 4);
-
-                                       String key = first.toLowerCase() + tmp;
-                                       String property = p.getString(key);
-                                       if (property != null) {
-                                               Class<?>[] pt = 
method.getParameterTypes();
-                                               if (pt != null && pt.length > 
0) {
-                                                       String cn = 
pt[0].getSimpleName();
-                                                       Object arg;
-                                                       if (cn.equals("int") || 
cn.equals("Integer")) {
-                                                               arg = 
Integer.parseInt(property);
-                                                       } else if 
(cn.equals("long") || cn.equals("Long")) {
-                                                               arg = 
Long.parseLong(property);
-                                                       } else if 
(cn.equals("double") || cn.equals("Double")) {
-                                                               arg = 
Double.parseDouble(property);
-                                                       } else if 
(cn.equals("boolean") || cn.equals("Boolean")) {
-                                                               arg = 
Boolean.parseBoolean(property);
-                                                       } else if 
(cn.equals("float") || cn.equals("Float")) {
-                                                               arg = 
Float.parseFloat(property);
-                                                       } else if 
(cn.equals("String")) {
-                                                               arg = property;
-                                                       } else {
-                                                               continue;
-                                                       }
-                                                       method.invoke(object, 
arg);
-                                               }
-                                       }
-                               } catch (Throwable ignored) {
-                               }
-                       }
-               }
-       }
-
-       public String getActivemqUrl() {
-               return activemqUrl;
-       }
-
-       public void setActivemqUrl(String activemqUrl) {
-               this.activemqUrl = activemqUrl;
-       }
-
-       public String getActivemqUsername() {
-               return activemqUsername;
-       }
-
-       public void setActivemqUsername(String activemqUsername) {
-               this.activemqUsername = activemqUsername;
-       }
-
-       public String getActivemqPassword() {
-               return activemqPassword;
-       }
-
-       public void setActivemqPassword(String activemqPassword) {
-               this.activemqPassword = activemqPassword;
-       }
-
-       public String getDestinationType() {
-               return destinationType;
-       }
-
-       public void setDestinationType(String destinationType) {
-               this.destinationType = destinationType;
-       }
-
-       public String getDestinationName() {
-               return destinationName;
-       }
-
-       public void setDestinationName(String destinationName) {
-               this.destinationName = destinationName;
-       }
-
-       public String getMessageSelector() {
-               return messageSelector;
-       }
-
-       public void setMessageSelector(String messageSelector) {
-               this.messageSelector = messageSelector;
-       }
-
-       public Integer getSessionAcknowledgeMode() {
-               return sessionAcknowledgeMode;
-       }
-
-       public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) {
-               this.sessionAcknowledgeMode = sessionAcknowledgeMode;
-       }
-
-       public Boolean getSessionTransacted() {
-               return sessionTransacted;
-       }
-
-       public void setSessionTransacted(Boolean sessionTransacted) {
-               this.sessionTransacted = sessionTransacted;
-       }
-       
-       
-       
+    @SuppressWarnings("serial")
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+        {
+            add("activemqUrl");
+            add("destinationType");
+            add("destinationName");
+        }
+    };
+
+    public String activemqUrl;
+
+    public String activemqUsername;
+
+    public String activemqPassword;
+
+    public String destinationType;
+
+    public String destinationName;
+
+    public String messageSelector;
+
+    private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
+    private Boolean sessionTransacted = Boolean.FALSE;
+
+    public void load(KeyValue props) {
+
+        properties2Object(props, this);
+    }
+
+    private void properties2Object(final KeyValue p, final Object object) {
+
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4);
+                    String first = mn.substring(3, 4);
+
+                    String key = first.toLowerCase() + tmp;
+                    String property = p.getString(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) 
{
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || 
cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || 
cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || 
cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+
+    public String getActivemqUrl() {
+        return activemqUrl;
+    }
+
+    public void setActivemqUrl(String activemqUrl) {
+        this.activemqUrl = activemqUrl;
+    }
+
+    public String getActivemqUsername() {
+        return activemqUsername;
+    }
+
+    public void setActivemqUsername(String activemqUsername) {
+        this.activemqUsername = activemqUsername;
+    }
+
+    public String getActivemqPassword() {
+        return activemqPassword;
+    }
+
+    public void setActivemqPassword(String activemqPassword) {
+        this.activemqPassword = activemqPassword;
+    }
+
+    public String getDestinationType() {
+        return destinationType;
+    }
+
+    public void setDestinationType(String destinationType) {
+        this.destinationType = destinationType;
+    }
+
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public void setDestinationName(String destinationName) {
+        this.destinationName = destinationName;
+    }
+
+    public String getMessageSelector() {
+        return messageSelector;
+    }
+
+    public void setMessageSelector(String messageSelector) {
+        this.messageSelector = messageSelector;
+    }
+
+    public Integer getSessionAcknowledgeMode() {
+        return sessionAcknowledgeMode;
+    }
+
+    public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) {
+        this.sessionAcknowledgeMode = sessionAcknowledgeMode;
+    }
+
+    public Boolean getSessionTransacted() {
+        return sessionTransacted;
+    }
+
+    public void setSessionTransacted(Boolean sessionTransacted) {
+        this.sessionTransacted = sessionTransacted;
+    }
+
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
index dd83c37..2b18481 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
@@ -19,9 +19,7 @@ package org.apache.rocketmq.connect.activemq;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-
 import javax.jms.Message;
-
 import org.apache.rocketmq.connect.activemq.pattern.PatternProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,11 +29,11 @@ public class Replicator {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(Replicator.class);
 
     private PatternProcessor processor;
-    
+
     private Config config;
     private BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
 
-    public Replicator(Config config){
+    public Replicator(Config config) {
         this.config = config;
     }
 
@@ -47,11 +45,12 @@ public class Replicator {
 
         } catch (Exception e) {
             LOGGER.error("Start error.", e);
+            throw new RuntimeException(e);
         }
     }
 
-    public void stop(){
-       processor.stop();
+    public void stop() {
+        processor.stop();
     }
 
     public void commit(Message message, boolean isComplete) {
@@ -59,7 +58,7 @@ public class Replicator {
     }
 
     public Config getConfig() {
-       return this.config;
+        return this.config;
     }
 
     public BlockingQueue<Message> getQueue() {
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnector.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
similarity index 89%
rename from src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnector.java
rename to src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
index 17d3efe..7e6290b 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
@@ -22,18 +22,17 @@ import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.source.SourceConnector;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.rocketmq.connect.activemq.Config;
 
-public class ActivemqConnector extends SourceConnector {
+public class ActivemqSourceConnector extends SourceConnector {
 
     private KeyValue config;
 
     @Override
     public String verifyAndSetConfig(KeyValue config) {
 
-        for(String requestKey : Config.REQUEST_CONFIG){
-            if(!config.containsKey(requestKey)){
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
                 return "Request config key: " + requestKey;
             }
         }
@@ -61,7 +60,7 @@ public class ActivemqConnector extends SourceConnector {
 
     @Override
     public Class<? extends Task> taskClass() {
-        return ActivemqTask.class;
+        return ActivemqSourceTask.class;
     }
 
     @Override
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
similarity index 55%
rename from src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
rename to src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
index 7dead7b..2140b5f 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
@@ -17,7 +17,10 @@
 
 package org.apache.rocketmq.connect.activemq.connector;
 
-import java.io.ByteArrayInputStream;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.source.SourceTask;
 import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -27,7 +30,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
@@ -35,40 +37,32 @@ import javax.jms.Message;
 import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
-
 import org.apache.rocketmq.connect.activemq.Config;
 import org.apache.rocketmq.connect.activemq.Replicator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.fastjson.JSON;
-
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.connector.api.source.SourceTask;
-
-public class ActivemqTask extends SourceTask {
+public class ActivemqSourceTask extends SourceTask {
 
-    private static final Logger log = 
LoggerFactory.getLogger(ActivemqTask.class);
+    private static final Logger log = 
LoggerFactory.getLogger(ActivemqSourceTask.class);
 
     private Replicator replicator;
 
     private Config config;
-    
+
     private ByteBuffer sourcePartition;
 
-    
-       @Override
+    @Override
     public Collection<SourceDataEntry> poll() {
         List<SourceDataEntry> res = new ArrayList<>();
         try {
-               Message message = replicator.getQueue().poll(1000, 
TimeUnit.MILLISECONDS);
-               if(message != null) {                   
-                       SourceDataEntry sourceDataEntry = new 
SourceDataEntry(sourcePartition, getMessageConnent(message), 
System.currentTimeMillis(), null, config.getDestinationName(), null, null);
-                       res.add(sourceDataEntry);
-               }
+            Message message = replicator.getQueue().poll(1000, 
TimeUnit.MILLISECONDS);
+            if (message != null) {
+                SourceDataEntry sourceDataEntry = new 
SourceDataEntry(sourcePartition, getMessageConnent(message), 
System.currentTimeMillis(), null, config.getDestinationName(), null, null);
+                res.add(sourceDataEntry);
+            }
         } catch (Exception e) {
-            log.error("Mysql task poll error, current config:" + 
JSON.toJSONString(config), e);
+            log.error("activemq task poll error, current config:" + 
JSON.toJSONString(config), e);
         }
         return res;
     }
@@ -98,39 +92,39 @@ public class ActivemqTask extends SourceTask {
     @Override public void resume() {
 
     }
-    
+
     @SuppressWarnings("unchecked")
-    public ByteBuffer getMessageConnent(Message message ) throws JMSException {
-       byte[] data = null;
-               if(message instanceof TextMessage) {
-                       data = ((TextMessage) message).getText().getBytes();
-               }else if(message instanceof ObjectMessage) {
-                       data = JSON.toJSONBytes( ((ObjectMessage) 
message).getObject());
-               }else if(message instanceof BytesMessage) {
-                       BytesMessage bytesMessage = (BytesMessage)message;
-                       data = new byte[(int) bytesMessage.getBodyLength()];
-                       bytesMessage.readBytes(data);
-               }else if(message instanceof MapMessage) {
-                       MapMessage mapMessage = (MapMessage)message;
-                       Map<String,Object> map = new HashMap<>();
-                       Enumeration<Object> names = mapMessage.getMapNames();
-                       while(names.hasMoreElements()) {
-                               String name = names.nextElement().toString();
-                               map.put(name, mapMessage.getObject(name));
-                       }
-                       data = JSON.toJSONBytes(map);
-               }else if(message instanceof StreamMessage) {
-                       StreamMessage streamMessage = (StreamMessage)message;
-                       ByteArrayOutputStream bis = new ByteArrayOutputStream();
-                       byte[] by = new byte[1024];
-                       int i = 0;
-                       while( (i = streamMessage.readBytes(by)) != 0) {
-                               bis.write(by, 0, i);
-                       }
-                       data = bis.toByteArray();
-               }else {
-                       throw new RuntimeException("message type exception");
-               }
-               return data!=null ? ByteBuffer.wrap( data ) : null;
+    public ByteBuffer getMessageConnent(Message message) throws JMSException {
+        byte[] data = null;
+        if (message instanceof TextMessage) {
+            data = ((TextMessage) message).getText().getBytes();
+        } else if (message instanceof ObjectMessage) {
+            data = JSON.toJSONBytes(((ObjectMessage) message).getObject());
+        } else if (message instanceof BytesMessage) {
+            BytesMessage bytesMessage = (BytesMessage) message;
+            data = new byte[(int) bytesMessage.getBodyLength()];
+            bytesMessage.readBytes(data);
+        } else if (message instanceof MapMessage) {
+            MapMessage mapMessage = (MapMessage) message;
+            Map<String, Object> map = new HashMap<>();
+            Enumeration<Object> names = mapMessage.getMapNames();
+            while (names.hasMoreElements()) {
+                String name = names.nextElement().toString();
+                map.put(name, mapMessage.getObject(name));
+            }
+            data = JSON.toJSONBytes(map);
+        } else if (message instanceof StreamMessage) {
+            StreamMessage streamMessage = (StreamMessage) message;
+            ByteArrayOutputStream bis = new ByteArrayOutputStream();
+            byte[] by = new byte[1024];
+            int i = 0;
+            while ((i = streamMessage.readBytes(by)) != -1) {
+                bis.write(by, 0, i);
+            }
+            data = bis.toByteArray();
+        } else {
+            throw new RuntimeException("message type exception");
+        }
+        return data != null ? ByteBuffer.wrap(data) : null;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
index b26bfb9..60a34cf 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
@@ -8,7 +8,6 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.activemq.Config;
@@ -16,63 +15,62 @@ import org.apache.rocketmq.connect.activemq.Replicator;
 
 public class PatternProcessor {
 
-       private Replicator replicator;
-       
-       private Config config;
-       
-       Connection connection;
-       
-       Session session;
-       
-       MessageConsumer consumer;
-       
-       public PatternProcessor(Replicator replicator) {
-               this.replicator = replicator;
-               this.config = replicator.getConfig();
-       }
-       
-       public void start() {
-               if(!StringUtils.equals("topic", 
config.getDestinationType())&&!StringUtils.equals("queue", 
config.getDestinationType())) {
-                       throw new RuntimeException("destination type is 
incorrectness");
-               }
-               
-               try {
-                  ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(config.getActivemqUrl());
-                  
-                  if(StringUtils.isNotBlank(config.getActivemqUsername()) && 
StringUtils.isNotBlank(config.getActivemqPassword()) ) {
-                  connection = 
connectionFactory.createConnection(config.getActivemqUsername() , 
config.getActivemqPassword());
-                  }else {
-                          connection = connectionFactory.createConnection();
-                  }
-               connection.start();
-               Session session = 
connection.createSession(config.getSessionTransacted(), 
config.getSessionAcknowledgeMode());
-               Destination destination = null;
-               if(StringUtils.equals("topic", config.getDestinationType())) {
-                       destination = 
session.createTopic(config.getDestinationName());
-               }else if(StringUtils.equals("queue", 
config.getDestinationType())){
-                       destination = 
session.createQueue(config.getDestinationName());
-               }
-               consumer = session.createConsumer(destination, 
config.getMessageSelector());
-               consumer.setMessageListener(new MessageListener() {
-                   @Override
-                   public void onMessage(Message message) {
-                       replicator.commit(message, true);
-                   }
-               });
-               }catch(Exception e) {
-                       throw new RuntimeException(e);
-               }
-       }
-       
-       public void stop() {
+    private Replicator replicator;
+
+    private Config config;
+
+    Connection connection;
+
+    Session session;
+
+    MessageConsumer consumer;
+
+    public PatternProcessor(Replicator replicator) {
+        this.replicator = replicator;
+        this.config = replicator.getConfig();
+    }
+
+    public void start() {
+        if (!StringUtils.equals("topic", config.getDestinationType()) && 
!StringUtils.equals("queue", config.getDestinationType())) {
+            throw new RuntimeException("destination type is incorrectness");
+        }
+
+        try {
+            ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(config.getActivemqUrl());
+
+            if (StringUtils.isNotBlank(config.getActivemqUsername()) && 
StringUtils.isNotBlank(config.getActivemqPassword())) {
+                connection = 
connectionFactory.createConnection(config.getActivemqUsername(), 
config.getActivemqPassword());
+            } else {
+                connection = connectionFactory.createConnection();
+            }
+            connection.start();
+            Session session = 
connection.createSession(config.getSessionTransacted(), 
config.getSessionAcknowledgeMode());
+            Destination destination = null;
+            if (StringUtils.equals("topic", config.getDestinationType())) {
+                destination = session.createTopic(config.getDestinationName());
+            } else if (StringUtils.equals("queue", 
config.getDestinationType())) {
+                destination = session.createQueue(config.getDestinationName());
+            }
+            consumer = session.createConsumer(destination, 
config.getMessageSelector());
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    replicator.commit(message, true);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void stop() {
         try {
-               consumer.close();
-               session.close();
-                       connection.close();
-               } catch (JMSException e) {
-                       throw new RuntimeException(e);
-               }
-       }
-       
- 
+            consumer.close();
+            session.close();
+            connection.close();
+        } catch (JMSException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 }
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
new file mode 100644
index 0000000..909a5d7
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
@@ -0,0 +1,57 @@
+package org.apache.rocketmq.connect.activemq;
+
+import java.lang.reflect.Field;
+
+import javax.jms.Message;
+
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.rocketmq.connect.activemq.pattern.PatternProcessor;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.junit.Assert;
+
+public class ReplicatorTest {
+
+    Replicator replicator;
+
+    PatternProcessor patternProcessor;
+
+    Config config;
+
+    @Before
+    public void before() throws IllegalArgumentException, 
IllegalAccessException, NoSuchFieldException, SecurityException {
+        config = new Config();
+        replicator = new Replicator(config);
+
+        patternProcessor = Mockito.mock(PatternProcessor.class);
+
+        Field processor = Replicator.class.getDeclaredField("processor");
+        processor.setAccessible(true);
+        processor.set(replicator, patternProcessor);
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void startTest() {
+        replicator.start();
+    }
+
+    @Test
+    public void stop() {
+        replicator.stop();
+        Mockito.verify(patternProcessor, Mockito.times(1)).stop();
+    }
+
+    @Test
+    public void commitAddGetQueueTest() {
+        Message message = new ActiveMQTextMessage();
+        replicator.commit(message, false);
+        Assert.assertEquals(replicator.getQueue().poll(), message);
+    }
+
+    @Test
+    public void getConfigTest() {
+        Assert.assertEquals(replicator.getConfig(), config);
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
new file mode 100644
index 0000000..eae1ae6
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
@@ -0,0 +1,41 @@
+package org.apache.rocketmq.connect.activemq.connector;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.rocketmq.connect.activemq.Config;
+import org.junit.Test;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+
+public class ActivemqConnectorTest {
+
+    ActivemqSourceConnector connector = new ActivemqSourceConnector();
+
+    @Test
+    public void verifyAndSetConfigTest() {
+        KeyValue keyValue = new DefaultKeyValue();
+
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            assertEquals(connector.verifyAndSetConfig(keyValue), "Request 
config key: " + requestKey);
+            keyValue.put(requestKey, requestKey);
+        }
+        assertEquals(connector.verifyAndSetConfig(keyValue), "");
+    }
+
+    @Test
+    public void taskClassTest() {
+        assertEquals(connector.taskClass(), ActivemqSourceTask.class);
+    }
+
+    @Test
+    public void taskConfigsTest() {
+        assertEquals(connector.taskConfigs().get(0), null);
+        KeyValue keyValue = new DefaultKeyValue();
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            keyValue.put(requestKey, requestKey);
+        }
+        connector.verifyAndSetConfig(keyValue);
+        assertEquals(connector.taskConfigs().get(0), keyValue);
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
new file mode 100644
index 0000000..2b0821b
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
@@ -0,0 +1,200 @@
+package org.apache.rocketmq.connect.activemq.connector;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.rocketmq.connect.activemq.Config;
+import org.apache.rocketmq.connect.activemq.Replicator;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.alibaba.fastjson.JSON;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.internal.DefaultKeyValue;
+
+/**
+ * Tests for {@link ActivemqSourceTask}: queue polling and conversion of the JMS message
+ * types into byte buffers. The broker-backed methods are manual helpers and are not run
+ * as part of the unit-test suite.
+ */
+public class ActivemqSourceTaskTest {
+
+    /** Address of the external ActiveMQ broker used only by the manual, broker-backed methods. */
+    private static final String BROKER_URL = "tcp://112.74.48.251:6166";
+
+    /** Queue the manual, broker-backed methods publish to and consume from. */
+    private static final String QUEUE_NAME = "test-queue";
+
+    /**
+     * Manual helper (not annotated, so JUnit does not invoke it): publishes 20 text
+     * messages to {@link #QUEUE_NAME} on the broker at {@link #BROKER_URL}.
+     *
+     * @throws JMSException if the broker cannot be reached or publishing fails
+     * @throws InterruptedException never thrown by the current body; kept for compatibility
+     */
+    public void befores() throws JMSException, InterruptedException {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
+        Connection connection = connectionFactory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(QUEUE_NAME);
+
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            for (int i = 0; i < 20; i++) {
+                TextMessage message = session.createTextMessage("hello 我是消息:" + i);
+                producer.send(message);
+            }
+
+            // The session was created transacted (first argument true), hence the commit.
+            session.commit();
+            session.close();
+        } finally {
+            // Always release the connection, even when publishing fails part-way.
+            connection.close();
+        }
+    }
+
+    /**
+     * Manual integration check: starts a task against the external broker and polls until
+     * at least 20 entries have been received. Ignored by default because it needs a live
+     * broker with messages already queued (presumably those published by befores()).
+     *
+     * @throws InterruptedException if the thread is interrupted
+     */
+    @Ignore("requires a reachable ActiveMQ broker with messages already queued")
+    @Test
+    public void test() throws InterruptedException {
+        KeyValue kv = new DefaultKeyValue();
+        kv.put("activemqUrl", BROKER_URL);
+        kv.put("destinationType", "queue");
+        kv.put("destinationName", QUEUE_NAME);
+        ActivemqSourceTask task = new ActivemqSourceTask();
+        task.start(kv);
+        for (int i = 0; i < 20; ) {
+            Collection<SourceDataEntry> sourceDataEntry = task.poll();
+            i = i + sourceDataEntry.size();
+            System.out.println(sourceDataEntry);
+        }
+        Thread.sleep(20000);
+    }
+
+    /**
+     * Feeds one message through a mocked {@link Replicator} queue and checks that the first
+     * poll returns exactly one entry and the following poll returns none.
+     */
+    @Test
+    public void pollTest() throws Exception {
+        ActivemqSourceTask task = new ActivemqSourceTask();
+        TextMessage textMessage = new ActiveMQTextMessage();
+        textMessage.setText("hello rocketmq");
+
+        // The mocked replicator hands the task a queue that this test controls.
+        Replicator replicatorObject = Mockito.mock(Replicator.class);
+        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+        Mockito.when(replicatorObject.getQueue()).thenReturn(queue);
+
+        // Inject the collaborators reflectively instead of calling task.start().
+        Field replicator = ActivemqSourceTask.class.getDeclaredField("replicator");
+        replicator.setAccessible(true);
+        replicator.set(task, replicatorObject);
+
+        Field config = ActivemqSourceTask.class.getDeclaredField("config");
+        config.setAccessible(true);
+        config.set(task, new Config());
+
+        queue.put(textMessage);
+        Collection<SourceDataEntry> list = task.poll();
+        Assert.assertEquals(1, list.size());
+
+        list = task.poll();
+        Assert.assertEquals(0, list.size());
+    }
+
+    /**
+     * Checks the byte-buffer conversion for each supported JMS message type. No exception
+     * is expected here, so an unexpected RuntimeException fails the test instead of being
+     * mistaken for the null-message case.
+     */
+    @Test
+    public void getMessageConnentTest() throws JMSException {
+        String value = "hello rocketmq";
+        ActivemqSourceTask task = new ActivemqSourceTask();
+
+        TextMessage textMessage = new ActiveMQTextMessage();
+        textMessage.setText(value);
+        ByteBuffer buffer = task.getMessageConnent(textMessage);
+        Assert.assertEquals(textMessage.getText(), new String(buffer.array()));
+
+        // Object payloads are expected to come back as a quoted string.
+        ObjectMessage objectMessage = new ActiveMQObjectMessage();
+        objectMessage.setObject(value);
+        buffer = task.getMessageConnent(objectMessage);
+        Assert.assertEquals("\"" + objectMessage.getObject().toString() + "\"",
+            new String(buffer.array()));
+
+        BytesMessage bytes = new ActiveMQBytesMessage();
+        bytes.writeBytes(value.getBytes());
+        bytes.reset();
+        buffer = task.getMessageConnent(bytes);
+        Assert.assertEquals(value, new String(buffer.array()));
+
+        MapMessage mapMessage = new ActiveMQMapMessage();
+        mapMessage.setString("hello", "rocketmq");
+        buffer = task.getMessageConnent(mapMessage);
+        Map<String, String> map = JSON.parseObject(buffer.array(), Map.class);
+        Assert.assertEquals("rocketmq", map.get("hello"));
+        Assert.assertEquals(1, map.size());
+
+        // Build a payload of 200 repetitions; starting from an empty builder avoids the
+        // literal "null" prefix that concatenating onto a null String would produce.
+        StreamMessage streamMessage = new ActiveMQStreamMessage();
+        StringBuilder payload = new StringBuilder();
+        for (int i = 0; i < 200; i++) {
+            payload.append(value);
+        }
+        String valueTwo = payload.toString();
+        streamMessage.writeBytes(valueTwo.getBytes());
+        streamMessage.reset();
+        buffer = task.getMessageConnent(streamMessage);
+        Assert.assertEquals(valueTwo, new String(buffer.array()));
+    }
+
+    /**
+     * A {@code null} message must be rejected with a {@link RuntimeException}.
+     */
+    @Test(expected = RuntimeException.class)
+    public void getMessageConnentNullMessageTest() throws JMSException {
+        new ActivemqSourceTask().getMessageConnent(null);
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
deleted file mode 100644
index 780cbc9..0000000
--- a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.rocketmq.connect.activemq.connector;
-
-import java.util.Collection;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.rocketmq.connect.activemq.Config;
-import org.junit.Before;
-import org.junit.Test;
-
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.internal.DefaultKeyValue;
-
-public class ActivemqTaskTest {
-
-       @Before
-       public void befores() throws JMSException, InterruptedException {
-               ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166");
-               Connection connection = connectionFactory.createConnection();
-
-               connection.start();
-               Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-               Destination destination = session.createQueue("test-queue");
-
-               MessageProducer producer = session.createProducer(destination);
-
-               producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-               for (int i = 0; i < 20; i++) {
-                       TextMessage message = session.createTextMessage("hello 我是消息:" + i);
-                       producer.send(message);
-               }
-
-               session.commit();
-               session.close();
-               connection.close();
-       }
-
-       @Test
-       public void nullTest() {
-               
-       }
-       
-       @Test
-       public void test() throws InterruptedException {
-               KeyValue kv = new DefaultKeyValue();
-               kv.put("activemqUrl", "tcp://112.74.48.251:6166");
-               kv.put("destinationType", "queue");
-               kv.put("destinationName", "test-queue");
-               ActivemqTask task = new ActivemqTask();
-               task.start(kv);
-               for(int i = 0 ; i < 20;) {
-                       Collection<SourceDataEntry> sourceDataEntry = task.poll();
-                       i = i+sourceDataEntry.size();
-                       System.out.println(sourceDataEntry);
-               }
-               Thread.sleep(20000);
-               
-       }
-}

Reply via email to