This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit a7ab1c46a9938df2984c3ad730609c1a7797cc9b Author: githublaohu <[email protected]> AuthorDate: Sun Jul 21 21:17:48 2019 +0800 [ISSUE #312] Implement rocketmq connect RabbitMQ (#313) * complete RabbitMQ connector * delete class file --- README.md | 16 ++ pom.xml | 205 +++++++++++++++++++++ .../rocketmq/connect/rabbitmq/ErrorCode.java | 8 + .../rocketmq/connect/rabbitmq/RabbitmqConfig.java | 61 ++++++ .../connector/RabbitmqSourceConnector.java | 35 ++++ .../rabbitmq/connector/RabbitmqSourceTask.java | 37 ++++ .../rabbitmq/pattern/RabbitMQPatternProcessor.java | 48 +++++ .../rocketmq/connect/jms/RabbitmqConfigTest.java | 28 +++ .../connector/RabbitmqSourceConnectorTest.java | 54 ++++++ .../rabbitmq/connector/RabbitmqSourceTaskTest.java | 164 +++++++++++++++++ .../pattern/RabbitMQPatternProcessorTest.java | 41 +++++ 11 files changed, 697 insertions(+) diff --git a/README.md b/README.md index 8b13789..708bee3 100644 --- a/README.md +++ b/README.md @@ -1 +1,17 @@ +##### ActiveConnector fully-qualified name +org.apache.rocketmq.connect.rabbitmq.connector.RabbitmqSourceConnector + + +##### parameter configuration + +parameter | effect | required |default +---|--- |--- | --- +rabbtimq.url | The URL of the RabbtiMQ broker | yes | null +rabbtimq.username | The username to use when connecting to RabbtiMQ | yes | null +rabbtimq.password| The password to use when connecting to RabbtiMQ | yes | null +jms.destination.name | The name of the JMS destination (queue or topic) to read from | yes | null +jms.destination.type | The type of JMS destination, which is either queue or topic | yes | null +jms.message.selector | The message selector that should be applied to messages in the destination | no | null +jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session | null | Session.AUTO_ACKNOWLEDGE +jms.session.transacted | Flag to determine if the session is transacted and the session completely controls. the message delivery by either committing or rolling back the session | null | false diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..d5687c1 --- /dev/null +++ b/pom.xml @@ -0,0 +1,205 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-connect-rabbitmq</artifactId> + <version>1.0.0</version> + + <name>connect-rabbitmq</name> + <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-rabbitmq</url> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <issueManagement> + <system>jira</system> + <url>https://issues.apache.org/jira/browse/RocketMQ</url> + </issueManagement> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + + <!-- Compiler settings properties --> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>versions-maven-plugin</artifactId> + <version>2.3</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>clirr-maven-plugin</artifactId> + <version>2.7</version> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.6.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <compilerVersion>${maven.compiler.source}</compilerVersion> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <argLine>-Xms512m -Xmx1024m</argLine> + <forkMode>always</forkMode> + <includes> + <include>**/*Test.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.6</version> + <configuration> + <locales>en_US</locales> + <outputEncoding>UTF-8</outputEncoding> + <inputEncoding>UTF-8</inputEncoding> + </configuration> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>3.0.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.10.4</version> + <configuration> + <charset>UTF-8</charset> + <locale>en_US</locale> + <excludePackageNames>io.openmessaging.internal</excludePackageNames> + </configuration> + <executions> + <execution> + <id>aggregate</id> + <goals> + <goal>aggregate</goal> + </goals> + <phase>site</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.0.2</version> + <configuration> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.4</version> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>2.6.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>2.6.3</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-connector</artifactId> + <version>0.1.0-beta</version> + </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.51</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.7</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>1.0.13</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + <version>1.0.13</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-openmessaging</artifactId> + <version>4.3.2</version> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>1.2</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-connect-jms</artifactId> + <version>1.0.0</version> + </dependency> + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + <version>5.7.1</version> + </dependency> + <dependency> + <groupId>com.rabbitmq.jms</groupId> + <artifactId>rabbitmq-jms</artifactId> + <version>1.11.2</version> + </dependency> + </dependencies> + +</project> diff --git a/src/main/java/org/apache/rocketmq/connect/rabbitmq/ErrorCode.java b/src/main/java/org/apache/rocketmq/connect/rabbitmq/ErrorCode.java new file mode 100644 index 0000000..5f70361 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/rabbitmq/ErrorCode.java @@ -0,0 +1,8 @@ +package org.apache.rocketmq.connect.rabbitmq; + +public class ErrorCode { + + public static final int START_ERROR_CODE = 10001; + + public static final int STOP_ERROR_CODE = 10002; +} diff --git a/src/main/java/org/apache/rocketmq/connect/rabbitmq/RabbitmqConfig.java b/src/main/java/org/apache/rocketmq/connect/rabbitmq/RabbitmqConfig.java new file mode 100644 index 0000000..2b12c18 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/rabbitmq/RabbitmqConfig.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.rabbitmq; + +import java.util.HashSet; +import java.util.Set; +import org.apache.rocketmq.connect.jms.Config; + +public class RabbitmqConfig extends Config { + + @SuppressWarnings("serial") + public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { + { + add("rabbitmqUrl"); + add("rabbitmqUsername"); + add("rabbitmqPassword"); + add("destinationType"); + add("destinationName"); + } + }; + + public String getRabbitmqUrl() { + return getBrokerUrl(); + } + + public void setRabbitmqUrl(String rabbitmqUrl) { + setBrokerUrl(rabbitmqUrl); + } + + public String getRabbitmqUsername() { + return getUsername(); + } + + public void setRabbitmqUsername(String rabbitmqUsername) { + setUsername(rabbitmqUsername); + } + + public String getRabbitmqPassword() { + return getPassword(); + } + + public void setRabbitmqPassword(String rabbitmqPassword) { + setPassword(rabbitmqPassword); + } + +} \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnector.java new file mode 100644 index 0000000..328632d --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnector.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.rabbitmq.connector; + +import io.openmessaging.connector.api.Task; +import java.util.Set; +import org.apache.rocketmq.connect.jms.connector.BaseJmsSourceConnector; +import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig; + +public class RabbitmqSourceConnector extends BaseJmsSourceConnector { + + @Override + public Class<? extends Task> taskClass() { + return RabbitmqSourceTask.class; + } + + public Set<String> getRequiredConfig() { + return RabbitmqConfig.REQUEST_CONFIG; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTask.java b/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTask.java new file mode 100644 index 0000000..ab2d1e4 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTask.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.rabbitmq.connector; + +import org.apache.rocketmq.connect.jms.Config; +import org.apache.rocketmq.connect.jms.Replicator; +import org.apache.rocketmq.connect.jms.connector.BaseJmsSourceTask; +import org.apache.rocketmq.connect.jms.pattern.PatternProcessor; +import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig; +import org.apache.rocketmq.connect.rabbitmq.pattern.RabbitMQPatternProcessor; + +public class RabbitmqSourceTask extends BaseJmsSourceTask { + + public PatternProcessor getPatternProcessor(Replicator replicator) { + return new RabbitMQPatternProcessor(replicator); + } + + @Override + public Config getConfig() { + return new RabbitmqConfig(); + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessor.java new file mode 100644 index 0000000..5056a11 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessor.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.rabbitmq.pattern; + +import com.rabbitmq.jms.admin.RMQConnectionFactory; +import io.openmessaging.connector.api.exception.DataConnectException; +import java.util.ArrayList; +import java.util.List; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import org.apache.rocketmq.connect.jms.ErrorCode; +import org.apache.rocketmq.connect.jms.Replicator; +import org.apache.rocketmq.connect.jms.pattern.PatternProcessor; + +public class RabbitMQPatternProcessor extends PatternProcessor { + + public RabbitMQPatternProcessor(Replicator replicator) { + super(replicator); + } + + public ConnectionFactory connectionFactory() { + RMQConnectionFactory connectionFactory = new RMQConnectionFactory(); + try { + List<String> urlList = new ArrayList<>(); + urlList.add(config.getBrokerUrl()); + connectionFactory.setUris(urlList); + } catch (JMSException e) { + throw new DataConnectException(ErrorCode.START_ERROR_CODE, e.getMessage(), e); + } + return connectionFactory; + } + +} diff --git a/src/test/java/org/apache/rocketmq/connect/jms/RabbitmqConfigTest.java b/src/test/java/org/apache/rocketmq/connect/jms/RabbitmqConfigTest.java new file mode 100644 index 0000000..7228f68 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/jms/RabbitmqConfigTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.jms; + +import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig; + +public class RabbitmqConfigTest { + + RabbitmqConfig config; + + + +} diff --git a/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnectorTest.java new file mode 100644 index 0000000..ea52a43 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnectorTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.rabbitmq.connector; + +import static org.junit.Assert.assertEquals; + +import org.apache.rocketmq.connect.jms.Config; +import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig; +import org.junit.Test; + +import io.openmessaging.KeyValue; +import io.openmessaging.internal.DefaultKeyValue; + +public class RabbitmqSourceConnectorTest { + + RabbitmqSourceConnector rabbitmqSourceConnector = new RabbitmqSourceConnector(); + + @Test + public void taskClass() { + assertEquals( RabbitmqSourceTask.class, rabbitmqSourceConnector.taskClass()); + } + + @Test + public void getRequiredConfig() { + assertEquals( RabbitmqConfig.REQUEST_CONFIG , rabbitmqSourceConnector.getRequiredConfig()); + } + + + @Test + public void verifyAndSetConfig() { + KeyValue keyValue = new DefaultKeyValue(); + + for (String requestKey :RabbitmqConfig.REQUEST_CONFIG) { + assertEquals(rabbitmqSourceConnector.verifyAndSetConfig(keyValue), "Request config key: " + requestKey); + keyValue.put(requestKey, requestKey); + } + assertEquals(rabbitmqSourceConnector.verifyAndSetConfig(keyValue), ""); + } +} diff --git a/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTaskTest.java new file mode 100644 index 0000000..232ddc1 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTaskTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.rabbitmq.connector; + +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import org.apache.rocketmq.connect.jms.Replicator; +import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig; +import org.apache.rocketmq.connect.rabbitmq.pattern.RabbitMQPatternProcessor; +import org.junit.Assert; +import org.junit.Test; + +import com.alibaba.fastjson.JSON; +import com.rabbitmq.jms.admin.RMQConnectionFactory; +import com.rabbitmq.jms.client.message.RMQBytesMessage; +import com.rabbitmq.jms.client.message.RMQMapMessage; +import com.rabbitmq.jms.client.message.RMQObjectMessage; +import com.rabbitmq.jms.client.message.RMQStreamMessage; +import com.rabbitmq.jms.client.message.RMQTextMessage; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.internal.DefaultKeyValue; + +public class RabbitmqSourceTaskTest { + + //@Before + public void befores() throws JMSException, InterruptedException { + RMQConnectionFactory connectionFactory = new RMQConnectionFactory(); + connectionFactory.setUri("amqp://112.74.48.251:5672"); + Connection connection = connectionFactory.createConnection("admin", "admin"); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("test-queue"); + + MessageProducer producer = session.createProducer(destination); + + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + for (int i = 0; i < 20; i++) { + TextMessage message = session.createTextMessage("hello 我是消息:" + i); + producer.send(message); + } + + session.commit(); + session.close(); + connection.close(); + } + + //@Test + public void test() throws InterruptedException { + KeyValue kv = new DefaultKeyValue(); + kv.put("rabbitmqUrl", "amqp://112.74.48.251:5672"); + kv.put("rabbitmqUsername", "admin"); + kv.put("rabbitmqPassword", "admin"); + kv.put("destinationType", "queue"); + kv.put("destinationName", "test-queue"); + RabbitmqSourceTask task = new RabbitmqSourceTask(); + task.start(kv); + for (int i = 0; i < 20;) { + Collection<SourceDataEntry> sourceDataEntry = task.poll(); + i = i + sourceDataEntry.size(); + System.out.println(sourceDataEntry); + } + Thread.sleep(20000); + } + + @Test + public void getMessageConnentTest() throws JMSException { + String value = "hello rocketmq"; + RabbitmqSourceTask task = new RabbitmqSourceTask(); + RMQTextMessage textMessage = new RMQTextMessage(); + textMessage.setText(value); + ByteBuffer buffer = task.getMessageContent(textMessage); + Assert.assertEquals(new String(buffer.array()), textMessage.getText()); + + ObjectMessage objectMessage = new RMQObjectMessage(); + objectMessage.setObject(value); + buffer = task.getMessageContent(objectMessage); + Assert.assertEquals(new String(buffer.array()), "\"" + objectMessage.getObject().toString() + "\""); + + BytesMessage bytes = new RMQBytesMessage(); + bytes.writeBytes(value.getBytes()); + bytes.reset(); + buffer = task.getMessageContent(bytes); + Assert.assertEquals(new String(buffer.array()), value); + + MapMessage mapMessage = new RMQMapMessage(); + mapMessage.setString("hello", "rocketmq"); + buffer = task.getMessageContent(mapMessage); + Map<String, String> map = JSON.parseObject(buffer.array(), Map.class); + Assert.assertEquals(map.get("hello"), "rocketmq"); + Assert.assertEquals(map.size(), 1); + + StreamMessage streamMessage = new RMQStreamMessage(); + String valueTwo = null; + for (int i = 0; i < 200; i++) { + valueTwo = valueTwo + value; + } + streamMessage.writeBytes(valueTwo.getBytes()); + streamMessage.reset(); + //buffer = task.getMessageContent(streamMessage); + //Assert.assertEquals(new String(buffer.array()), valueTwo); + + } + + @Test(expected=Exception.class) + public void getMessageConnentException() throws JMSException { + RabbitmqSourceTask task = new RabbitmqSourceTask(); + task.getMessageContent(null); + + } + + public void getPatternProcessor(Replicator replicator) { + KeyValue kv = new DefaultKeyValue(); + kv.put("rabbitmqUrl", "amqp://112.74.48.251:5672"); + kv.put("rabbitmqUsername", "admin"); + kv.put("rabbitmqPassword", "admin"); + kv.put("destinationType", "queue"); + kv.put("destinationName", "test-queue"); + RabbitmqConfig config = new RabbitmqConfig(); + config.load(kv); + replicator = new Replicator(config,null); + RabbitmqSourceTask task = new RabbitmqSourceTask(); + assertEquals(RabbitMQPatternProcessor.class, task.getPatternProcessor(replicator).getClass()); + } + + @Test + public void getConfig() { + RabbitmqSourceTask task = new RabbitmqSourceTask(); + assertEquals(task.getConfig().getClass() , RabbitmqConfig.class); + } +} diff --git a/src/test/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessorTest.java b/src/test/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessorTest.java new file mode 100644 index 0000000..a23d233 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessorTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.rabbitmq.pattern; + +import static org.junit.Assert.assertEquals; + +import org.apache.rocketmq.connect.jms.Replicator; +import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig; +import org.junit.Test; + +import com.rabbitmq.jms.admin.RMQConnectionFactory; + +public class RabbitMQPatternProcessorTest{ + + + @Test + public void connectionFactory() { + RabbitmqConfig rabbitmqConfig = new RabbitmqConfig(); + rabbitmqConfig.setRabbitmqUrl("amqp://112.74.48.251:5672"); + Replicator replicator = new Replicator(rabbitmqConfig, null); + RabbitMQPatternProcessor patternProcessor = new RabbitMQPatternProcessor(replicator); + assertEquals(RMQConnectionFactory.class, patternProcessor.connectionFactory().getClass()); + } + + +}
