This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit c7c0c04a7fd28d1067ec2c8700d404ac8dd2f391
Author: laohu <[email protected]>
AuthorDate: Sun Jun 2 21:10:20 2019 +0800

    init complete
---
 pom.xml                                            | 259 +++++++++++++++++++++
 .../java/io/openmessaging/activemq/Config.java     | 133 +++++++++++
 .../java/io/openmessaging/activemq/Replicator.java |  72 ++++++
 .../activemq/connector/ActivemqConnector.java      |  72 ++++++
 .../activemq/connector/ActivemqTask.java           |  87 +++++++
 .../activemq/pattern/PatternProcessor.java         |  83 +++++++
 6 files changed, 706 insertions(+)

diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..b021330
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,259 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
+       license agreements. See the NOTICE file distributed with this work for 
additional 
+       information regarding copyright ownership. The ASF licenses this file 
to 
+       You under the Apache License, Version 2.0 (the "License"); you may not 
use 
+       this file except in compliance with the License. You may obtain a copy 
of 
+       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required 
+       by applicable law or agreed to in writing, software distributed under 
the 
+       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
+       OF ANY KIND, either express or implied. See the License for the 
specific 
+       language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+
+       <groupId>org.apache.rocketmq</groupId>
+       <artifactId>rocketmq-connect-activemq</artifactId>
+       <version>1.0.0</version>
+
+       <name>connect-activemq</name>
+       <description>Redis Replicator is a redis RDB and Command parser written 
in java.
+        It can parse,filter,broadcast the RDB and Command events in a real 
time manner
+        and resent to Apache RocketMQ, then consumer could subscribe topic to 
receive data.
+    </description>
+       
<url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-activemq</url>
+
+       <licenses>
+               <license>
+                       <name>The Apache Software License, Version 2.0</name>
+                       
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+               </license>
+       </licenses>
+
+       <developers>
+               <developer>
+                       <name>Leon Chen</name>
+                       <email>[email protected]</email>
+                       <organization>moilioncircle</organization>
+                       
<organizationUrl>http://www.moilioncircle.com/</organizationUrl>
+                       <roles>
+                               <role>Developer</role>
+                       </roles>
+                       <timezone>+8</timezone>
+               </developer>
+
+               <developer>
+                       <name>Adrian Yao</name>
+                       <email>[email protected]</email>
+                       <organization>unstudy</organization>
+                       <timezone>+8</timezone>
+               </developer>
+
+               <developer>
+                       <name>Rick Zhang</name>
+                       <email>[email protected]</email>
+                       <organization>treefinance.com.cn</organization>
+                       <roles>
+                               <role>Developer</role>
+                       </roles>
+                       <timezone>+8</timezone>
+               </developer>
+       </developers>
+
+       <issueManagement>
+               <system>jira</system>
+               <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+       </issueManagement>
+
+       <properties>
+               
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+               
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+               <!-- Compiler settings properties -->
+               <maven.compiler.source>1.8</maven.compiler.source>
+               <maven.compiler.target>1.8</maven.compiler.target>
+       </properties>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>versions-maven-plugin</artifactId>
+                               <version>2.3</version>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>clirr-maven-plugin</artifactId>
+                               <version>2.7</version>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-compiler-plugin</artifactId>
+                               <version>3.6.1</version>
+                               <configuration>
+                                       
<source>${maven.compiler.source}</source>
+                                       
<target>${maven.compiler.target}</target>
+                                       
<compilerVersion>${maven.compiler.source}</compilerVersion>
+                                       <showDeprecation>true</showDeprecation>
+                                       <showWarnings>true</showWarnings>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <version>2.19.1</version>
+                               <configuration>
+                                       <argLine>-Xms512m -Xmx1024m</argLine>
+                                       <forkMode>always</forkMode>
+                                       <includes>
+                                               <include>**/*Test.java</include>
+                                       </includes>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-site-plugin</artifactId>
+                               <version>3.6</version>
+                               <configuration>
+                                       <locales>en_US</locales>
+                                       <outputEncoding>UTF-8</outputEncoding>
+                                       <inputEncoding>UTF-8</inputEncoding>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-source-plugin</artifactId>
+                               <version>3.0.1</version>
+                               <executions>
+                                       <execution>
+                                               <id>attach-sources</id>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-javadoc-plugin</artifactId>
+                               <version>2.10.4</version>
+                               <configuration>
+                                       <charset>UTF-8</charset>
+                                       <locale>en_US</locale>
+                                       
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                               </configuration>
+                               <executions>
+                                       <execution>
+                                               <id>aggregate</id>
+                                               <goals>
+                                                       <goal>aggregate</goal>
+                                               </goals>
+                                               <phase>site</phase>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-resources-plugin</artifactId>
+                               <version>3.0.2</version>
+                               <configuration>
+                                       
<encoding>${project.build.sourceEncoding}</encoding>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>findbugs-maven-plugin</artifactId>
+                               <version>3.0.4</version>
+                       </plugin>
+               </plugins>
+       </build>
+
+       <dependencies>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <version>4.11</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.assertj</groupId>
+                       <artifactId>assertj-core</artifactId>
+                       <version>2.6.0</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.mockito</groupId>
+                       <artifactId>mockito-core</artifactId>
+                       <version>2.6.3</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.httpcomponents</groupId>
+                       <artifactId>httpclient</artifactId>
+                       <version>4.5.5</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>io.openmessaging</groupId>
+                       <artifactId>openmessaging-connect-runtime</artifactId>
+                       <version>0.0.1-SNAPSHOT</version>
+               </dependency>
+               <dependency>
+                       <groupId>io.openmessaging</groupId>
+                       <artifactId>openmessaging-connector</artifactId>
+                       <version>0.1.0-beta</version>
+               </dependency>
+               <dependency>
+                       <groupId>io.openmessaging</groupId>
+                       <artifactId>openmessaging-api</artifactId>
+                       <version>0.3.1-alpha</version>
+               </dependency>
+               <dependency>
+                       <groupId>com.alibaba</groupId>
+                       <artifactId>fastjson</artifactId>
+                       <version>1.2.51</version>
+               </dependency>
+               <!-- <dependency> <groupId>io.javalin</groupId> 
<artifactId>javalin</artifactId> 
+                       <version>1.3.0</version> </dependency> -->
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-api</artifactId>
+                       <version>1.7.7</version>
+               </dependency>
+               <dependency>
+                       <groupId>ch.qos.logback</groupId>
+                       <artifactId>logback-classic</artifactId>
+                       <version>1.0.13</version>
+               </dependency>
+               <dependency>
+                       <groupId>ch.qos.logback</groupId>
+                       <artifactId>logback-core</artifactId>
+                       <version>1.0.13</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.rocketmq</groupId>
+                       <artifactId>rocketmq-openmessaging</artifactId>
+                       <version>4.3.2</version>
+               </dependency>
+               <dependency>
+                       <groupId>commons-cli</groupId>
+                       <artifactId>commons-cli</artifactId>
+                       <version>1.2</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.activemq</groupId>
+                       <artifactId>activemq-all</artifactId>
+                       <version>5.9.0</version>
+               </dependency>
+               <dependency>
+                       <groupId>javax.jms</groupId>
+                       <artifactId>javax.jms-api</artifactId>
+                       <version>2.0</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.glassfish.main.javaee-api</groupId>
+                       <artifactId>javax.jms</artifactId>
+                       <version>3.1.2.2</version>
+               </dependency>
+
+       </dependencies>
+
+</project>
diff --git a/src/main/java/io/openmessaging/activemq/Config.java 
b/src/main/java/io/openmessaging/activemq/Config.java
new file mode 100644
index 0000000..10a5d9f
--- /dev/null
+++ b/src/main/java/io/openmessaging/activemq/Config.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.activemq;
+
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+
+import io.openmessaging.KeyValue;
+
+public class Config {
+
+       public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+               {
+                       add("activemqUrl");
+                       add("activemqUsername");
+                       add("activemqPassword");
+                       add("destinationType");
+                       add("destinationName");
+               }
+       };
+
+       public String activemqUrl;
+
+       public String activemqUsername;
+
+       public String activemqPassword;
+
+       public String destinationType;
+
+       public String destinationName;
+
+       public void load(KeyValue props) {
+
+               properties2Object(props, this);
+       }
+
+       private void properties2Object(final KeyValue p, final Object object) {
+
+               Method[] methods = object.getClass().getMethods();
+               for (Method method : methods) {
+                       String mn = method.getName();
+                       if (mn.startsWith("set")) {
+                               try {
+                                       String tmp = mn.substring(4);
+                                       String first = mn.substring(3, 4);
+
+                                       String key = first.toLowerCase() + tmp;
+                                       String property = p.getString(key);
+                                       if (property != null) {
+                                               Class<?>[] pt = 
method.getParameterTypes();
+                                               if (pt != null && pt.length > 
0) {
+                                                       String cn = 
pt[0].getSimpleName();
+                                                       Object arg;
+                                                       if (cn.equals("int") || 
cn.equals("Integer")) {
+                                                               arg = 
Integer.parseInt(property);
+                                                       } else if 
(cn.equals("long") || cn.equals("Long")) {
+                                                               arg = 
Long.parseLong(property);
+                                                       } else if 
(cn.equals("double") || cn.equals("Double")) {
+                                                               arg = 
Double.parseDouble(property);
+                                                       } else if 
(cn.equals("boolean") || cn.equals("Boolean")) {
+                                                               arg = 
Boolean.parseBoolean(property);
+                                                       } else if 
(cn.equals("float") || cn.equals("Float")) {
+                                                               arg = 
Float.parseFloat(property);
+                                                       } else if 
(cn.equals("String")) {
+                                                               arg = property;
+                                                       } else {
+                                                               continue;
+                                                       }
+                                                       method.invoke(object, 
arg);
+                                               }
+                                       }
+                               } catch (Throwable ignored) {
+                               }
+                       }
+               }
+       }
+
+       public String getActivemqUrl() {
+               return activemqUrl;
+       }
+
+       public void setActivemqUrl(String activemqUrl) {
+               this.activemqUrl = activemqUrl;
+       }
+
+       public String getActivemqUsername() {
+               return activemqUsername;
+       }
+
+       public void setActivemqUsername(String activemqUsername) {
+               this.activemqUsername = activemqUsername;
+       }
+
+       public String getActivemqPassword() {
+               return activemqPassword;
+       }
+
+       public void setActivemqPassword(String activemqPassword) {
+               this.activemqPassword = activemqPassword;
+       }
+
+       public String getDestinationType() {
+               return destinationType;
+       }
+
+       public void setDestinationType(String destinationType) {
+               this.destinationType = destinationType;
+       }
+
+       public String getDestinationName() {
+               return destinationName;
+       }
+
+       public void setDestinationName(String destinationName) {
+               this.destinationName = destinationName;
+       }
+}
\ No newline at end of file
diff --git a/src/main/java/io/openmessaging/activemq/Replicator.java 
b/src/main/java/io/openmessaging/activemq/Replicator.java
new file mode 100644
index 0000000..51ca6c1
--- /dev/null
+++ b/src/main/java/io/openmessaging/activemq/Replicator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.activemq;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.Message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.openmessaging.activemq.pattern.PatternProcessor;
+
+public class Replicator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Replicator.class);
+
+    private static final Logger POSITION_LOGGER = 
LoggerFactory.getLogger("PositionLogger");
+
+    private PatternProcessor processor;
+    
+    private Config config;
+    private Object lock = new Object();
+    private BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+
+    public Replicator(Config config){
+        this.config = config;
+    }
+
+    public void start() {
+
+        try {
+            processor = new PatternProcessor(this);
+            processor.start();
+
+        } catch (Exception e) {
+            LOGGER.error("Start error.", e);
+        }
+    }
+
+    public void stop(){
+       processor.stop();
+    }
+
+    public void commit(Message message, boolean isComplete) {
+        queue.add(message);
+    }
+
+    public Config getConfig() {
+       return this.config;
+    }
+
+    public BlockingQueue<Message> getQueue() {
+        return queue;
+    }
+}
diff --git 
a/src/main/java/io/openmessaging/activemq/connector/ActivemqConnector.java 
b/src/main/java/io/openmessaging/activemq/connector/ActivemqConnector.java
new file mode 100644
index 0000000..1c9a530
--- /dev/null
+++ b/src/main/java/io/openmessaging/activemq/connector/ActivemqConnector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.activemq.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.activemq.Config;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActivemqConnector extends SourceConnector {
+
+    private KeyValue config;
+
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+
+        for(String requestKey : Config.REQUEST_CONFIG){
+            if(!config.containsKey(requestKey)){
+                return "Request config key: " + requestKey;
+            }
+        }
+        this.config = config;
+        return "";
+    }
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override public void pause() {
+
+    }
+
+    @Override public void resume() {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return ActivemqTask.class;
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs() {
+        List<KeyValue> config = new ArrayList<>();
+        config.add(this.config);
+        return config;
+    }
+}
diff --git 
a/src/main/java/io/openmessaging/activemq/connector/ActivemqTask.java 
b/src/main/java/io/openmessaging/activemq/connector/ActivemqTask.java
new file mode 100644
index 0000000..a04fc50
--- /dev/null
+++ b/src/main/java/io/openmessaging/activemq/connector/ActivemqTask.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.activemq.connector;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.activemq.Config;
+import io.openmessaging.activemq.Replicator;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.source.SourceTask;
+
+public class ActivemqTask extends SourceTask {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ActivemqTask.class);
+
+    private Replicator replicator;
+
+    private Config config;
+
+    @Override
+    public Collection<SourceDataEntry> poll() {
+
+        List<SourceDataEntry> res = new ArrayList<>();
+
+        try {
+               Message message = replicator.getQueue().poll(1000, 
TimeUnit.MILLISECONDS);
+            SourceDataEntry sourceDataEntry = null;
+            
+            res.add(sourceDataEntry);
+        } catch (Exception e) {
+            log.error("Mysql task poll error, current config:" + 
JSON.toJSONString(config), e);
+        }
+        return res;
+    }
+
+    @Override
+    public void start(KeyValue props) {
+
+        try {
+            this.config = new Config();
+            this.config.load(props);
+            this.replicator = new Replicator(config);
+        } catch (Exception e) {
+            log.error("Mysql task start failed.", e);
+        }
+        this.replicator.start();
+    }
+
+    @Override
+    public void stop() {
+        replicator.stop();
+    }
+
+    @Override public void pause() {
+
+    }
+
+    @Override public void resume() {
+
+    }
+}
diff --git 
a/src/main/java/io/openmessaging/activemq/pattern/PatternProcessor.java 
b/src/main/java/io/openmessaging/activemq/pattern/PatternProcessor.java
new file mode 100644
index 0000000..4b4b450
--- /dev/null
+++ b/src/main/java/io/openmessaging/activemq/pattern/PatternProcessor.java
@@ -0,0 +1,83 @@
+package io.openmessaging.activemq.pattern;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.lang3.StringUtils;
+
+import io.openmessaging.activemq.Config;
+import io.openmessaging.activemq.Replicator;
+
+public class PatternProcessor {
+
+       private Replicator replicator;
+       
+       private Config config;
+       
+       Connection connection;
+       
+       Session session;
+       
+       MessageConsumer consumer;
+       
+       public PatternProcessor(Replicator replicator) {
+               this.replicator = replicator;
+               this.config = replicator.getConfig();
+       }
+       
+       public void start() {
+               try {
+                  ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(config.getActivemqUrl());
+                  
+               //2、使用连接工厂创建一个连接对象
+                  if(StringUtils.isNotBlank(config.getActivemqUsername()) && 
StringUtils.isNotBlank(config.getActivemqPassword()) ) {
+                  connection = 
connectionFactory.createConnection(config.getActivemqUsername() , 
config.getActivemqPassword());
+                  }else {
+                          connection = connectionFactory.createConnection();
+                  }
+               //3、开启连接
+               connection.start();
+               //4、使用连接对象创建会话(session)对象
+               Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+               //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
+               Destination destination = null;
+               if(StringUtils.equals("topic", config.getDestinationType())) {
+                       destination = 
session.createTopic(config.getDestinationName());
+               }else if(StringUtils.equals("queue", 
config.getDestinationType())){
+                       destination = 
session.createQueue(config.getDestinationName());
+               }else {
+                       throw new RuntimeException("");
+               }
+               consumer = session.createConsumer(destination);
+               //6、使用会话对象创建生产者对象
+               //7、向consumer对象中设置一个messageListener对象,用来接收消息
+               consumer.setMessageListener(new MessageListener() {
+                   @Override
+                   public void onMessage(Message message) {
+                       replicator.commit(message, true);
+                   }
+               });
+               }catch(Exception e) {
+                       
+               }
+       }
+       
+       public void stop() {
+        try {
+               consumer.close();
+               session.close();
+                       connection.close();
+               } catch (JMSException e) {
+                       e.printStackTrace();
+               }
+       }
+       
+ 
+}

Reply via email to