This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit a7ab1c46a9938df2984c3ad730609c1a7797cc9b
Author: githublaohu <[email protected]>
AuthorDate: Sun Jul 21 21:17:48 2019 +0800

    [ISSUE #312] Implement rocketmq connect RabbitMQ (#313)
    
    * complete RabbitMQ  connector
    
    * delete class file
---
 README.md                                          |  16 ++
 pom.xml                                            | 205 +++++++++++++++++++++
 .../rocketmq/connect/rabbitmq/ErrorCode.java       |   8 +
 .../rocketmq/connect/rabbitmq/RabbitmqConfig.java  |  61 ++++++
 .../connector/RabbitmqSourceConnector.java         |  35 ++++
 .../rabbitmq/connector/RabbitmqSourceTask.java     |  37 ++++
 .../rabbitmq/pattern/RabbitMQPatternProcessor.java |  48 +++++
 .../rocketmq/connect/jms/RabbitmqConfigTest.java   |  28 +++
 .../connector/RabbitmqSourceConnectorTest.java     |  54 ++++++
 .../rabbitmq/connector/RabbitmqSourceTaskTest.java | 164 +++++++++++++++++
 .../pattern/RabbitMQPatternProcessorTest.java      |  41 +++++
 11 files changed, 697 insertions(+)

diff --git a/README.md b/README.md
index 8b13789..708bee3 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,17 @@
+##### ActiveConnector fully-qualified name
+org.apache.rocketmq.connect.rabbitmq.connector.RabbitmqSourceConnector
+
+
+##### parameter configuration
+
+parameter | effect | required |default
+---|--- |--- | ---
+rabbtimq.url | The URL of the RabbtiMQ broker | yes | null
+rabbtimq.username | The username to use when connecting to RabbtiMQ | yes |  
null
+rabbtimq.password|  The password to use when connecting to RabbtiMQ    | yes  
| null
+jms.destination.name | The name of the JMS destination (queue or topic) to 
read from   |  yes | null
+jms.destination.type | The type of JMS destination, which is either queue or 
topic | yes | null
+jms.message.selector | The message selector that should be applied to messages 
in the destination    |  no  | null 
+jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session  | 
null | Session.AUTO_ACKNOWLEDGE
+jms.session.transacted | Flag to determine if the session is transacted and 
the session completely controls. the message delivery by either committing or 
rolling back the session      | null | false
 
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..d5687c1
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,205 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
+       license agreements. See the NOTICE file distributed with this work for 
additional 
+       information regarding copyright ownership. The ASF licenses this file 
to 
+       You under the Apache License, Version 2.0 (the "License"); you may not 
use 
+       this file except in compliance with the License. You may obtain a copy 
of 
+       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required 
+       by applicable law or agreed to in writing, software distributed under 
the 
+       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
+       OF ANY KIND, either express or implied. See the License for the 
specific 
+       language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+
+       <groupId>org.apache.rocketmq</groupId>
+       <artifactId>rocketmq-connect-rabbitmq</artifactId>
+       <version>1.0.0</version>
+
+       <name>connect-rabbitmq</name>
+       
<url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-rabbitmq</url>
+
+       <licenses>
+               <license>
+                       <name>The Apache Software License, Version 2.0</name>
+                       
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+               </license>
+       </licenses>
+
+       <issueManagement>
+               <system>jira</system>
+               <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+       </issueManagement>
+
+       <properties>
+               
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+               
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+               <!-- Compiler settings properties -->
+               <maven.compiler.source>1.8</maven.compiler.source>
+               <maven.compiler.target>1.8</maven.compiler.target>
+       </properties>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>versions-maven-plugin</artifactId>
+                               <version>2.3</version>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>clirr-maven-plugin</artifactId>
+                               <version>2.7</version>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-compiler-plugin</artifactId>
+                               <version>3.6.1</version>
+                               <configuration>
+                                       
<source>${maven.compiler.source}</source>
+                                       
<target>${maven.compiler.target}</target>
+                                       
<compilerVersion>${maven.compiler.source}</compilerVersion>
+                                       <showDeprecation>true</showDeprecation>
+                                       <showWarnings>true</showWarnings>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <version>2.19.1</version>
+                               <configuration>
+                                       <argLine>-Xms512m -Xmx1024m</argLine>
+                                       <forkMode>always</forkMode>
+                                       <includes>
+                                               <include>**/*Test.java</include>
+                                       </includes>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-site-plugin</artifactId>
+                               <version>3.6</version>
+                               <configuration>
+                                       <locales>en_US</locales>
+                                       <outputEncoding>UTF-8</outputEncoding>
+                                       <inputEncoding>UTF-8</inputEncoding>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-source-plugin</artifactId>
+                               <version>3.0.1</version>
+                               <executions>
+                                       <execution>
+                                               <id>attach-sources</id>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-javadoc-plugin</artifactId>
+                               <version>2.10.4</version>
+                               <configuration>
+                                       <charset>UTF-8</charset>
+                                       <locale>en_US</locale>
+                                       
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                               </configuration>
+                               <executions>
+                                       <execution>
+                                               <id>aggregate</id>
+                                               <goals>
+                                                       <goal>aggregate</goal>
+                                               </goals>
+                                               <phase>site</phase>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-resources-plugin</artifactId>
+                               <version>3.0.2</version>
+                               <configuration>
+                                       
<encoding>${project.build.sourceEncoding}</encoding>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>findbugs-maven-plugin</artifactId>
+                               <version>3.0.4</version>
+                       </plugin>
+               </plugins>
+       </build>
+
+       <dependencies>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <version>4.11</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.assertj</groupId>
+                       <artifactId>assertj-core</artifactId>
+                       <version>2.6.0</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.mockito</groupId>
+                       <artifactId>mockito-core</artifactId>
+                       <version>2.6.3</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>io.openmessaging</groupId>
+                       <artifactId>openmessaging-connector</artifactId>
+                       <version>0.1.0-beta</version>
+               </dependency>
+               <dependency>
+                       <groupId>com.alibaba</groupId>
+                       <artifactId>fastjson</artifactId>
+                       <version>1.2.51</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-api</artifactId>
+                       <version>1.7.7</version>
+               </dependency>
+               <dependency>
+                       <groupId>ch.qos.logback</groupId>
+                       <artifactId>logback-classic</artifactId>
+                       <version>1.0.13</version>
+               </dependency>
+               <dependency>
+                       <groupId>ch.qos.logback</groupId>
+                       <artifactId>logback-core</artifactId>
+                       <version>1.0.13</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.rocketmq</groupId>
+                       <artifactId>rocketmq-openmessaging</artifactId>
+                       <version>4.3.2</version>
+               </dependency>
+               <dependency>
+                       <groupId>commons-cli</groupId>
+                       <artifactId>commons-cli</artifactId>
+                       <version>1.2</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.rocketmq</groupId>
+                       <artifactId>rocketmq-connect-jms</artifactId>
+                       <version>1.0.0</version>
+               </dependency>
+               <dependency>
+                       <groupId>com.rabbitmq</groupId>
+                       <artifactId>amqp-client</artifactId>
+                       <version>5.7.1</version>
+               </dependency>
+               <dependency>
+                       <groupId>com.rabbitmq.jms</groupId>
+                       <artifactId>rabbitmq-jms</artifactId>
+                       <version>1.11.2</version>
+               </dependency>
+       </dependencies>
+
+</project>
diff --git a/src/main/java/org/apache/rocketmq/connect/rabbitmq/ErrorCode.java 
b/src/main/java/org/apache/rocketmq/connect/rabbitmq/ErrorCode.java
new file mode 100644
index 0000000..5f70361
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/rabbitmq/ErrorCode.java
@@ -0,0 +1,8 @@
+package org.apache.rocketmq.connect.rabbitmq;
+
+public class ErrorCode {
+
+    public static final int START_ERROR_CODE = 10001;
+
+    public static final int STOP_ERROR_CODE = 10002;
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/rabbitmq/RabbitmqConfig.java 
b/src/main/java/org/apache/rocketmq/connect/rabbitmq/RabbitmqConfig.java
new file mode 100644
index 0000000..2b12c18
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/rabbitmq/RabbitmqConfig.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.connect.jms.Config;
+
+public class RabbitmqConfig extends Config {
+
+    @SuppressWarnings("serial")
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+        {
+            add("rabbitmqUrl");
+            add("rabbitmqUsername");
+            add("rabbitmqPassword");
+            add("destinationType");
+            add("destinationName");
+        }
+    };
+
+    public String getRabbitmqUrl() {
+        return getBrokerUrl();
+    }
+
+    public void setRabbitmqUrl(String rabbitmqUrl) {
+        setBrokerUrl(rabbitmqUrl);
+    }
+
+    public String getRabbitmqUsername() {
+        return getUsername();
+    }
+
+    public void setRabbitmqUsername(String rabbitmqUsername) {
+        setUsername(rabbitmqUsername);
+    }
+
+    public String getRabbitmqPassword() {
+        return getPassword();
+    }
+
+    public void setRabbitmqPassword(String rabbitmqPassword) {
+        setPassword(rabbitmqPassword);
+    }
+
+}
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnector.java
 
b/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnector.java
new file mode 100644
index 0000000..328632d
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnector.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq.connector;
+
+import io.openmessaging.connector.api.Task;
+import java.util.Set;
+import org.apache.rocketmq.connect.jms.connector.BaseJmsSourceConnector;
+import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig;
+
+public class RabbitmqSourceConnector extends BaseJmsSourceConnector {
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return RabbitmqSourceTask.class;
+    }
+
+    public Set<String> getRequiredConfig() {
+        return RabbitmqConfig.REQUEST_CONFIG;
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTask.java
 
b/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTask.java
new file mode 100644
index 0000000..ab2d1e4
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTask.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq.connector;
+
+import org.apache.rocketmq.connect.jms.Config;
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.apache.rocketmq.connect.jms.connector.BaseJmsSourceTask;
+import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig;
+import org.apache.rocketmq.connect.rabbitmq.pattern.RabbitMQPatternProcessor;
+
+public class RabbitmqSourceTask extends BaseJmsSourceTask {
+
+    public PatternProcessor getPatternProcessor(Replicator replicator) {
+        return new RabbitMQPatternProcessor(replicator);
+    }
+
+    @Override
+    public Config getConfig() {
+        return new RabbitmqConfig();
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessor.java
 
b/src/main/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessor.java
new file mode 100644
index 0000000..5056a11
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq.pattern;
+
+import com.rabbitmq.jms.admin.RMQConnectionFactory;
+import io.openmessaging.connector.api.exception.DataConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import org.apache.rocketmq.connect.jms.ErrorCode;
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+
+public class RabbitMQPatternProcessor extends PatternProcessor {
+
+    public RabbitMQPatternProcessor(Replicator replicator) {
+        super(replicator);
+    }
+
+    public ConnectionFactory connectionFactory() {
+        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
+        try {
+            List<String> urlList = new ArrayList<>();
+            urlList.add(config.getBrokerUrl());
+            connectionFactory.setUris(urlList);
+        } catch (JMSException e) {
+            throw new DataConnectException(ErrorCode.START_ERROR_CODE, 
e.getMessage(), e);
+        }
+        return connectionFactory;
+    }
+
+}
diff --git 
a/src/test/java/org/apache/rocketmq/connect/jms/RabbitmqConfigTest.java 
b/src/test/java/org/apache/rocketmq/connect/jms/RabbitmqConfigTest.java
new file mode 100644
index 0000000..7228f68
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jms/RabbitmqConfigTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms;
+
+import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig;
+
+public class RabbitmqConfigTest {
+       
+    RabbitmqConfig config;
+
+   
+    
+}
diff --git 
a/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnectorTest.java
 
b/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnectorTest.java
new file mode 100644
index 0000000..ea52a43
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnectorTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq.connector;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.rocketmq.connect.jms.Config;
+import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig;
+import org.junit.Test;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+
+public class RabbitmqSourceConnectorTest {
+
+       RabbitmqSourceConnector rabbitmqSourceConnector = new 
RabbitmqSourceConnector();
+       
+       @Test
+       public void taskClass() {
+                assertEquals( RabbitmqSourceTask.class, 
rabbitmqSourceConnector.taskClass());
+       }
+
+       @Test
+       public void getRequiredConfig() {
+               assertEquals( RabbitmqConfig.REQUEST_CONFIG , 
rabbitmqSourceConnector.getRequiredConfig());
+       }
+       
+       
+       @Test
+       public void verifyAndSetConfig() {
+        KeyValue keyValue = new DefaultKeyValue();
+
+        for (String requestKey :RabbitmqConfig.REQUEST_CONFIG) {
+            assertEquals(rabbitmqSourceConnector.verifyAndSetConfig(keyValue), 
"Request config key: " + requestKey);
+            keyValue.put(requestKey, requestKey);
+        }
+        assertEquals(rabbitmqSourceConnector.verifyAndSetConfig(keyValue), "");
+       }
+}
diff --git 
a/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTaskTest.java
 
b/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTaskTest.java
new file mode 100644
index 0000000..232ddc1
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTaskTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq.connector;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig;
+import org.apache.rocketmq.connect.rabbitmq.pattern.RabbitMQPatternProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.alibaba.fastjson.JSON;
+import com.rabbitmq.jms.admin.RMQConnectionFactory;
+import com.rabbitmq.jms.client.message.RMQBytesMessage;
+import com.rabbitmq.jms.client.message.RMQMapMessage;
+import com.rabbitmq.jms.client.message.RMQObjectMessage;
+import com.rabbitmq.jms.client.message.RMQStreamMessage;
+import com.rabbitmq.jms.client.message.RMQTextMessage;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.internal.DefaultKeyValue;
+
+public class RabbitmqSourceTaskTest {
+
+       //@Before
+       public void befores() throws JMSException, InterruptedException {
+               RMQConnectionFactory connectionFactory = new 
RMQConnectionFactory();
+               connectionFactory.setUri("amqp://112.74.48.251:5672");
+               Connection connection = 
connectionFactory.createConnection("admin", "admin");
+
+               connection.start();
+               Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+               Destination destination = session.createQueue("test-queue");
+
+               MessageProducer producer = session.createProducer(destination);
+
+               producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+               for (int i = 0; i < 20; i++) {
+                       TextMessage message = session.createTextMessage("hello 
我是消息:" + i);
+                       producer.send(message);
+               }
+
+               session.commit();
+               session.close();
+               connection.close();
+       }
+
+       //@Test
+       public void test() throws InterruptedException {
+               KeyValue kv = new DefaultKeyValue();
+               kv.put("rabbitmqUrl", "amqp://112.74.48.251:5672");
+               kv.put("rabbitmqUsername", "admin");
+               kv.put("rabbitmqPassword", "admin");
+               kv.put("destinationType", "queue");
+               kv.put("destinationName", "test-queue");
+               RabbitmqSourceTask task = new RabbitmqSourceTask();
+               task.start(kv);
+               for (int i = 0; i < 20;) {
+                       Collection<SourceDataEntry> sourceDataEntry = 
task.poll();
+                       i = i + sourceDataEntry.size();
+                       System.out.println(sourceDataEntry);
+               }
+               Thread.sleep(20000);
+       }
+
+       @Test
+       public void getMessageConnentTest() throws JMSException {
+               String value = "hello rocketmq";
+               RabbitmqSourceTask task = new RabbitmqSourceTask();
+               RMQTextMessage textMessage = new RMQTextMessage();
+               textMessage.setText(value);
+               ByteBuffer buffer = task.getMessageContent(textMessage);
+               Assert.assertEquals(new String(buffer.array()), 
textMessage.getText());
+
+               ObjectMessage objectMessage = new RMQObjectMessage();
+               objectMessage.setObject(value);
+               buffer = task.getMessageContent(objectMessage);
+               Assert.assertEquals(new String(buffer.array()), "\"" + 
objectMessage.getObject().toString() + "\"");
+
+               BytesMessage bytes = new RMQBytesMessage();
+               bytes.writeBytes(value.getBytes());
+               bytes.reset();
+               buffer = task.getMessageContent(bytes);
+               Assert.assertEquals(new String(buffer.array()), value);
+
+               MapMessage mapMessage = new RMQMapMessage();
+               mapMessage.setString("hello", "rocketmq");
+               buffer = task.getMessageContent(mapMessage);
+               Map<String, String> map = JSON.parseObject(buffer.array(), 
Map.class);
+               Assert.assertEquals(map.get("hello"), "rocketmq");
+               Assert.assertEquals(map.size(), 1);
+
+               StreamMessage streamMessage = new RMQStreamMessage();
+               String valueTwo = null;
+               for (int i = 0; i < 200; i++) {
+                       valueTwo = valueTwo + value;
+               }
+               streamMessage.writeBytes(valueTwo.getBytes());
+               streamMessage.reset();
+               //buffer = task.getMessageContent(streamMessage);
+               //Assert.assertEquals(new String(buffer.array()), valueTwo);
+
+       }
+       
+       @Test(expected=Exception.class)
+       public void getMessageConnentException() throws JMSException {
+               RabbitmqSourceTask task = new RabbitmqSourceTask();
+               task.getMessageContent(null);
+               
+       }
+
+       public void getPatternProcessor(Replicator replicator) {
+               KeyValue kv = new DefaultKeyValue();
+               kv.put("rabbitmqUrl", "amqp://112.74.48.251:5672");
+               kv.put("rabbitmqUsername", "admin");
+               kv.put("rabbitmqPassword", "admin");
+               kv.put("destinationType", "queue");
+               kv.put("destinationName", "test-queue");
+        RabbitmqConfig config = new RabbitmqConfig();
+        config.load(kv);
+        replicator = new Replicator(config,null);
+        RabbitmqSourceTask task = new RabbitmqSourceTask();
+               assertEquals(RabbitMQPatternProcessor.class, 
task.getPatternProcessor(replicator).getClass());
+       }
+
+       @Test
+       public void getConfig() {
+               RabbitmqSourceTask task = new RabbitmqSourceTask();
+               assertEquals(task.getConfig().getClass() , 
RabbitmqConfig.class);
+       }
+}
diff --git 
a/src/test/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessorTest.java
 
b/src/test/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessorTest.java
new file mode 100644
index 0000000..a23d233
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessorTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq.pattern;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig;
+import org.junit.Test;
+
+import com.rabbitmq.jms.admin.RMQConnectionFactory;
+
+public  class RabbitMQPatternProcessorTest{
+
+
+       @Test
+       public  void connectionFactory() {
+               RabbitmqConfig rabbitmqConfig = new RabbitmqConfig();
+               rabbitmqConfig.setRabbitmqUrl("amqp://112.74.48.251:5672");
+               Replicator replicator = new Replicator(rabbitmqConfig, null);
+               RabbitMQPatternProcessor patternProcessor = new 
RabbitMQPatternProcessor(replicator);
+               assertEquals(RMQConnectionFactory.class, 
patternProcessor.connectionFactory().getClass());
+    }
+    
+
+}

Reply via email to