http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java 
b/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java
deleted file mode 100644
index af57f67..0000000
--- a/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.util;
-
-import java.util.Map;
-import org.junit.Test;
-
-public class URISpecParserTest {
-
-    @Test
-    public void parseURI_NormalTest() {
-        Map<String, String> result = 
URISpecParser.parseURI("rocketmq://localhost");
-        System.out.println(result);
-
-        result = URISpecParser
-            .parseURI("rocketmq://xxx?appId=test&consumerId=testGroup");
-        System.out.println(result);
-
-        result = 
URISpecParser.parseURI("rocketmq:!@#$%^&*()//localhost?appId=test!@#$%^&*()");
-        System.out.println(result);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void parseURI_AbnormalTest() {
-        URISpecParser.parseURI("metaq3://localhost?appId=test");
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
deleted file mode 100644
index 469b9be..0000000
--- a/pom.xml
+++ /dev/null
@@ -1,196 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~  Unless required by applicable law or agreed to in writing, software
-  ~  distributed under the License is distributed on an "AS IS" BASIS,
-  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~  See the License for the specific language governing permissions and
-  ~  limitations under the License.
-  -->
-
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xmlns="http://maven.apache.org/POM/4.0.0"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <groupId>org.apache.rocketmq</groupId>
-    <artifactId>rocketmq-jms-all</artifactId>
-    <packaging>pom</packaging>
-    <version>1.0-SNAPSHOT</version>
-    <modules>
-        <module>spring</module>
-        <module>core</module>
-    </modules>
-
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <!--maven properties -->
-        <maven.test.skip>false</maven.test.skip>
-        <maven.javadoc.skip>true</maven.javadoc.skip>
-        <!-- compiler settings properties -->
-        <maven.compiler.source>1.6</maven.compiler.source>
-        <maven.compiler.target>1.6</maven.compiler.target>
-        <surefire.version>2.19.1</surefire.version>
-        <rocketmq.version>4.0.0-incubating</rocketmq.version>
-
-    </properties>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-client</artifactId>
-            <version>${rocketmq.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>javax.jms</groupId>
-            <artifactId>jms-api</artifactId>
-            <version>1.1-rev-1</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>18.0</version>
-        </dependency>
-        <dependency>
-            <groupId>commons-lang</groupId>
-            <artifactId>commons-lang</artifactId>
-            <version>2.6</version>
-        </dependency>
-
-        <!--test-->
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-            <version>4.12</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-namesrv</artifactId>
-            <version>${rocketmq.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-broker</artifactId>
-            <version>${rocketmq.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-
-    <build>
-        <plugins>
-            <plugin>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.5.1</version>
-                <configuration>
-                    <source>${maven.compiler.source}</source>
-                    <target>${maven.compiler.target}</target>
-                    <compilerVersion>${maven.compiler.source}</compilerVersion>
-                    <showDeprecation>true</showDeprecation>
-                    <showWarnings>true</showWarnings>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.jacoco</groupId>
-                <artifactId>jacoco-maven-plugin</artifactId>
-                <version>0.7.8</version>
-                <executions>
-                    <execution>
-                        <id>default-prepare-agent</id>
-                        <goals>
-                            <goal>prepare-agent</goal>
-                        </goals>
-                        <configuration>
-                            
<destFile>${project.build.directory}/jacoco.exec</destFile>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>default-prepare-agent-integration</id>
-                        <phase>pre-integration-test</phase>
-                        <goals>
-                            <goal>prepare-agent-integration</goal>
-                        </goals>
-                        <configuration>
-                            
<destFile>${project.build.directory}/jacoco-it.exec</destFile>
-                            <propertyName>failsafeArgLine</propertyName>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>default-report</id>
-                        <goals>
-                            <goal>report</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>default-report-integration</id>
-                        <goals>
-                            <goal>report-integration</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <version>${surefire.version}</version>
-            </plugin>
-            <plugin>
-                <artifactId>maven-failsafe-plugin</artifactId>
-                <version>${surefire.version}</version>
-                <configuration>
-                    <forkCount>1</forkCount>
-                    <reuseForks>true</reuseForks>
-                    <argLine>@{failsafeArgLine}</argLine>
-                </configuration>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>integration-test</goal>
-                            <goal>verify</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.eluder.coveralls</groupId>
-                <artifactId>coveralls-maven-plugin</artifactId>
-                <version>4.3.0</version>
-            </plugin>
-            <plugin>
-                <artifactId>maven-checkstyle-plugin</artifactId>
-                <version>2.17</version>
-                <executions>
-                    <execution>
-                        <id>verify</id>
-                        <phase>verify</phase>
-                        <configuration>
-                            
<configLocation>style/rmq_checkstyle.xml</configLocation>
-                            <encoding>UTF-8</encoding>
-                            <consoleOutput>true</consoleOutput>
-                            <failsOnError>true</failsOnError>
-                        </configuration>
-                        <goals>
-                            <goal>check</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-        <resources>
-            <resource>
-                <directory>src/main/resources</directory>
-                <filtering>true</filtering>
-            </resource>
-        </resources>
-    </build>
-
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/.gitignore
----------------------------------------------------------------------
diff --git a/rocketmq-jms/.gitignore b/rocketmq-jms/.gitignore
new file mode 100644
index 0000000..d2e5aaf
--- /dev/null
+++ b/rocketmq-jms/.gitignore
@@ -0,0 +1,5 @@
+.idea/
+*.iml
+*.ipr
+*.iws
+target/

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/.travis.yml
----------------------------------------------------------------------
diff --git a/rocketmq-jms/.travis.yml b/rocketmq-jms/.travis.yml
new file mode 100644
index 0000000..9f430b2
--- /dev/null
+++ b/rocketmq-jms/.travis.yml
@@ -0,0 +1,43 @@
+notifications:
+  email:
+    recipients:
+      - zhangke.huangs...@gmail.com
+      - zhendongli...@gmail.com
+  on_success: change
+  on_failure: always
+
+language: java
+
+matrix:
+  include:
+  # On OSX, run with default JDK only.
+  # - os: osx
+  # On Linux, run with specific JDKs only.
+  # - os: linux
+  #  env: CUSTOM_JDK="oraclejdk8"
+  - os: linux
+    env: CUSTOM_JDK="oraclejdk7"
+  #- os: linux
+  #  env: CUSTOM_JDK="openjdk7"
+
+before_install:
+  - echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m 
-XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
+  - cat ~/.mavenrc
+  - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export 
JAVA_HOME=$(/usr/libexec/java_home); fi
+  - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; 
fi
+
+#os:
+#  - linux
+#  - osx
+#jdk:
+#  - oraclejdk8
+#  - oraclejdk7
+#  - openjdk7
+
+
+script:
+  - travis_retry mvn -B clean install jacoco:report coveralls:report
+
+#after_success:
+#  - mvn clean install
+#  - mvn sonar:sonar

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/README.md
----------------------------------------------------------------------
diff --git a/rocketmq-jms/README.md b/rocketmq-jms/README.md
new file mode 100644
index 0000000..a05e27e
--- /dev/null
+++ b/rocketmq-jms/README.md
@@ -0,0 +1,31 @@
+# RocketMQ-JMS   [![Build 
Status](https://travis-ci.org/rocketmq/rocketmq-jms.svg?branch=master)](https://travis-ci.org/rocketmq/rocketmq-jms)
 [![Coverage 
Status](https://coveralls.io/repos/github/rocketmq/rocketmq-jms/badge.svg?branch=master)](https://coveralls.io/github/rocketmq/rocketmq-jms?branch=master)
+
+
+## Introduction
+RocketMQ-JMS is an implementation of the JMS specification, using Apache RocketMQ as 
the broker.
+We are currently working on JMS 1.1 support; JMS 2.0 is our final target.   
+
+The first version of RocketMQ-JMS will be released soon, and new features will be 
developed on the "v1.1" branch.
+Please visit the [issue 
board](https://github.com/rocketmq/rocketmq-jms/issues) to see the features planned for the next 
version. 
+
+
+## Building
+
+  > cd rocketmq-jms  
+  > mvn clean install  
+  
+  **run unit test:**  
+  > mvn test    
+  
+  **run integration test:**  
+  > mvn verify
+  
+  **see jacoco code coverage report**
+  > open core/target/site/jacoco/index.html  
+  > open core/target/site/jacoco-it/index.html  
+  > open spring/target/site/jacoco-it/index.html 
+  
+  
+## Guidelines
+
+ Please see [Coding Guidelines 
Introduction](http://rocketmq.apache.org/docs/code-guidelines/)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/pom.xml b/rocketmq-jms/core/pom.xml
new file mode 100644
index 0000000..1b36e14
--- /dev/null
+++ b/rocketmq-jms/core/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-jms-all</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-jms</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
new file mode 100644
index 0000000..80a8b64
--- /dev/null
+++ 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+public interface CommonConstant {
+    // String keys used to look up settings in the connection-parameter map read by JmsBaseConnection's constructor.
+    String PRODUCERID = "producerId"; // value is copied into CommonContext.producerId
+
+    String CONSUMERID = "consumerId"; // value is copied into CommonContext.consumerId
+
+    String PROVIDER = "provider"; // optional; value is copied into CommonContext.provider
+
+    String NAMESERVER = "nameServer"; // applied only when the value is non-empty
+
+    String INSTANCE_NAME = "instanceName"; // applied only when the value is non-empty
+
+    String CONSUME_THREAD_NUMS = "consumeThreadNums"; // non-empty value is parsed with Integer.parseInt
+
+    String SEND_TIMEOUT_MILLIS = "sendMsgTimeoutMillis"; // non-empty value is parsed with Integer.parseInt
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
new file mode 100644
index 0000000..c8e4276
--- /dev/null
+++ 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+public class CommonContext { // Mutable holder for the settings of one connection; plain getters/setters, no validation.
+    private String accessKey;
+    private String secretKey;
+
+    private String consumerId;
+    private String producerId;
+
+    private String provider;
+
+    private String appId;
+
+    private String nameServer;
+
+    /**
+     * MQ type identifier; its valid values are not defined in this class.
+     */
+    private String mqType;
+
+    /**
+     * Used to distinguish client JVM processes.
+     */
+    private String instanceName;
+    /**
+     * Size of the consumer thread pool; 0 (the int default) until set.
+     */
+    private int consumeThreadNums;
+    /**
+     * Timeout for sending a message, in milliseconds; -1 until set (presumably "not configured" - TODO confirm).
+     */
+    private int sendMsgTimeoutMillis = -1;
+
+    /**
+     * @return the appId
+     */
+    public String getAppId() {
+        return appId;
+    }
+
+    /**
+     * @param appId the appId to set
+     */
+    public void setAppId(String appId) {
+        this.appId = appId;
+    }
+
+    /**
+     * @return the provider
+     */
+    public String getProvider() {
+        return provider;
+    }
+
+    /**
+     * @param provider the provider to set
+     */
+    public void setProvider(String provider) {
+        this.provider = provider;
+    }
+
+    /**
+     * @return the instanceName
+     */
+    public String getInstanceName() {
+        return instanceName;
+    }
+
+    /**
+     * @param instanceName the instanceName to set
+     */
+    public void setInstanceName(String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    /**
+     * @return the accessKey
+     */
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    /**
+     * @param accessKey the accessKey to set
+     */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    /**
+     * @return the secretKey
+     */
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    /**
+     * @param secretKey the secretKey to set
+     */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    /**
+     * @return the number of consumer threads
+     */
+    public int getConsumeThreadNums() {
+        return consumeThreadNums;
+    }
+
+    /**
+     * @param consumeThreadNums the number of consumer threads to set
+     */
+    public void setConsumeThreadNums(int consumeThreadNums) {
+        this.consumeThreadNums = consumeThreadNums;
+    }
+
+    public String getConsumerId() {
+        return consumerId;
+    }
+
+    public void setConsumerId(String consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    public String getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(String producerId) {
+        this.producerId = producerId;
+    }
+
+    public int getSendMsgTimeoutMillis() {
+        return sendMsgTimeoutMillis;
+    }
+
+    public void setSendMsgTimeoutMillis(int sendMsgTimeoutMillis) {
+        this.sendMsgTimeoutMillis = sendMsgTimeoutMillis;
+    }
+
+    public String getMqType() {
+        return mqType;
+    }
+
+    public void setMqType(String mqType) {
+        this.mqType = mqType;
+    }
+
+    public String getNameServer() {
+        return nameServer;
+    }
+
+    public void setNameServer(String nameServer) {
+        this.nameServer = nameServer;
+    }
+
+    @Override
+    public String toString() { // Reflection-based dump of this object; NOTE(review): presumably includes accessKey/secretKey - avoid logging it if they are sensitive.
+        return ReflectionToStringBuilder.toString(this, 
ToStringStyle.DEFAULT_STYLE);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
new file mode 100644
index 0000000..4c809c7
--- /dev/null
+++ 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.commons.lang.StringUtils;
+
+public class JmsBaseConnection implements Connection { // JMS Connection that keeps a single, lazily created JmsBaseSession.
+    private final AtomicBoolean started = new AtomicBoolean(false); // set by start(), cleared by close()
+    protected String clientID; // defaults to a random UUID; replaceable via setClientID()
+    protected ExceptionListener exceptionListener; // only stored and returned; never invoked in this class
+    protected CommonContext context; // settings built from the constructor's parameter map
+    protected JmsBaseSession session; // NOTE(review): read outside the lock in createSession() but not volatile - confirm this is safe
+
+    public JmsBaseConnection(Map<String, String> connectionParams) {
+        // Builds the CommonContext from the map; connectionParams is dereferenced directly, so it must not be null.
+        this.clientID = UUID.randomUUID().toString();
+
+        context = new CommonContext();
+
+        //At least one should be set (NOTE(review): not enforced here; both may end up null)
+        context.setProducerId(connectionParams.get(CommonConstant.PRODUCERID));
+        context.setConsumerId(connectionParams.get(CommonConstant.CONSUMERID));
+
+        //optional
+        context.setProvider(connectionParams.get(CommonConstant.PROVIDER));
+
+        String nameServer = connectionParams.get(CommonConstant.NAMESERVER);
+        String consumerThreadNums = 
connectionParams.get(CommonConstant.CONSUME_THREAD_NUMS);
+        String sendMsgTimeoutMillis = 
connectionParams.get(CommonConstant.SEND_TIMEOUT_MILLIS);
+        String instanceName = 
connectionParams.get(CommonConstant.INSTANCE_NAME);
+        // The remaining settings are applied only when the value is present and non-empty.
+        if (StringUtils.isNotEmpty(nameServer)) {
+            context.setNameServer(nameServer);
+        }
+        if (StringUtils.isNotEmpty(instanceName)) {
+            
context.setInstanceName(connectionParams.get(CommonConstant.INSTANCE_NAME));
+        }
+
+        if (StringUtils.isNotEmpty(consumerThreadNums)) {
+            context.setConsumeThreadNums(Integer.parseInt(consumerThreadNums)); // NumberFormatException if not an integer
+        }
+        if (StringUtils.isNotEmpty(sendMsgTimeoutMillis)) {
+            
context.setSendMsgTimeoutMillis(Integer.parseInt(sendMsgTimeoutMillis));
+        }
+    }
+
+    @Override
+    public Session createSession(boolean transacted, int acknowledgeMode) 
throws JMSException {
+        // Returns the one shared session, creating it on first use; checkArgument rejects unsupported arguments with IllegalArgumentException.
+        Preconditions.checkArgument(!transacted, "Not support transaction 
Session !");
+        Preconditions.checkArgument(Session.AUTO_ACKNOWLEDGE == 
acknowledgeMode,
+            "Not support this acknowledge mode: " + acknowledgeMode);
+
+        if (null != this.session) {
+            return this.session;
+        }
+        synchronized (this) {
+            if (null != this.session) {
+                return this.session;
+            }
+            this.session = new JmsBaseSession(this, transacted, 
acknowledgeMode, context);
+            if (isStarted()) {
+                this.session.start();
+            }
+            return this.session;
+        }
+    }
+
+    @Override
+    public String getClientID() throws JMSException {
+        return this.clientID;
+    }
+
+    @Override
+    public void setClientID(String clientID) throws JMSException {
+        this.clientID = clientID;
+    }
+
+    @Override
+    public ConnectionMetaData getMetaData() throws JMSException {
+        return new JmsBaseConnectionMetaData();
+    }
+
+    @Override
+    public ExceptionListener getExceptionListener() throws JMSException {
+        return this.exceptionListener;
+    }
+
+    @Override
+    public void setExceptionListener(ExceptionListener listener) throws 
JMSException {
+        this.exceptionListener = listener;
+    }
+
+    @Override
+    public void start() throws JMSException {
+        if (started.compareAndSet(false, true)) { // only a not-started -> started transition starts the session
+            if (this.session != null) {
+                this.session.start();
+            }
+
+        }
+    }
+
+    @Override
+    public void stop() throws JMSException {
+        //Intentionally a no-op: neither the started flag nor the session is changed.
+        //NOTE(review): JMS expects stop() to pause message delivery - confirm this is acceptable.
+    }
+
+    @Override
+    public void close() throws JMSException {
+        if (started.compareAndSet(true, false)) { // NOTE(review): the session is closed only if the connection was started
+            if (this.session != null) {
+                this.session.close();
+            }
+
+        }
+    }
+
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Destination destination,
+        String messageSelector,
+        ServerSessionPool sessionPool,
+        int maxMessages) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported"); // connection consumers are not implemented
+    }
+
+    @Override
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 
String subscriptionName,
+        String messageSelector,
+        ServerSessionPool sessionPool,
+        int maxMessages) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported"); // durable connection consumers are not implemented
+    }
+
+    /**
+     * Whether the connection is started.
+     *
+     * @return whether the connection is started.
+     */
+    public boolean isStarted() {
+        return started.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
new file mode 100644
index 0000000..1b9da06
--- /dev/null
+++ 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.util.Map;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import org.apache.rocketmq.jms.util.URISpecParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsBaseConnectionFactory implements ConnectionFactory {
+
+    private static Logger logger = LoggerFactory
+        .getLogger(JmsBaseConnectionFactory.class);
+    /**
+     * Synchronization monitor for the shared Connection
+     */
+    private final Object connectionMonitor = new Object();
+    /**
+     * Can be configured in a consistent way without too much URL hacking.
+     */
+    protected URI connectionUri;
+    /**
+     * Store connection uri query parameters.
+     */
+    protected Map<String, String> connectionParams;
+    /**
+     * Wrapped Connection
+     */
+    protected JmsBaseConnection connection;
+
+    public JmsBaseConnectionFactory() {
+
+    }
+
+    public JmsBaseConnectionFactory(URI connectionUri) {
+        setConnectionUri(connectionUri);
+    }
+
+    public void setConnectionUri(URI connectionUri) {
+        Preconditions.checkNotNull(connectionUri, "Please set URI !");
+        this.connectionUri = connectionUri;
+        this.connectionParams = 
URISpecParser.parseURI(connectionUri.toString());
+
+        if (null != connectionParams) {
+            Preconditions.checkState(null != 
connectionParams.get(CommonConstant.CONSUMERID) ||
+                null != connectionParams.get(CommonConstant.PRODUCERID), 
"Please set consumerId or ProducerId !");
+        }
+
+    }
+
+    @Override
+    public Connection createConnection() throws JMSException {
+        synchronized (this.connectionMonitor) {
+            if (this.connection == null) {
+                initConnection();
+            }
+            return this.connection;
+        }
+    }
+
+    /**
+     * Using userName and Password to create a connection
+     *
+     * @param userName ignored
+     * @param password ignored
+     * @return the new JMS Connection
+     * @throws JMSException
+     */
+    @Override
+    public Connection createConnection(String userName, String password) 
throws JMSException {
+        logger.debug("Using userName and Password to create a connection.");
+        return this.createConnection();
+    }
+
+    /**
+     * Initialize the underlying shared Connection.
+     * <p/>
+     * Closes and reInitializes the Connection if an underlying Connection is 
present already.
+     *
+     * @throws javax.jms.JMSException if thrown by JMS API methods
+     */
+    protected void initConnection() throws JMSException {
+        synchronized (this.connectionMonitor) {
+            if (this.connection != null) {
+                closeConnection(this.connection);
+            }
+            this.connection = doCreateConnection();
+            logger.debug("Established shared JMS Connection: {}", 
this.connection);
+        }
+    }
+
+    /**
+     * Close the given Connection.
+     *
+     * @param con the Connection to close
+     */
+    protected void closeConnection(Connection con) {
+        logger.debug("Closing shared JMS Connection: {}", this.connection);
+        try {
+            try {
+                con.stop();
+            }
+            finally {
+                con.close();
+            }
+        }
+        catch (Throwable ex) {
+            logger.error("Could not close shared JMS Connection.", ex);
+        }
+    }
+
+    /**
+     * Create a JMS Connection
+     *
+     * @return the new JMS Connection
+     * @throws javax.jms.JMSException if thrown by JMS API methods
+     */
+    protected JmsBaseConnection doCreateConnection() throws JMSException {
+        Preconditions.checkState(null != this.connectionParams && 
this.connectionParams.size() > 0,
+            "Connection Parameters can not be null!");
+        this.connection = new JmsBaseConnection(this.connectionParams);
+
+        return connection;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
new file mode 100644
index 0000000..ee549aa
--- /dev/null
+++ 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.jms.ConnectionMetaData;
+import javax.jms.JMSException;
+
+public class JmsBaseConnectionMetaData implements ConnectionMetaData {
+    public static final String JMS_VERSION;
+    public static final int JMS_MAJOR_VERSION;
+    public static final int JMS_MINOR_VERSION;
+
+    public static final String PROVIDER_VERSION;
+    public static final int PROVIDER_MAJOR_VERSION;
+    public static final int PROVIDER_MINOR_VERSION;
+
+    public static final String PROVIDER_NAME = "Apache RocketMQ";
+
+    public static final JmsBaseConnectionMetaData INSTANCE = new 
JmsBaseConnectionMetaData();
+
+    public static InputStream resourceStream;
+
+    static {
+        Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
+
+        String jmsVersion = null;
+        int jmsMajor = 0;
+        int jmsMinor = 0;
+        try {
+            Package p = Package.getPackage("javax.jms");
+            if (p != null) {
+                jmsVersion = p.getImplementationVersion();
+                Matcher m = pattern.matcher(jmsVersion);
+                if (m.matches()) {
+                    jmsMajor = Integer.parseInt(m.group(1));
+                    jmsMinor = Integer.parseInt(m.group(2));
+                }
+            }
+        }
+        catch (Throwable e) {
+        }
+        JMS_VERSION = jmsVersion;
+        JMS_MAJOR_VERSION = jmsMajor;
+        JMS_MINOR_VERSION = jmsMinor;
+
+        String providerVersion = null;
+        int providerMajor = 0;
+        int providerMinor = 0;
+        Properties properties = new Properties();
+        try {
+            resourceStream = 
JmsBaseConnectionMetaData.class.getResourceAsStream("/application.conf");
+            properties.load(resourceStream);
+            providerVersion = properties.getProperty("version");
+
+            Matcher m = pattern.matcher(providerVersion);
+            if (m.matches()) {
+                providerMajor = Integer.parseInt(m.group(1));
+                providerMinor = Integer.parseInt(m.group(2));
+            }
+        }
+        catch (IOException e) {
+            e.printStackTrace();
+        }
+        PROVIDER_VERSION = providerVersion;
+        PROVIDER_MAJOR_VERSION = providerMajor;
+        PROVIDER_MINOR_VERSION = providerMinor;
+
+    }
+
+    public String getJMSVersion() throws JMSException {
+        return JMS_VERSION;
+    }
+
+    public int getJMSMajorVersion() throws JMSException {
+        return JMS_MAJOR_VERSION;
+    }
+
+    public int getJMSMinorVersion() throws JMSException {
+        return JMS_MINOR_VERSION;
+    }
+
+    public String getJMSProviderName() throws JMSException {
+        return PROVIDER_NAME;
+    }
+
+    public String getProviderVersion() throws JMSException {
+        return PROVIDER_VERSION;
+    }
+
+    public int getProviderMajorVersion() throws JMSException {
+        return PROVIDER_MAJOR_VERSION;
+    }
+
+    public int getProviderMinorVersion() throws JMSException {
+        return PROVIDER_MINOR_VERSION;
+    }
+
+    public Enumeration<?> getJMSXPropertyNames() throws JMSException {
+        Vector<String> jmxProperties = new Vector<String>();
+        jmxProperties.add("jmsXUserId");
+        jmxProperties.add("jmsXAppId");
+        jmxProperties.add("jmsXGroupID");
+        jmxProperties.add("jmsXGroupSeq");
+        jmxProperties.add("jmsXState");
+        jmxProperties.add("jmsXDeliveryCount");
+        jmxProperties.add("jmsXProducerTXID");
+        jmxProperties.add("jmsConsumerTXID");
+        jmxProperties.add("jmsRecvTimeStamp");
+        return jmxProperties.elements();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
new file mode 100644
index 0000000..f0bca28
--- /dev/null
+++ 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+public interface JmsBaseConstant {
+    //------------------------ Names of the JMS message headers ------------------------
+    String JMS_DESTINATION = "jmsDestination";
+    String JMS_DELIVERY_MODE = "jmsDeliveryMode";
+    String JMS_EXPIRATION = "jmsExpiration";
+    String JMS_DELIVERY_TIME = "jmsDeliveryTime";
+    String JMS_PRIORITY = "jmsPriority";
+    String JMS_MESSAGE_ID = "jmsMessageID";
+    String JMS_TIMESTAMP = "jmsTimestamp";
+    String JMS_CORRELATION_ID = "jmsCorrelationID";
+    String JMS_REPLY_TO = "jmsReplyTo";
+    String JMS_TYPE = "jmsType";
+    String JMS_REDELIVERED = "jmsRedelivered";
+
+    //------------------------ Names of the JMS defined (JMSX) properties ------------------------
+    /**
+     * Identity of the user sending the message.
+     */
+    String JMS_XUSER_ID = "jmsXUserID";
+    /**
+     * Identity of the application sending the message.
+     */
+    String JMS_XAPP_ID = "jmsXAppID";
+    /**
+     * Number of delivery attempts made for the message.
+     */
+    String JMS_XDELIVERY_COUNT = "jmsXDeliveryCount";
+    /**
+     * Identity of the message group the message belongs to.
+     */
+    String JMS_XGROUP_ID = "jmsXGroupID";
+    /**
+     * Sequence number of the message within its group: 1 for the first message, 2 for the
+     * second, and so on.
+     */
+    String JMS_XGROUP_SEQ = "jmsXGroupSeq";
+    /**
+     * Identifier of the transaction within which the message was produced.
+     */
+    String JMS_XPRODUCER_TXID = "jmsXProducerTXID";
+    /**
+     * Identifier of the transaction within which the message was consumed.
+     */
+    String JMS_XCONSUMER_TXID = "jmsXConsumerTXID";
+
+    /**
+     * Time at which JMS delivered the message to the consumer.
+     */
+    String JMS_XRCV_TIMESTAMP = "jmsXRcvTimestamp";
+    /**
+     * State of a message copy in a notional message warehouse, i.e. a store that holds a
+     * separate copy of each message for each consumer from the moment the original message was
+     * sent. The state of a copy is one of 1 (waiting), 2 (ready), 3 (expired) or 4 (retained).
+     * Producers and consumers have no interest in the state, so it is not provided to them; it
+     * only matters for messages looked up in a warehouse, for which JMS offers no API.
+     */
+    String JMS_XSTATE = "jmsXState";
+
+    //------------------------ Default values of JMS headers ------------------------
+    /**
+     * Default time to live in milliseconds: three days.
+     */
+    long DEFAULT_TIME_TO_LIVE = 3L * 24 * 60 * 60 * 1000;
+
+    /**
+     * Default value of the JMS type header.
+     */
+    String DEFAULT_JMS_TYPE = "rocketmq";
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
new file mode 100644
index 0000000..b62e928
--- /dev/null
+++ 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.MapMaker;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+/**
+ * {@link MessageConsumer} backed by a push consumer that is shared, per consumer id, between all
+ * instances of this class.
+ * <p>
+ * Every instance holds one reference on the shared consumer of its consumer id; the shared
+ * consumer is closed and forgotten when the last reference is released. Only listener based
+ * consumption is supported, the {@code receive} methods always throw.
+ */
+public class JmsBaseMessageConsumer implements MessageConsumer {
+
+    /** Guards creation, reference counting and removal of the shared consumers. */
+    private static final Object LOCK_OBJECT = new Object();
+    //all shared consumers
+    private static final ConcurrentMap<String/**consumerId*/, RMQPushConsumerExt> consumerMap =
+        new MapMaker().makeMap();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private CommonContext context;
+    private Destination destination;
+    private MessageListener messageListener;
+
+    /**
+     * Registers this consumer with the shared push consumer of its consumer id, creating that
+     * shared consumer first if none exists yet, and starts it when the connection is already
+     * started.
+     *
+     * @param destination the destination to consume from, must not be {@code null}
+     * @param commonContext the configuration, must not be {@code null} and must carry a consumer id
+     * @param connection the owning connection
+     * @throws NullPointerException if a required argument is missing
+     * @throws JMSException if the shared consumer cannot be started
+     */
+    public JmsBaseMessageConsumer(Destination destination, CommonContext commonContext,
+        JmsBaseConnection connection) throws JMSException {
+        synchronized (LOCK_OBJECT) {
+            checkArgs(destination, commonContext);
+            String consumerId = context.getConsumerId();
+
+            if (null == consumerMap.get(consumerId)) {
+                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerId);
+                if (context.getConsumeThreadNums() > 0) {
+                    consumer.setConsumeThreadMax(context.getConsumeThreadNums());
+                    consumer.setConsumeThreadMin(context.getConsumeThreadNums());
+                }
+                if (!Strings.isNullOrEmpty(context.getNameServer())) {
+                    consumer.setNamesrvAddr(context.getNameServer());
+                }
+                if (!Strings.isNullOrEmpty(context.getInstanceName())) {
+                    consumer.setInstanceName(context.getInstanceName());
+                }
+                consumer.setConsumeMessageBatchMaxSize(1);
+                //add subscribe?
+                consumerMap.putIfAbsent(consumerId, new RMQPushConsumerExt(consumer));
+            }
+
+            RMQPushConsumerExt consumerExt = consumerMap.get(consumerId);
+            consumerExt.incrementAndGet();
+
+            //If the connection has been started, start the consumer right now.
+            //add start status?
+            if (connection.isStarted()) {
+                try {
+                    consumerExt.start();
+                }
+                catch (MQClientException mqe) {
+                    // This instance is never handed to the caller, so close() can never release
+                    // the reference taken above; give it back here to keep the count accurate.
+                    releaseSharedConsumer(consumerId);
+                    JMSException jmsException = new JMSException("Start consumer failed " + consumerId);
+                    jmsException.initCause(mqe);
+                    throw jmsException;
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Validates the constructor arguments and stores them in the instance fields.
+     *
+     * @throws NullPointerException if the context, its consumer id or the destination is missing
+     */
+    private void checkArgs(Destination destination, CommonContext context) throws JMSException {
+        // Check the references themselves first so that a missing argument is reported with a
+        // message instead of failing on the dereference below.
+        Preconditions.checkNotNull(context, "Context can not be null!");
+        Preconditions.checkNotNull(destination, "Destination can not be null!");
+        Preconditions.checkNotNull(context.getConsumerId(), "ConsumerId can not be null!");
+        Preconditions.checkNotNull(destination.toString(), "Destination can not be null!");
+        this.context = context;
+        this.destination = destination;
+    }
+
+    /**
+     * Releases one reference on the shared consumer of the given id and, when that was the last
+     * one, closes the shared consumer and removes it from the map. Callers must hold
+     * {@link #LOCK_OBJECT}.
+     */
+    private void releaseSharedConsumer(String consumerId) throws JMSException {
+        RMQPushConsumerExt shared = consumerMap.get(consumerId);
+        if (null != shared && 0 == shared.decrementAndGet()) {
+            shared.close();
+            consumerMap.remove(consumerId);
+        }
+    }
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    public String getMessageSelector() throws JMSException {
+        return null;
+    }
+
+    /**
+     * @return the listener recorded by {@link #setMessageListener(MessageListener)}, or
+     * {@code null} if none has been recorded
+     */
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return this.messageListener;
+    }
+
+    /**
+     * Subscribes the listener on the shared consumer for the topic and message type of this
+     * consumer's destination. Does nothing when no shared consumer is registered for the
+     * consumer id. The listener is recorded only after the subscription has succeeded.
+     *
+     * @param listener the listener to subscribe
+     * @throws JMSException if the subscription fails; the original failure is kept as the cause
+     */
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+        if (null == rocketmqConsumerExt) {
+            return;
+        }
+        try {
+            JmsBaseTopic topic = (JmsBaseTopic) destination;
+            String messageTopic = topic.getMessageTopic();
+            String messageType = topic.getMessageType();
+            rocketmqConsumerExt.subscribe(messageTopic, messageType, listener);
+            this.messageListener = listener;
+        }
+        catch (MQClientException mqe) {
+            // Keep the message callers already see, but do not lose the underlying failure.
+            JMSException jmsException = new JMSException(mqe.getMessage());
+            jmsException.initCause(mqe);
+            throw jmsException;
+        }
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Message receive() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Message receive(long timeout) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Message receiveNoWait() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    /**
+     * Releases this instance's reference on the shared consumer. Only the first call has an
+     * effect; later calls are no-ops.
+     */
+    @Override
+    public void close() throws JMSException {
+        synchronized (LOCK_OBJECT) {
+            if (closed.compareAndSet(false, true)) {
+                releaseSharedConsumer(context.getConsumerId());
+            }
+        }
+    }
+
+    /**
+     * Start the consumer to get message from the Broker.
+     *
+     * @throws JMSException if the shared consumer cannot be started
+     */
+    public void startConsumer() throws JMSException {
+        RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+        if (null != rocketmqConsumerExt) {
+            try {
+                rocketmqConsumerExt.start();
+            }
+            catch (MQClientException mqe) {
+                throw ExceptionUtil.convertToJmsException(mqe, "Start consumer failed");
+            }
+        }
+    }
+
+    /**
+     * @return the destination this consumer was created for
+     */
+    public Destination getDestination() throws JMSException {
+        return this.destination;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
new file mode 100644
index 0000000..8dd82f0
--- /dev/null
+++ 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.MapMaker;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentMap;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.jms.domain.message.JmsBaseMessage;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+import org.apache.rocketmq.jms.util.MessageConverter;
+import org.apache.rocketmq.jms.util.MsgConvertUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsBaseMessageProducer implements MessageProducer {
+
+    private static final Object LOCK_OBJECT = new Object();
+    private static ConcurrentMap<String, MQProducer> producerMap = new 
MapMaker().makeMap();
+    private final Logger logger = 
LoggerFactory.getLogger(JmsBaseMessageProducer.class);
+    private CommonContext context;
+
+    private Destination destination;
+
+    public JmsBaseMessageProducer(Destination destination, CommonContext 
context) throws JMSException {
+        synchronized (LOCK_OBJECT) {
+            checkArgs(destination, context);
+
+            if (null == producerMap.get(this.context.getProducerId())) {
+                DefaultMQProducer producer = new 
DefaultMQProducer(context.getProducerId());
+                if (!Strings.isNullOrEmpty(context.getNameServer())) {
+                    producer.setNamesrvAddr(context.getNameServer());
+                }
+                if (!Strings.isNullOrEmpty(context.getInstanceName())) {
+                    producer.setInstanceName(context.getInstanceName());
+                }
+                if (context.getSendMsgTimeoutMillis() > 0) {
+                    
producer.setSendMsgTimeout(context.getSendMsgTimeoutMillis());
+                }
+                try {
+                    producer.start();
+                }
+                catch (MQClientException mqe) {
+                    throw ExceptionUtil.convertToJmsException(mqe, 
String.format("Start producer failed:%s", context.getProducerId()));
+                }
+                producerMap.putIfAbsent(this.context.getProducerId(), 
producer);
+            }
+
+        }
+    }
+
+    private void checkArgs(Destination destination, CommonContext context) 
throws JMSException {
+        Preconditions.checkNotNull(context.getProducerId(), "ProducerId can 
not be null!");
+        Preconditions.checkNotNull(destination.toString(), "Destination can 
not be null!");
+        this.context = context;
+        this.destination = destination;
+    }
+
+    @Override
+    public boolean getDisableMessageID() throws JMSException {
+        return false;
+    }
+
+    @Override
+    public void setDisableMessageID(boolean value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public boolean getDisableMessageTimestamp() throws JMSException {
+        return false;
+    }
+
+    @Override
+    public void setDisableMessageTimestamp(boolean value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public int getDeliveryMode() throws JMSException {
+        return javax.jms.Message.DEFAULT_DELIVERY_MODE;
+    }
+
+    @Override
+    public void setDeliveryMode(int deliveryMode) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public int getPriority() throws JMSException {
+        return javax.jms.Message.DEFAULT_PRIORITY;
+    }
+
+    @Override
+    public void setPriority(int defaultPriority) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public long getTimeToLive() throws JMSException {
+        return JmsBaseConstant.DEFAULT_TIME_TO_LIVE;
+    }
+
+    @Override
+    public void setTimeToLive(long timeToLive) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public Destination getDestination() throws JMSException {
+        return this.destination;
+    }
+
+    @Override
+    public void close() throws JMSException {
+        //Nothing to do
+    }
+
+    @Override
+    public void send(javax.jms.Message message) throws JMSException {
+        this.send(getDestination(), message);
+    }
+
+    /**
+     * Send the message to the defined Destination success---return normally. 
Exception---throw out JMSException.
+     *
+     * @param destination see <CODE>Destination</CODE>
+     * @param message the message to be sent.
+     * @throws javax.jms.JMSException
+     */
+    @Override
+    public void send(Destination destination, javax.jms.Message message) 
throws JMSException {
+        JmsBaseMessage jmsMsg = (JmsBaseMessage) message;
+        initJMSHeaders(jmsMsg, destination);
+
+        try {
+            if (context == null) {
+                throw new IllegalStateException("Context should be inited");
+            }
+            org.apache.rocketmq.common.message.Message rocketmqMsg = 
MessageConverter.convert2RMQMessage(jmsMsg);
+
+            MQProducer producer = producerMap.get(context.getProducerId());
+
+            if (producer == null) {
+                throw new Exception("producer is null ");
+            }
+            SendResult sendResult = producer.send(rocketmqMsg);
+            if (sendResult != null && sendResult.getSendStatus() == 
SendStatus.SEND_OK) {
+                jmsMsg.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + 
sendResult.getMsgId());
+            } else {
+                throw new Exception("SendResult is " + (sendResult == null ? 
"null" : sendResult.toString()));
+            }
+        }
+        catch (Exception e) {
+            logger.error("Send rocketmq message failure !", e);
+            //if fail to send the message, throw out JMSException
+            JMSException jmsException = new JMSException("Send rocketmq 
message failure!");
+            jmsException.setLinkedException(e);
+            throw jmsException;
+        }
+    }
+
+    @Override
+    public void send(javax.jms.Message message, int deliveryMode, int priority,
+        long timeToLive) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public void send(Destination destination, javax.jms.Message message, int 
deliveryMode,
+        int priority, long timeToLive) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Init the JmsMessage Headers.
+     * <p/>
+     * <P>JMS providers init message's headers. Do not allow user to set these 
by yourself.
+     *
+     * @param jmsMsg message
+     * @param destination
+     * @throws javax.jms.JMSException
+     * @see <CODE>Destination</CODE>
+     */
+    private void initJMSHeaders(JmsBaseMessage jmsMsg, Destination 
destination) throws JMSException {
+
+        //JMS_DESTINATION default:"topic:message"
+        jmsMsg.setHeader(JmsBaseConstant.JMS_DESTINATION, destination);
+        //JMS_DELIVERY_MODE default : PERSISTENT
+        jmsMsg.setHeader(JmsBaseConstant.JMS_DELIVERY_MODE, 
javax.jms.Message.DEFAULT_DELIVERY_MODE);
+        //JMS_TIMESTAMP default : current time
+        jmsMsg.setHeader(JmsBaseConstant.JMS_TIMESTAMP, 
System.currentTimeMillis());
+        //JMS_EXPIRATION default :  3 days
+        //JMS_EXPIRATION = currentTime + time_to_live
+        jmsMsg.setHeader(JmsBaseConstant.JMS_EXPIRATION, 
System.currentTimeMillis() + JmsBaseConstant.DEFAULT_TIME_TO_LIVE);
+        //JMS_PRIORITY default : 4
+        jmsMsg.setHeader(JmsBaseConstant.JMS_PRIORITY, 
javax.jms.Message.DEFAULT_PRIORITY);
+        //JMS_TYPE default : open notification service
+        jmsMsg.setHeader(JmsBaseConstant.JMS_TYPE, 
JmsBaseConstant.DEFAULT_JMS_TYPE);
+        //JMS_REPLY_TO,JMS_CORRELATION_ID default : null
+        //JMS_MESSAGE_ID is set by sendResult.
+        //JMS_REDELIVERED is set by broker.
+    }
+
+    /**
+     * Init the OnsMessage Headers.
+     * <p/>
+     * <P>When converting JmsMessage to OnsMessage, should read from the 
JmsMessage's Properties and write to the
+     * OnsMessage's Properties.
+     *
+     * @param jmsMsg message
+     * @throws javax.jms.JMSException
+     */
+    public static Properties initRocketMQHeaders(JmsBaseMessage jmsMsg,
+        String topic, String messageType) throws JMSException {
+        Properties userProperties = new Properties();
+
+        //Jms userProperties to properties
+        Map<String, Object> userProps = jmsMsg.getProperties();
+        Iterator<Map.Entry<String, Object>> userPropsIter = 
userProps.entrySet().iterator();
+        while (userPropsIter.hasNext()) {
+            Map.Entry<String, Object> entry = userPropsIter.next();
+            userProperties.setProperty(entry.getKey(), 
entry.getValue().toString());
+        }
+        //Jms systemProperties to ROCKETMQ properties
+        Map<String, Object> sysProps = jmsMsg.getHeaders();
+        Iterator<Map.Entry<String, Object>> sysPropsIter = 
sysProps.entrySet().iterator();
+        while (sysPropsIter.hasNext()) {
+            Map.Entry<String, Object> entry = sysPropsIter.next();
+            userProperties.setProperty(entry.getKey(), 
entry.getValue().toString());
+        }
+
+        //Jms message Model
+        if (jmsMsg instanceof JmsBytesMessage) {
+            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, 
MsgConvertUtil.MSGMODEL_BYTES);
+        }
+        else if (jmsMsg instanceof JmsObjectMessage) {
+            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, 
MsgConvertUtil.MSGMODEL_OBJ);
+        }
+        else if (jmsMsg instanceof JmsTextMessage) {
+            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, 
MsgConvertUtil.MSGMODEL_TEXT);
+        }
+
+        //message topic and tag
+        userProperties.setProperty(MsgConvertUtil.MSG_TOPIC, topic);
+        userProperties.setProperty(MsgConvertUtil.MSG_TYPE, messageType);
+
+        return userProperties;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
new file mode 100644
index 0000000..5bf7005
--- /dev/null
+++ 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+public class JmsBaseSession implements Session {
+    protected CommonContext context;
+    protected JmsBaseConnection connection;
+    protected CopyOnWriteArrayList<JmsBaseMessageConsumer> consumerList =
+        new CopyOnWriteArrayList<JmsBaseMessageConsumer>();
+    private boolean transacted = true;
+    private int acknowledgeMode = AUTO_ACKNOWLEDGE;
+
+    public JmsBaseSession(JmsBaseConnection connection, boolean transacted,
+        int acknowledgeMode, CommonContext context) {
+        this.context = context;
+        this.acknowledgeMode = acknowledgeMode;
+        this.transacted = transacted;
+        this.connection = connection;
+    }
+
+    @Override
+    public BytesMessage createBytesMessage() throws JMSException {
+        return new JmsBytesMessage();
+    }
+
+    @Override
+    public MapMessage createMapMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public Message createMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return new JmsObjectMessage();
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage(Serializable object) throws 
JMSException {
+        return new JmsObjectMessage(object);
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public TextMessage createTextMessage() throws JMSException {
+        return new JmsTextMessage();
+    }
+
+    @Override
+    public TextMessage createTextMessage(String text) throws JMSException {
+        return new JmsTextMessage(text);
+    }
+
+    @Override
+    public boolean getTransacted() throws JMSException {
+        return this.transacted;
+    }
+
+    @Override
+    public int getAcknowledgeMode() {
+        return this.acknowledgeMode;
+    }
+
+    @Override
+    public void commit() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public void rollback() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public void close() throws JMSException {
+        for (JmsBaseMessageConsumer messageConsumer : consumerList) {
+            messageConsumer.close();
+        }
+        consumerList.clear();
+    }
+
+    @Override
+    public void recover() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return null;
+    }
+
+    @Override
+    public void setMessageListener(MessageListener listener) throws 
JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public void run() {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public MessageProducer createProducer(Destination destination) throws 
JMSException {
+        return new JmsBaseMessageProducer(destination, context);
+    }
+
+    /**
+     * Create a MessageConsumer.
+     * <p/>
+     * <P>Create a durable consumer to the specified destination
+     *
+     * @param destination Equals to Topic:MessageType in ROCKETMQ
+     * @throws javax.jms.JMSException
+     * @see <CODE>Destination</CODE>
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination) throws 
JMSException {
+        JmsBaseMessageConsumer messageConsumer = new
+            JmsBaseMessageConsumer(destination, this.context, this.connection);
+        this.consumerList.addIfAbsent(messageConsumer);
+        return messageConsumer;
+    }
+
+    /**
+     * Create a MessageConsumer with messageSelector.
+     * <p/>
+     * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages
+     *
+     * @param destination Equals to Topic:MessageType in ROCKETMQ
+     * @param messageSelector For filtering messages
+     * @throws javax.jms.JMSException
+     * @see <CODE>Destination</CODE>
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String 
messageSelector)
+        throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+
+    }
+
+    /**
+     * Create a MessageConsumer with messageSelector.
+     * <p/>
+     * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages 
and do not support this mechanism to reject
+     * messages from localhost.
+     *
+     * @param destination Equals to Topic:MessageType in ROCKETMQ
+     * @param messageSelector For filtering messages
+     * @param noLocal If true: reject messages from localhost
+     * @throws javax.jms.JMSException
+     * @see <CODE>Destination</CODE>
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String 
messageSelector,
+        boolean noLocal) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public Queue createQueue(String queueName) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public Topic createTopic(String topicName) throws JMSException {
+        Preconditions.checkNotNull(topicName);
+        List<String> msgTuple = Arrays.asList(topicName.split(":"));
+
+        Preconditions.checkState(msgTuple.size() >= 1 && msgTuple.size() <= 2,
+            "Destination must match messageTopic:messageType !");
+
+        //If messageType is null, use * instead.
+        if (1 == msgTuple.size()) {
+            return new JmsBaseTopic(msgTuple.get(0), "*");
+        }
+        return new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1));
+    }
+
+    /**
+     * Create a MessageConsumer with durable subscription.
+     * <p/>
+     * <P>When using <CODE>createConsumer(Destination)</CODE> method, one 
creates a MessageConsumer with a durable
+     * subscription. So use <CODE>createConsumer(Destination)</CODE> instead 
of these method.
+     *
+     * @param topic destination
+     * @throws javax.jms.JMSException
+     * @see <CODE>Topic</CODE>
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name)
+        throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Create a MessageConsumer with durable subscription.
+     * <p/>
+     * <P>When using <CODE>createConsumer(Destination)</CODE> method, one 
creates a MessageConsumer with a durable
+     * subscription. So use <CODE>createConsumer(Destination)</CODE> instead 
of these method.
+     *
+     * @param topic destination
+     * @throws javax.jms.JMSException
+     * @see <CODE>Topic</CODE>
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name,
+        String messageSelector,
+        boolean noLocal) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public QueueBrowser createBrowser(Queue queue) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) 
throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        return new TemporaryQueue() {
+            public void delete() throws JMSException {
+            }
+
+            public String getQueueName() throws JMSException {
+                return UUID.randomUUID().toString();
+            }
+        };
+    }
+
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        return new TemporaryTopic() {
+            public void delete() throws JMSException {
+            }
+
+            public String getTopicName() throws JMSException {
+                return UUID.randomUUID().toString();
+            }
+        };
+    }
+
+    @Override
+    public void unsubscribe(String name) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    public void start() throws JMSException {
+        for (JmsBaseMessageConsumer messageConsumer : consumerList) {
+            messageConsumer.startConsumer();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
new file mode 100644
index 0000000..b7e2fab
--- /dev/null
+++ 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+
+/**
+ * {@link Topic} made of a message topic and a message type. Its name is
+ * rendered as {@code messageTopic:messageType}.
+ */
+public class JmsBaseTopic implements Topic {
+
+    private final String messageTopic;
+    private final String messageType;
+
+    /**
+     * @param messageTopic topic part of the destination, must not be null
+     * @param messageType type part of the destination, must not be null
+     * @throws NullPointerException if either argument is null
+     */
+    public JmsBaseTopic(String messageTopic, String messageType) {
+        this.messageTopic =
+            Preconditions.checkNotNull(messageTopic, "messageTopic");
+        this.messageType =
+            Preconditions.checkNotNull(messageType, "messageType");
+    }
+
+    /** Returns the same text as {@link #toString()}. */
+    @Override
+    public String getTopicName() throws JMSException {
+        return this.toString();
+    }
+
+    /** Returns the topic and the type joined by a colon. */
+    @Override
+    public String toString() {
+        return Joiner.on(":").join(this.getMessageTopic(),
+            this.getMessageType());
+    }
+
+    public String getMessageTopic() {
+        return messageTopic;
+    }
+
+    public String getMessageType() {
+        return messageType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
new file mode 100644
index 0000000..7a8a9f7
--- /dev/null
+++ 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.util.MessageConverter;
+
+public class RMQPushConsumerExt {
+    private final MQPushConsumer consumer;
+    private final ConcurrentHashMap<String/* Topic */, 
javax.jms.MessageListener> subscribeTable = new ConcurrentHashMap<String, 
javax.jms.MessageListener>();
+
+    private AtomicInteger referenceCount = new AtomicInteger(0);
+    private AtomicBoolean started = new AtomicBoolean(false);
+
+    public RMQPushConsumerExt(MQPushConsumer consumer) {
+        this.consumer = consumer;
+    }
+
+    public MQPushConsumer getConsumer() {
+        return consumer;
+    }
+
+    public int incrementAndGet() {
+        return referenceCount.incrementAndGet();
+    }
+
+    public int decrementAndGet() {
+        return referenceCount.decrementAndGet();
+    }
+
+    public int getReferenceCount() {
+        return referenceCount.get();
+    }
+    public void start() throws MQClientException {
+        if (consumer == null) {
+            throw new MQClientException(-1, "consumer is null");
+        }
+
+        if (this.started.compareAndSet(false, true)) {
+            this.consumer.registerMessageListener(new MessageListenerImpl());
+            this.consumer.start();
+        }
+    }
+
+
+    public void close() {
+        if (this.started.compareAndSet(true, false)) {
+            this.consumer.shutdown();
+        }
+    }
+
+    public void subscribe(String topic, String subExpression, 
javax.jms.MessageListener listener) throws MQClientException {
+        if (null == topic) {
+            throw new MQClientException(-1, "topic is null");
+        }
+
+        if (null == listener) {
+            throw new MQClientException(-1, "listener is null");
+        }
+
+        try {
+            this.subscribeTable.put(topic, listener);
+            this.consumer.subscribe(topic, subExpression);
+        } catch (MQClientException e) {
+            throw new MQClientException("consumer subscribe exception", e);
+        }
+    }
+
+    public void unsubscribe(String topic) {
+        if (null != topic) {
+            this.consumer.unsubscribe(topic);
+        }
+    }
+
+    class MessageListenerImpl implements MessageListenerConcurrently {
+
+        @Override
+        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgsRMQList, ConsumeConcurrentlyContext contextRMQ) {
+            MessageExt msgRMQ = msgsRMQList.get(0);
+            javax.jms.MessageListener listener = 
RMQPushConsumerExt.this.subscribeTable.get(msgRMQ.getTopic());
+            if (null == listener) {
+                throw new RuntimeException("MessageListener is null");
+            }
+
+            try {
+                
listener.onMessage(MessageConverter.convert2JMSMessage(msgRMQ));
+            }
+            catch (Exception e) {
+                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+            }
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+    }
+
+
+    public boolean isStarted() {
+        return started.get();
+    }
+
+
+    public boolean isClosed() {
+        return !isStarted();
+    }
+}

Reply via email to