This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 508eea1dc30c0b64abcadd9eb6083cb06ee57190 Author: laohu <[email protected]> AuthorDate: Wed Jun 12 00:53:24 2019 +0800 add unit test --- .../apache/rocketmq/connect/activemq/Config.java | 276 ++++++++++----------- .../rocketmq/connect/activemq/Replicator.java | 13 +- ...Connector.java => ActivemqSourceConnector.java} | 9 +- .../{ActivemqTask.java => ActivemqSourceTask.java} | 100 ++++---- .../connect/activemq/pattern/PatternProcessor.java | 116 +++++---- .../rocketmq/connect/activemq/ReplicatorTest.java | 57 +++++ .../activemq/connector/ActivemqConnectorTest.java | 41 +++ .../activemq/connector/ActivemqSourceTaskTest.java | 148 +++++++++++ .../activemq/connector/ActivemqTaskTest.java | 68 ----- 9 files changed, 496 insertions(+), 332 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java index af218c0..30f898d 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java @@ -17,151 +17,147 @@ package org.apache.rocketmq.connect.activemq; +import io.openmessaging.KeyValue; import java.lang.reflect.Method; import java.util.HashSet; import java.util.Set; - import javax.jms.Session; -import io.openmessaging.KeyValue; - public class Config { - @SuppressWarnings("serial") - public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { - { - add("activemqUrl"); - add("destinationType"); - add("destinationName"); - } - }; - - public String activemqUrl; - - public String activemqUsername; - - public String activemqPassword; - - public String destinationType; - - public String destinationName; - - public String messageSelector; - - private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; - - private Boolean sessionTransacted = Boolean.FALSE; - - public void load(KeyValue props) { - - properties2Object(props, this); - } - - private void properties2Object(final KeyValue p, final Object object) { - - Method[] methods = object.getClass().getMethods(); - for (Method method : methods) { - String mn = method.getName(); - if (mn.startsWith("set")) { - try { - String tmp = mn.substring(4); - String first = mn.substring(3, 4); - - String key = first.toLowerCase() + tmp; - String property = p.getString(key); - if (property != null) { - Class<?>[] pt = method.getParameterTypes(); - if (pt != null && pt.length > 0) { - String cn = pt[0].getSimpleName(); - Object arg; - if (cn.equals("int") || cn.equals("Integer")) { - arg = Integer.parseInt(property); - } else if (cn.equals("long") || cn.equals("Long")) { - arg = Long.parseLong(property); - } else if (cn.equals("double") || cn.equals("Double")) { - arg = Double.parseDouble(property); - } else if (cn.equals("boolean") || cn.equals("Boolean")) { - arg = Boolean.parseBoolean(property); - } else if (cn.equals("float") || cn.equals("Float")) { - arg = Float.parseFloat(property); - } else if (cn.equals("String")) { - arg = property; - } else { - continue; - } - method.invoke(object, arg); - } - } - } catch (Throwable ignored) { - } - } - } - } - - public String getActivemqUrl() { - return activemqUrl; - } - - public void setActivemqUrl(String activemqUrl) { - this.activemqUrl = activemqUrl; - } - - public String getActivemqUsername() { - return activemqUsername; - } - - public void setActivemqUsername(String activemqUsername) { - this.activemqUsername = activemqUsername; - } - - public String getActivemqPassword() { - return activemqPassword; - } - - public void setActivemqPassword(String activemqPassword) { - this.activemqPassword = activemqPassword; - } - - public String getDestinationType() { - return destinationType; - } - - public void setDestinationType(String destinationType) { - this.destinationType = destinationType; - } - - public String getDestinationName() { - return destinationName; - } - - public void setDestinationName(String destinationName) { - this.destinationName = destinationName; - } - - public String getMessageSelector() { - return messageSelector; - } - - public void setMessageSelector(String messageSelector) { - this.messageSelector = messageSelector; - } - - public Integer getSessionAcknowledgeMode() { - return sessionAcknowledgeMode; - } - - public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) { - this.sessionAcknowledgeMode = sessionAcknowledgeMode; - } - - public Boolean getSessionTransacted() { - return sessionTransacted; - } - - public void setSessionTransacted(Boolean sessionTransacted) { - this.sessionTransacted = sessionTransacted; - } - - - + @SuppressWarnings("serial") + public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { + { + add("activemqUrl"); + add("destinationType"); + add("destinationName"); + } + }; + + public String activemqUrl; + + public String activemqUsername; + + public String activemqPassword; + + public String destinationType; + + public String destinationName; + + public String messageSelector; + + private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; + + private Boolean sessionTransacted = Boolean.FALSE; + + public void load(KeyValue props) { + + properties2Object(props, this); + } + + private void properties2Object(final KeyValue p, final Object object) { + + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(4); + String first = mn.substring(3, 4); + + String key = first.toLowerCase() + tmp; + String property = p.getString(key); + if (property != null) { + Class<?>[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } + + public String getActivemqUrl() { + return activemqUrl; + } + + public void setActivemqUrl(String activemqUrl) { + this.activemqUrl = activemqUrl; + } + + public String getActivemqUsername() { + return activemqUsername; + } + + public void setActivemqUsername(String activemqUsername) { + this.activemqUsername = activemqUsername; + } + + public String getActivemqPassword() { + return activemqPassword; + } + + public void setActivemqPassword(String activemqPassword) { + this.activemqPassword = activemqPassword; + } + + public String getDestinationType() { + return destinationType; + } + + public void setDestinationType(String destinationType) { + this.destinationType = destinationType; + } + + public String getDestinationName() { + return destinationName; + } + + public void setDestinationName(String destinationName) { + this.destinationName = destinationName; + } + + public String getMessageSelector() { + return messageSelector; + } + + public void setMessageSelector(String messageSelector) { + this.messageSelector = messageSelector; + } + + public Integer getSessionAcknowledgeMode() { + return sessionAcknowledgeMode; + } + + public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) { + this.sessionAcknowledgeMode = sessionAcknowledgeMode; + } + + public Boolean getSessionTransacted() { + return sessionTransacted; + } + + public void setSessionTransacted(Boolean sessionTransacted) { + this.sessionTransacted = sessionTransacted; + } + } \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java index dd83c37..2b18481 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java @@ -19,9 +19,7 @@ package org.apache.rocketmq.connect.activemq; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; - import javax.jms.Message; - import org.apache.rocketmq.connect.activemq.pattern.PatternProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,11 +29,11 @@ public class Replicator { private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class); private PatternProcessor processor; - + private Config config; private BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); - public Replicator(Config config){ + public Replicator(Config config) { this.config = config; } @@ -47,11 +45,12 @@ public class Replicator { } catch (Exception e) { LOGGER.error("Start error.", e); + throw new RuntimeException(e); } } - public void stop(){ - processor.stop(); + public void stop() { + processor.stop(); } public void commit(Message message, boolean isComplete) { @@ -59,7 +58,7 @@ public class Replicator { } public Config getConfig() { - return this.config; + return this.config; } public BlockingQueue<Message> getQueue() { diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnector.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java similarity index 89% rename from src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnector.java rename to src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java index 17d3efe..7e6290b 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java @@ -22,18 +22,17 @@ import io.openmessaging.connector.api.Task; import io.openmessaging.connector.api.source.SourceConnector; import java.util.ArrayList; import java.util.List; - import org.apache.rocketmq.connect.activemq.Config; -public class ActivemqConnector extends SourceConnector { +public class ActivemqSourceConnector extends SourceConnector { private KeyValue config; @Override public String verifyAndSetConfig(KeyValue config) { - for(String requestKey : Config.REQUEST_CONFIG){ - if(!config.containsKey(requestKey)){ + for (String requestKey : Config.REQUEST_CONFIG) { + if (!config.containsKey(requestKey)) { return "Request config key: " + requestKey; } } @@ -61,7 +60,7 @@ public class ActivemqConnector extends SourceConnector { @Override public Class<? extends Task> taskClass() { - return ActivemqTask.class; + return ActivemqSourceTask.class; } @Override diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java similarity index 55% rename from src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java rename to src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java index 7dead7b..2140b5f 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java @@ -17,7 +17,10 @@ package org.apache.rocketmq.connect.activemq.connector; -import java.io.ByteArrayInputStream; +import com.alibaba.fastjson.JSON; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.connector.api.source.SourceTask; import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -27,7 +30,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; - import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.MapMessage; @@ -35,40 +37,32 @@ import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; - import org.apache.rocketmq.connect.activemq.Config; import org.apache.rocketmq.connect.activemq.Replicator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alibaba.fastjson.JSON; - -import io.openmessaging.KeyValue; -import io.openmessaging.connector.api.data.SourceDataEntry; -import io.openmessaging.connector.api.source.SourceTask; - -public class ActivemqTask extends SourceTask { +public class ActivemqSourceTask extends SourceTask { - private static final Logger log = LoggerFactory.getLogger(ActivemqTask.class); + private static final Logger log = LoggerFactory.getLogger(ActivemqSourceTask.class); private Replicator replicator; private Config config; - + private ByteBuffer sourcePartition; - - @Override + @Override public Collection<SourceDataEntry> poll() { List<SourceDataEntry> res = new ArrayList<>(); try { - Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS); - if(message != null) { - SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), null, config.getDestinationName(), null, null); - res.add(sourceDataEntry); - } + Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS); + if (message != null) { + SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), null, config.getDestinationName(), null, null); + res.add(sourceDataEntry); + } } catch (Exception e) { - log.error("Mysql task poll error, current config:" + JSON.toJSONString(config), e); + log.error("activemq task poll error, current config:" + JSON.toJSONString(config), e); } return res; } @@ -98,39 +92,39 @@ public class ActivemqTask extends SourceTask { @Override public void resume() { } - + @SuppressWarnings("unchecked") - public ByteBuffer getMessageConnent(Message message ) throws JMSException { - byte[] data = null; - if(message instanceof TextMessage) { - data = ((TextMessage) message).getText().getBytes(); - }else if(message instanceof ObjectMessage) { - data = JSON.toJSONBytes( ((ObjectMessage) message).getObject()); - }else if(message instanceof BytesMessage) { - BytesMessage bytesMessage = (BytesMessage)message; - data = new byte[(int) bytesMessage.getBodyLength()]; - bytesMessage.readBytes(data); - }else if(message instanceof MapMessage) { - MapMessage mapMessage = (MapMessage)message; - Map<String,Object> map = new HashMap<>(); - Enumeration<Object> names = mapMessage.getMapNames(); - while(names.hasMoreElements()) { - String name = names.nextElement().toString(); - map.put(name, mapMessage.getObject(name)); - } - data = JSON.toJSONBytes(map); - }else if(message instanceof StreamMessage) { - StreamMessage streamMessage = (StreamMessage)message; - ByteArrayOutputStream bis = new ByteArrayOutputStream(); - byte[] by = new byte[1024]; - int i = 0; - while( (i = streamMessage.readBytes(by)) != 0) { - bis.write(by, 0, i); - } - data = bis.toByteArray(); - }else { - throw new RuntimeException("message type exception"); - } - return data!=null ? ByteBuffer.wrap( data ) : null; + public ByteBuffer getMessageConnent(Message message) throws JMSException { + byte[] data = null; + if (message instanceof TextMessage) { + data = ((TextMessage) message).getText().getBytes(); + } else if (message instanceof ObjectMessage) { + data = JSON.toJSONBytes(((ObjectMessage) message).getObject()); + } else if (message instanceof BytesMessage) { + BytesMessage bytesMessage = (BytesMessage) message; + data = new byte[(int) bytesMessage.getBodyLength()]; + bytesMessage.readBytes(data); + } else if (message instanceof MapMessage) { + MapMessage mapMessage = (MapMessage) message; + Map<String, Object> map = new HashMap<>(); + Enumeration<Object> names = mapMessage.getMapNames(); + while (names.hasMoreElements()) { + String name = names.nextElement().toString(); + map.put(name, mapMessage.getObject(name)); + } + data = JSON.toJSONBytes(map); + } else if (message instanceof StreamMessage) { + StreamMessage streamMessage = (StreamMessage) message; + ByteArrayOutputStream bis = new ByteArrayOutputStream(); + byte[] by = new byte[1024]; + int i = 0; + while ((i = streamMessage.readBytes(by)) != -1) { + bis.write(by, 0, i); + } + data = bis.toByteArray(); + } else { + throw new RuntimeException("message type exception"); + } + return data != null ? ByteBuffer.wrap(data) : null; } } diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java index b26bfb9..60a34cf 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java @@ -8,7 +8,6 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.connect.activemq.Config; @@ -16,63 +15,62 @@ import org.apache.rocketmq.connect.activemq.Replicator; public class PatternProcessor { - private Replicator replicator; - - private Config config; - - Connection connection; - - Session session; - - MessageConsumer consumer; - - public PatternProcessor(Replicator replicator) { - this.replicator = replicator; - this.config = replicator.getConfig(); - } - - public void start() { - if(!StringUtils.equals("topic", config.getDestinationType())&&!StringUtils.equals("queue", config.getDestinationType())) { - throw new RuntimeException("destination type is incorrectness"); - } - - try { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl()); - - if(StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword()) ) { - connection = connectionFactory.createConnection(config.getActivemqUsername() , config.getActivemqPassword()); - }else { - connection = connectionFactory.createConnection(); - } - connection.start(); - Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode()); - Destination destination = null; - if(StringUtils.equals("topic", config.getDestinationType())) { - destination = session.createTopic(config.getDestinationName()); - }else if(StringUtils.equals("queue", config.getDestinationType())){ - destination = session.createQueue(config.getDestinationName()); - } - consumer = session.createConsumer(destination, config.getMessageSelector()); - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - replicator.commit(message, true); - } - }); - }catch(Exception e) { - throw new RuntimeException(e); - } - } - - public void stop() { + private Replicator replicator; + + private Config config; + + Connection connection; + + Session session; + + MessageConsumer consumer; + + public PatternProcessor(Replicator replicator) { + this.replicator = replicator; + this.config = replicator.getConfig(); + } + + public void start() { + if (!StringUtils.equals("topic", config.getDestinationType()) && !StringUtils.equals("queue", config.getDestinationType())) { + throw new RuntimeException("destination type is incorrectness"); + } + + try { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl()); + + if (StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword())) { + connection = connectionFactory.createConnection(config.getActivemqUsername(), config.getActivemqPassword()); + } else { + connection = connectionFactory.createConnection(); + } + connection.start(); + Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode()); + Destination destination = null; + if (StringUtils.equals("topic", config.getDestinationType())) { + destination = session.createTopic(config.getDestinationName()); + } else if (StringUtils.equals("queue", config.getDestinationType())) { + destination = session.createQueue(config.getDestinationName()); + } + consumer = session.createConsumer(destination, config.getMessageSelector()); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + replicator.commit(message, true); + } + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void stop() { try { - consumer.close(); - session.close(); - connection.close(); - } catch (JMSException e) { - throw new RuntimeException(e); - } - } - - + consumer.close(); + session.close(); + connection.close(); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + } diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java new file mode 100644 index 0000000..909a5d7 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java @@ -0,0 +1,57 @@ +package org.apache.rocketmq.connect.activemq; + +import java.lang.reflect.Field; + +import javax.jms.Message; + +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.rocketmq.connect.activemq.pattern.PatternProcessor; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import org.junit.Assert; + +public class ReplicatorTest { + + Replicator replicator; + + PatternProcessor patternProcessor; + + Config config; + + @Before + public void before() throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException { + config = new Config(); + replicator = new Replicator(config); + + patternProcessor = Mockito.mock(PatternProcessor.class); + + Field processor = Replicator.class.getDeclaredField("processor"); + processor.setAccessible(true); + processor.set(replicator, patternProcessor); + } + + @Test(expected = RuntimeException.class) + public void startTest() { + replicator.start(); + } + + @Test + public void stop() { + replicator.stop(); + Mockito.verify(patternProcessor, Mockito.times(1)).stop(); + } + + @Test + public void commitAddGetQueueTest() { + Message message = new ActiveMQTextMessage(); + replicator.commit(message, false); + Assert.assertEquals(replicator.getQueue().poll(), message); + } + + @Test + public void getConfigTest() { + Assert.assertEquals(replicator.getConfig(), config); + } +} diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java new file mode 100644 index 0000000..eae1ae6 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java @@ -0,0 +1,41 @@ +package org.apache.rocketmq.connect.activemq.connector; + +import static org.junit.Assert.assertEquals; + +import org.apache.rocketmq.connect.activemq.Config; +import org.junit.Test; + +import io.openmessaging.KeyValue; +import io.openmessaging.internal.DefaultKeyValue; + +public class ActivemqConnectorTest { + + ActivemqSourceConnector connector = new ActivemqSourceConnector(); + + @Test + public void verifyAndSetConfigTest() { + KeyValue keyValue = new DefaultKeyValue(); + + for (String requestKey : Config.REQUEST_CONFIG) { + assertEquals(connector.verifyAndSetConfig(keyValue), "Request config key: " + requestKey); + keyValue.put(requestKey, requestKey); + } + assertEquals(connector.verifyAndSetConfig(keyValue), ""); + } + + @Test + public void taskClassTest() { + assertEquals(connector.taskClass(), ActivemqSourceTask.class); + } + + @Test + public void taskConfigsTest() { + assertEquals(connector.taskConfigs().get(0), null); + KeyValue keyValue = new DefaultKeyValue(); + for (String requestKey : Config.REQUEST_CONFIG) { + keyValue.put(requestKey, requestKey); + } + connector.verifyAndSetConfig(keyValue); + assertEquals(connector.taskConfigs().get(0), keyValue); + } +} diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java new file mode 100644 index 0000000..2b0821b --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java @@ -0,0 +1,148 @@ +package org.apache.rocketmq.connect.activemq.connector; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.rocketmq.connect.activemq.Config; +import org.apache.rocketmq.connect.activemq.Replicator; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import com.alibaba.fastjson.JSON; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.internal.DefaultKeyValue; + +public class ActivemqSourceTaskTest { + + public void befores() throws JMSException, InterruptedException { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166"); + Connection connection = connectionFactory.createConnection(); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("test-queue"); + + MessageProducer producer = session.createProducer(destination); + + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + for (int i = 0; i < 20; i++) { + TextMessage message = session.createTextMessage("hello 我是消息:" + i); + producer.send(message); + } + + session.commit(); + session.close(); + connection.close(); + } + + //@Test + public void test() throws InterruptedException { + KeyValue kv = new DefaultKeyValue(); + kv.put("activemqUrl", "tcp://112.74.48.251:6166"); + kv.put("destinationType", "queue"); + kv.put("destinationName", "test-queue"); + ActivemqSourceTask task = new ActivemqSourceTask(); + task.start(kv); + for (int i = 0; i < 20; ) { + Collection<SourceDataEntry> sourceDataEntry = task.poll(); + i = i + sourceDataEntry.size(); + System.out.println(sourceDataEntry); + } + Thread.sleep(20000); + } + + @Test + public void pollTest() throws Exception { + ActivemqSourceTask task = new ActivemqSourceTask(); + TextMessage textMessage = new ActiveMQTextMessage(); + textMessage.setText("hello rocketmq"); + + Replicator replicatorObject = Mockito.mock(Replicator.class); + BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); + Mockito.when(replicatorObject.getQueue()).thenReturn(queue); + + Field replicator = ActivemqSourceTask.class.getDeclaredField("replicator"); + replicator.setAccessible(true); + replicator.set(task, replicatorObject); + + Field config = ActivemqSourceTask.class.getDeclaredField("config"); + config.setAccessible(true); + config.set(task, new Config()); + + queue.put(textMessage); + Collection<SourceDataEntry> list = task.poll(); + Assert.assertEquals(list.size(), 1); + + list = task.poll(); + Assert.assertEquals(list.size(), 0); + + } + + @Test(expected = RuntimeException.class) + public void getMessageConnentTest() throws JMSException { + String value = "hello rocketmq"; + ActivemqSourceTask task = new ActivemqSourceTask(); + TextMessage textMessage = new ActiveMQTextMessage(); + textMessage.setText(value); + ByteBuffer buffer = task.getMessageConnent(textMessage); + Assert.assertEquals(new String(buffer.array()), textMessage.getText()); + + ObjectMessage objectMessage = new ActiveMQObjectMessage(); + objectMessage.setObject(value); + buffer = task.getMessageConnent(objectMessage); + Assert.assertEquals(new String(buffer.array()), "\"" + objectMessage.getObject().toString() + "\""); + + BytesMessage bytes = new ActiveMQBytesMessage(); + bytes.writeBytes(value.getBytes()); + bytes.reset(); + buffer = task.getMessageConnent(bytes); + Assert.assertEquals(new String(buffer.array()), value); + + MapMessage mapMessage = new ActiveMQMapMessage(); + mapMessage.setString("hello", "rocketmq"); + buffer = task.getMessageConnent(mapMessage); + Map<String, String> map = JSON.parseObject(buffer.array(), Map.class); + Assert.assertEquals(map.get("hello"), "rocketmq"); + Assert.assertEquals(map.size(), 1); + + StreamMessage streamMessage = new ActiveMQStreamMessage(); + String valueTwo = null; + for (int i = 0; i < 200; i++) { + valueTwo = valueTwo + value; + } + streamMessage.writeBytes(valueTwo.getBytes()); + streamMessage.reset(); + buffer = task.getMessageConnent(streamMessage); + Assert.assertEquals(new String(buffer.array()), valueTwo); + + task.getMessageConnent(null); + } +} diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java deleted file mode 100644 index 780cbc9..0000000 --- a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.apache.rocketmq.connect.activemq.connector; - -import java.util.Collection; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.rocketmq.connect.activemq.Config; -import org.junit.Before; -import org.junit.Test; - -import io.openmessaging.KeyValue; -import io.openmessaging.connector.api.data.SourceDataEntry; -import io.openmessaging.internal.DefaultKeyValue; - -public class ActivemqTaskTest { - - @Before - public void befores() throws JMSException, InterruptedException { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166"); - Connection connection = connectionFactory.createConnection(); - - connection.start(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue("test-queue"); - - MessageProducer producer = session.createProducer(destination); - - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - for (int i = 0; i < 20; i++) { - TextMessage message = session.createTextMessage("hello 我是消息:" + i); - producer.send(message); - } - - session.commit(); - session.close(); - connection.close(); - } - - @Test - public void nullTest() { - - } - - @Test - public void test() throws InterruptedException { - KeyValue kv = new DefaultKeyValue(); - kv.put("activemqUrl", "tcp://112.74.48.251:6166"); - kv.put("destinationType", "queue"); - kv.put("destinationName", "test-queue"); - ActivemqTask task = new ActivemqTask(); - task.start(kv); - for(int i = 0 ; i < 20;) { - Collection<SourceDataEntry> sourceDataEntry = task.poll(); - i = i+sourceDataEntry.size(); - System.out.println(sourceDataEntry); - } - Thread.sleep(20000); - - } -}
