This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 6801219d34b33501c70a2e76e6147c07edfd2b41 Author: githublaohu <[email protected]> AuthorDate: Sun Jul 21 21:17:48 2019 +0800 [ISSUE #312] Implement rocketmq connect RabbitMQ (#313) * complete RabbitMQ connector * delete class file --- README-CN.md | 16 -- README.md | 16 -- pom.xml | 5 - rocketmq-connect-jms.iml | 21 +++ .../org/apache/rocketmq/connect/jms/Config.java | 4 +- .../jms/connector/BaseJmsSourceConnector.java | 4 +- .../connect/jms/connector/BaseJmsSourceTask.java | 4 +- .../connect/jms/pattern/PatternProcessor.java | 4 +- .../rocketmq/connect/jms/ReplicatorTest.java | 74 ---------- .../jms/connector/ActivemqSourceTaskTest.java | 164 --------------------- .../jms/connector/BaseJmsSourceConnectorTest.java | 2 +- 11 files changed, 29 insertions(+), 285 deletions(-) diff --git a/README-CN.md b/README-CN.md deleted file mode 100644 index be03683..0000000 --- a/README-CN.md +++ /dev/null @@ -1,16 +0,0 @@ -##### ActiveConnector完全限定名 -org.apache.rocketmq.connect.activemq.connector.ActivemqConnector - - -##### 配置参数 - -参数 | 作用 | 是否必填 | 默认值 ----|--- |--- | --- -activemq.url | activemq ip与端口号 | 是 | 无 -activemq.username | 用户名 | 否 | 无 -activemq.password| 密码 | 否 | 无 -jms.destination.name | 读取的队列或者主题名 | 是 | 无 -jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无 -jms.message.selector | 过滤器 | 否 |无 -jms.session.acknowledge.mode | 消息确认 | 否 | Session.AUTO_ACKNOWLEDGE -jms.session.transacted | 是否是事务会话 | 否 | false diff --git a/README.md b/README.md deleted file mode 100644 index e15149e..0000000 --- a/README.md +++ /dev/null @@ -1,16 +0,0 @@ -##### ActiveConnector fully-qualified name -org.apache.rocketmq.connect.activemq.connector.ActivemqConnector - - -##### parameter configuration - -parameter | effect | required |default ----|--- |--- | --- -activemq.url | The URL of the ActiveMQ broker | yes | null -activemq.username | The username to use when connecting to ActiveMQ | no | null -activemq.password| The password to use when connecting to ActiveMQ | no | null -jms.destination.name | The name of the JMS destination (queue or topic) to read from | yes | null -jms.destination.type | The type of JMS destination, which is either queue or topic | yes | null -jms.message.selector | The message selector that should be applied to messages in the destination | no | null -jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session | null | Session.AUTO_ACKNOWLEDGE -jms.session.transacted | Flag to determine if the session is transacted and the session completely controls. the message delivery by either committing or rolling back the session | null | false diff --git a/pom.xml b/pom.xml index c7c3ba2..50e0490 100644 --- a/pom.xml +++ b/pom.xml @@ -186,11 +186,6 @@ <version>1.2</version> </dependency> <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-all</artifactId> - <version>5.9.0</version> - </dependency> - <dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> <version>2.0</version> diff --git a/rocketmq-connect-jms.iml b/rocketmq-connect-jms.iml new file mode 100644 index 0000000..b187d53 --- /dev/null +++ b/rocketmq-connect-jms.iml @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="UTF-8"?> +<module type="JAVA_MODULE" version="4"> + <component name="EclipseModuleManager"> + <conelement value="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER" /> + <src_description expected_position="0"> + <src_folder value="file://$MODULE_DIR$/src/main/java" expected_position="0" /> + <src_folder value="file://$MODULE_DIR$/src/test/java" expected_position="1" /> + </src_description> + </component> + <component name="NewModuleRootManager"> + <output url="file://$MODULE_DIR$/target/classes" /> + <exclude-output /> + <content url="file://$MODULE_DIR$"> + <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" /> + <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="false" /> + </content> + <orderEntry type="sourceFolder" forTests="false" /> + <orderEntry type="jdk" jdkName="JavaSE-1.8" jdkType="JavaSDK" /> + <orderEntry type="library" name="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER" level="application" /> + </component> +</module> \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/jms/Config.java b/src/main/java/org/apache/rocketmq/connect/jms/Config.java index e9cd178..1e69f89 100644 --- a/src/main/java/org/apache/rocketmq/connect/jms/Config.java +++ b/src/main/java/org/apache/rocketmq/connect/jms/Config.java @@ -94,9 +94,7 @@ public class Config { } } } - } - - + } public String getBrokerUrl() { return brokerUrl; diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java index f939881..68f7677 100644 --- a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java @@ -20,7 +20,6 @@ package org.apache.rocketmq.connect.jms.connector; import java.util.ArrayList; import java.util.List; import java.util.Set; - import io.openmessaging.KeyValue; import io.openmessaging.connector.api.Task; import io.openmessaging.connector.api.source.SourceConnector; @@ -31,7 +30,6 @@ public abstract class BaseJmsSourceConnector extends SourceConnector { @Override public String verifyAndSetConfig(KeyValue config) { - for (String requestKey : getRequiredConfig()) { if (!config.containsKey(requestKey)) { return "Request config key: " + requestKey; @@ -69,5 +67,5 @@ public abstract class BaseJmsSourceConnector extends SourceConnector { return config; } - abstract Set<String> getRequiredConfig(); + public abstract Set<String> getRequiredConfig(); } diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java index f43e7fb..7b1f7c4 100644 --- a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java @@ -82,7 +82,6 @@ public abstract class BaseJmsSourceTask extends SourceTask { this.config = new Config(); this.config.load(props); this.sourcePartition = ByteBuffer.wrap(config.getBrokerUrl().getBytes("UTF-8")); - this.replicator = new Replicator(config,this); this.replicator.start(); } catch (Exception e) { log.error("activemq task start failed.", e); @@ -143,6 +142,9 @@ public abstract class BaseJmsSourceTask extends SourceTask { } return ByteBuffer.wrap(data); } + + + public abstract Config getConfig(); public abstract PatternProcessor getPatternProcessor(Replicator replicator); } diff --git a/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java index e03691f..5399045 100644 --- a/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java +++ b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java @@ -33,7 +33,7 @@ public abstract class PatternProcessor { private Replicator replicator; - Config config; + protected Config config; private Connection connection; @@ -47,7 +47,7 @@ public abstract class PatternProcessor { } public abstract ConnectionFactory connectionFactory(); - + public void start() throws Exception { if (!StringUtils.equals("topic", config.getDestinationType()) && !StringUtils.equals("queue", config.getDestinationType())) { diff --git a/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java deleted file mode 100644 index 32b8f04..0000000 --- a/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.connect.jms; - -import java.lang.reflect.Field; - -import javax.jms.Message; - -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.rocketmq.connect.jms.pattern.PatternProcessor; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import org.junit.Assert; - -public class ReplicatorTest { - - Replicator replicator; - - PatternProcessor patternProcessor; - - Config config; - - @Before - public void before() throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException { - config = new Config(); - replicator = new Replicator(config,null); - - patternProcessor = Mockito.mock(PatternProcessor.class); - - Field processor = Replicator.class.getDeclaredField("processor"); - processor.setAccessible(true); - processor.set(replicator, patternProcessor); - } - - @Test(expected = RuntimeException.class) - public void startTest() throws Exception { - replicator.start(); - } - - @Test - public void stop() throws Exception { - replicator.stop(); - Mockito.verify(patternProcessor, Mockito.times(1)).stop(); - } - - @Test - public void commitAddGetQueueTest() { - Message message = new ActiveMQTextMessage(); - replicator.commit(message, false); - Assert.assertEquals(replicator.getQueue().poll(), message); - } - - @Test - public void getConfigTest() { - Assert.assertEquals(replicator.getConfig(), config); - } -} diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java deleted file mode 100644 index 72e818a..0000000 --- a/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.connect.jms.connector; - -import java.lang.reflect.Field; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQMapMessage; -import org.apache.activemq.command.ActiveMQObjectMessage; -import org.apache.activemq.command.ActiveMQStreamMessage; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.rocketmq.connect.jms.Config; -import org.apache.rocketmq.connect.jms.Replicator; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; - -import com.alibaba.fastjson.JSON; - -import io.openmessaging.KeyValue; -import io.openmessaging.connector.api.data.SourceDataEntry; -import io.openmessaging.internal.DefaultKeyValue; - -public class ActivemqSourceTaskTest { - - public void befores() throws JMSException, InterruptedException { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166"); - Connection connection = connectionFactory.createConnection(); - - connection.start(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue("test-queue"); - - MessageProducer producer = session.createProducer(destination); - - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - for (int i = 0; i < 20; i++) { - TextMessage message = session.createTextMessage("hello 我是消息:" + i); - producer.send(message); - } - - session.commit(); - session.close(); - connection.close(); - } - - //@Test - public void test() throws InterruptedException { - KeyValue kv = new DefaultKeyValue(); - kv.put("activemqUrl", "tcp://112.74.48.251:6166"); - kv.put("destinationType", "queue"); - kv.put("destinationName", "test-queue"); - ActivemqSourceTask task = new ActivemqSourceTask(); - task.start(kv); - for (int i = 0; i < 20; ) { - Collection<SourceDataEntry> sourceDataEntry = task.poll(); - i = i + sourceDataEntry.size(); - System.out.println(sourceDataEntry); - } - Thread.sleep(20000); - } - - @Test - public void pollTest() throws Exception { - ActivemqSourceTask task = new ActivemqSourceTask(); - TextMessage textMessage = new ActiveMQTextMessage(); - textMessage.setText("hello rocketmq"); - - Replicator replicatorObject = Mockito.mock(Replicator.class); - BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); - Mockito.when(replicatorObject.getQueue()).thenReturn(queue); - - Field replicator = ActivemqSourceTask.class.getDeclaredField("replicator"); - replicator.setAccessible(true); - replicator.set(task, replicatorObject); - - Field config = ActivemqSourceTask.class.getDeclaredField("config"); - config.setAccessible(true); - config.set(task, new Config()); - - queue.put(textMessage); - Collection<SourceDataEntry> list = task.poll(); - Assert.assertEquals(list.size(), 1); - - list = task.poll(); - Assert.assertEquals(list.size(), 0); - - } - - @Test(expected = RuntimeException.class) - public void getMessageConnentTest() throws JMSException { - String value = "hello rocketmq"; - ActivemqSourceTask task = new ActivemqSourceTask(); - TextMessage textMessage = new ActiveMQTextMessage(); - textMessage.setText(value); - ByteBuffer buffer = task.getMessageConnent(textMessage); - Assert.assertEquals(new String(buffer.array()), textMessage.getText()); - - ObjectMessage objectMessage = new ActiveMQObjectMessage(); - objectMessage.setObject(value); - buffer = task.getMessageConnent(objectMessage); - Assert.assertEquals(new String(buffer.array()), "\"" + objectMessage.getObject().toString() + "\""); - - BytesMessage bytes = new ActiveMQBytesMessage(); - bytes.writeBytes(value.getBytes()); - bytes.reset(); - buffer = task.getMessageConnent(bytes); - Assert.assertEquals(new String(buffer.array()), value); - - MapMessage mapMessage = new ActiveMQMapMessage(); - mapMessage.setString("hello", "rocketmq"); - buffer = task.getMessageConnent(mapMessage); - Map<String, String> map = JSON.parseObject(buffer.array(), Map.class); - Assert.assertEquals(map.get("hello"), "rocketmq"); - Assert.assertEquals(map.size(), 1); - - StreamMessage streamMessage = new ActiveMQStreamMessage(); - String valueTwo = null; - for (int i = 0; i < 200; i++) { - valueTwo = valueTwo + value; - } - streamMessage.writeBytes(valueTwo.getBytes()); - streamMessage.reset(); - buffer = task.getMessageConnent(streamMessage); - Assert.assertEquals(new String(buffer.array()), valueTwo); - - task.getMessageConnent(null); - } -} diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java index 6c4029a..632c9d3 100644 --- a/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java +++ b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java @@ -48,7 +48,7 @@ public class BaseJmsSourceConnectorTest { } @Override - Set<String> getRequiredConfig() { + public Set<String> getRequiredConfig() { return REQUEST_CONFIG; } };
