Migrate rocketmq-jms to here.

Project: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/fab94406
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/fab94406
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/fab94406

Branch: refs/heads/master
Commit: fab9440608cb3635c7fabf9bc9005462ba3b920a
Parents: a85d9c7
Author: yukon <[email protected]>
Authored: Wed Mar 15 15:52:45 2017 +0800
Committer: yukon <[email protected]>
Committed: Wed Mar 15 16:12:41 2017 +0800

----------------------------------------------------------------------
 .gitignore                                      |   5 -
 .travis.yml                                     |  43 --
 README.md                                       |  31 --
 core/pom.xml                                    |  48 --
 .../rocketmq/jms/domain/CommonConstant.java     |  36 --
 .../rocketmq/jms/domain/CommonContext.java      | 183 --------
 .../rocketmq/jms/domain/JmsBaseConnection.java  | 172 --------
 .../jms/domain/JmsBaseConnectionFactory.java    | 146 -------
 .../jms/domain/JmsBaseConnectionMetaData.java   | 134 ------
 .../rocketmq/jms/domain/JmsBaseConstant.java    |  86 ----
 .../jms/domain/JmsBaseMessageConsumer.java      | 168 -------
 .../jms/domain/JmsBaseMessageProducer.java      | 281 ------------
 .../rocketmq/jms/domain/JmsBaseSession.java     | 308 -------------
 .../rocketmq/jms/domain/JmsBaseTopic.java       |  53 ---
 .../rocketmq/jms/domain/RMQPushConsumerExt.java | 128 ------
 .../jms/domain/message/JmsBaseMessage.java      | 434 -------------------
 .../jms/domain/message/JmsBytesMessage.java     | 245 -----------
 .../jms/domain/message/JmsObjectMessage.java    |  41 --
 .../jms/domain/message/JmsTextMessage.java      |  48 --
 .../apache/rocketmq/jms/util/ExceptionUtil.java |  41 --
 .../rocketmq/jms/util/MessageConverter.java     | 182 --------
 .../rocketmq/jms/util/MsgConvertUtil.java       |  90 ----
 .../apache/rocketmq/jms/util/URISpecParser.java |  61 ---
 core/src/main/resources/application.conf        |   1 -
 .../apache/rocketmq/jms/JmsTestListener.java    |  67 ---
 .../org/apache/rocketmq/jms/JmsTestUtil.java    |  54 ---
 .../jms/domain/message/JmsBytesMessageTest.java | 103 -----
 .../domain/message/JmsMessageConvertTest.java   |  52 ---
 .../domain/message/JmsObjectMessageTest.java    |  92 ----
 .../jms/domain/message/JmsTextMessageTest.java  |  50 ---
 .../jms/integration/IntegrationTestBase.java    | 199 ---------
 .../rocketmq/jms/integration/JmsClientIT.java   | 191 --------
 .../rocketmq/jms/integration/JmsConsumerIT.java | 131 ------
 .../rocketmq/jms/util/URISpecParserTest.java    |  43 --
 pom.xml                                         | 196 ---------
 rocketmq-jms/.gitignore                         |   5 +
 rocketmq-jms/.travis.yml                        |  43 ++
 rocketmq-jms/README.md                          |  31 ++
 rocketmq-jms/core/pom.xml                       |  48 ++
 .../rocketmq/jms/domain/CommonConstant.java     |  36 ++
 .../rocketmq/jms/domain/CommonContext.java      | 183 ++++++++
 .../rocketmq/jms/domain/JmsBaseConnection.java  | 172 ++++++++
 .../jms/domain/JmsBaseConnectionFactory.java    | 146 +++++++
 .../jms/domain/JmsBaseConnectionMetaData.java   | 134 ++++++
 .../rocketmq/jms/domain/JmsBaseConstant.java    |  86 ++++
 .../jms/domain/JmsBaseMessageConsumer.java      | 168 +++++++
 .../jms/domain/JmsBaseMessageProducer.java      | 281 ++++++++++++
 .../rocketmq/jms/domain/JmsBaseSession.java     | 308 +++++++++++++
 .../rocketmq/jms/domain/JmsBaseTopic.java       |  53 +++
 .../rocketmq/jms/domain/RMQPushConsumerExt.java | 128 ++++++
 .../jms/domain/message/JmsBaseMessage.java      | 434 +++++++++++++++++++
 .../jms/domain/message/JmsBytesMessage.java     | 245 +++++++++++
 .../jms/domain/message/JmsObjectMessage.java    |  41 ++
 .../jms/domain/message/JmsTextMessage.java      |  48 ++
 .../apache/rocketmq/jms/util/ExceptionUtil.java |  41 ++
 .../rocketmq/jms/util/MessageConverter.java     | 182 ++++++++
 .../rocketmq/jms/util/MsgConvertUtil.java       |  90 ++++
 .../apache/rocketmq/jms/util/URISpecParser.java |  61 +++
 .../core/src/main/resources/application.conf    |   1 +
 .../apache/rocketmq/jms/JmsTestListener.java    |  67 +++
 .../org/apache/rocketmq/jms/JmsTestUtil.java    |  54 +++
 .../jms/domain/message/JmsBytesMessageTest.java | 103 +++++
 .../domain/message/JmsMessageConvertTest.java   |  52 +++
 .../domain/message/JmsObjectMessageTest.java    |  92 ++++
 .../jms/domain/message/JmsTextMessageTest.java  |  50 +++
 .../jms/integration/IntegrationTestBase.java    | 199 +++++++++
 .../rocketmq/jms/integration/JmsClientIT.java   | 191 ++++++++
 .../rocketmq/jms/integration/JmsConsumerIT.java | 131 ++++++
 .../rocketmq/jms/util/URISpecParserTest.java    |  43 ++
 rocketmq-jms/pom.xml                            | 196 +++++++++
 rocketmq-jms/spring/pom.xml                     |  82 ++++
 .../SimpleExMessageListenerContainer.java       |  90 ++++
 .../rocketmq/jms/spring/JmsConsumeIT.java       |  61 +++
 .../rocketmq/jms/spring/JmsProduceIT.java       |  93 ++++
 .../rocketmq/jms/spring/SpringTestBase.java     |  41 ++
 .../spring/src/test/resources/consumer.xml      |  51 +++
 .../spring/src/test/resources/producer.xml      |  43 ++
 rocketmq-jms/style/copyright/Apache.xml         |  24 +
 .../style/copyright/profiles_settings.xml       |  64 +++
 rocketmq-jms/style/rmq_checkstyle.xml           | 135 ++++++
 rocketmq-jms/style/rmq_codeStyle.xml            | 157 +++++++
 spring/pom.xml                                  |  82 ----
 .../SimpleExMessageListenerContainer.java       |  90 ----
 .../rocketmq/jms/spring/JmsConsumeIT.java       |  61 ---
 .../rocketmq/jms/spring/JmsProduceIT.java       |  93 ----
 .../rocketmq/jms/spring/SpringTestBase.java     |  41 --
 spring/src/test/resources/consumer.xml          |  51 ---
 spring/src/test/resources/producer.xml          |  43 --
 style/copyright/Apache.xml                      |  24 -
 style/copyright/profiles_settings.xml           |  64 ---
 style/rmq_checkstyle.xml                        | 135 ------
 style/rmq_codeStyle.xml                         | 157 -------
 92 files changed, 4984 insertions(+), 4984 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
deleted file mode 100644
index d2e5aaf..0000000
--- a/.gitignore
+++ /dev/null
@@ -1,5 +0,0 @@
-.idea/
-*.iml
-*.ipr
-*.iws
-target/

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 9f430b2..0000000
--- a/.travis.yml
+++ /dev/null
@@ -1,43 +0,0 @@
-notifications:
-  email:
-    recipients:
-      - [email protected]
-      - [email protected]
-  on_success: change
-  on_failure: always
-
-language: java
-
-matrix:
-  include:
-  # On OSX, run with default JDK only.
-  # - os: osx
-  # On Linux, run with specific JDKs only.
-  # - os: linux
-  #  env: CUSTOM_JDK="oraclejdk8"
-  - os: linux
-    env: CUSTOM_JDK="oraclejdk7"
-  #- os: linux
-  #  env: CUSTOM_JDK="openjdk7"
-
-before_install:
-  - echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m 
-XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
-  - cat ~/.mavenrc
-  - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export 
JAVA_HOME=$(/usr/libexec/java_home); fi
-  - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; 
fi
-
-#os:
-#  - linux
-#  - osx
-#jdk:
-#  - oraclejdk8
-#  - oraclejdk7
-#  - openjdk7
-
-
-script:
-  - travis_retry mvn -B clean install jacoco:report coveralls:report
-
-#after_success:
-#  - mvn clean install
-#  - mvn sonar:sonar

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
deleted file mode 100644
index a05e27e..0000000
--- a/README.md
+++ /dev/null
@@ -1,31 +0,0 @@
-# RocketMQ-JMS   [![Build 
Status](https://travis-ci.org/rocketmq/rocketmq-jms.svg?branch=master)](https://travis-ci.org/rocketmq/rocketmq-jms)
 [![Coverage 
Status](https://coveralls.io/repos/github/rocketmq/rocketmq-jms/badge.svg?branch=master)](https://coveralls.io/github/rocketmq/rocketmq-jms?branch=master)
-
-
-## Introduction
-RocketMQ-JMS is an implement of JMS specification,taking Apache RocketMQ as 
broker.
-Now we are on the way of supporting JMS 1.1 and JMS2.0 is our final target.   
-
-Now RocketMQ-JMS will release the first version soon, and new features will be 
developed on the branch "v1.1".
-Please visit the [issue 
board](https://github.com/rocketmq/rocketmq-jms/issues) to see features in next 
version. 
-
-
-## Building
-
-  > cd rocketmq-jms  
-  > mvn clean install  
-  
-  **run unit test:**  
-  > mvn test    
-  
-  **run integration test:**  
-  > mvn verify
-  
-  **see jacoco code coverage report**
-  > open core/target/site/jacoco/index.html  
-  > open core/target/site/jacoco-it/index.html  
-  > open spring/target/site/jacoco-it/index.html 
-  
-  
-## Guidelines
-
- Please see [Coding Guidelines 
Introduction](http://rocketmq.apache.org/docs/code-guidelines/)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
deleted file mode 100644
index 1b36e14..0000000
--- a/core/pom.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~  Unless required by applicable law or agreed to in writing, software
-  ~  distributed under the License is distributed on an "AS IS" BASIS,
-  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~  See the License for the specific language governing permissions and
-  ~  limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>rocketmq-jms-all</artifactId>
-        <groupId>org.apache.rocketmq</groupId>
-        <version>1.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>rocketmq-jms</artifactId>
-    <version>1.0-SNAPSHOT</version>
-
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>test-jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java 
b/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
deleted file mode 100644
index 80a8b64..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain;
-
-public interface CommonConstant {
-
-    String PRODUCERID = "producerId";
-
-    String CONSUMERID = "consumerId";
-
-    String PROVIDER = "provider";
-
-    String NAMESERVER = "nameServer";
-
-    String INSTANCE_NAME = "instanceName";
-
-    String CONSUME_THREAD_NUMS = "consumeThreadNums";
-
-    String SEND_TIMEOUT_MILLIS = "sendMsgTimeoutMillis";
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java 
b/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
deleted file mode 100644
index c8e4276..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain;
-
-import org.apache.commons.lang.builder.ReflectionToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-public class CommonContext {
-    private String accessKey;
-    private String secretKey;
-
-    private String consumerId;
-    private String producerId;
-
-    private String provider;
-
-    private String appId;
-
-    private String nameServer;
-
-    /**
-     * MQType
-     */
-    private String mqType;
-
-    /**
-     * Using for distinguishing client jvm process
-     */
-    private String instanceName;
-    /**
-     * Set consumer threadPool Size
-     */
-    private int consumeThreadNums;
-    /**
-     * Set send message timeOut
-     */
-    private int sendMsgTimeoutMillis = -1;
-
-    /**
-     * @return the appId
-     */
-    public String getAppId() {
-        return appId;
-    }
-
-    /**
-     * @param appId the appId to set
-     */
-    public void setAppId(String appId) {
-        this.appId = appId;
-    }
-
-    /**
-     * @return the provider
-     */
-    public String getProvider() {
-        return provider;
-    }
-
-    /**
-     * @param provider the provider to set
-     */
-    public void setProvider(String provider) {
-        this.provider = provider;
-    }
-
-    /**
-     * @return the instanceName
-     */
-    public String getInstanceName() {
-        return instanceName;
-    }
-
-    /**
-     * @param instanceName the instanceName to set
-     */
-    public void setInstanceName(String instanceName) {
-        this.instanceName = instanceName;
-    }
-
-    /**
-     * @return the accessKey
-     */
-    public String getAccessKey() {
-        return accessKey;
-    }
-
-    /**
-     * @param accessKey the accessKey to set
-     */
-    public void setAccessKey(String accessKey) {
-        this.accessKey = accessKey;
-    }
-
-    /**
-     * @return the secretKey
-     */
-    public String getSecretKey() {
-        return secretKey;
-    }
-
-    /**
-     * @param secretKey the secretKey to set
-     */
-    public void setSecretKey(String secretKey) {
-        this.secretKey = secretKey;
-    }
-
-    /**
-     * @return consumer thread nums
-     */
-    public int getConsumeThreadNums() {
-        return consumeThreadNums;
-    }
-
-    /**
-     * @param consumeThreadNums
-     */
-    public void setConsumeThreadNums(int consumeThreadNums) {
-        this.consumeThreadNums = consumeThreadNums;
-    }
-
-    public String getConsumerId() {
-        return consumerId;
-    }
-
-    public void setConsumerId(String consumerId) {
-        this.consumerId = consumerId;
-    }
-
-    public String getProducerId() {
-        return producerId;
-    }
-
-    public void setProducerId(String producerId) {
-        this.producerId = producerId;
-    }
-
-    public int getSendMsgTimeoutMillis() {
-        return sendMsgTimeoutMillis;
-    }
-
-    public void setSendMsgTimeoutMillis(int sendMsgTimeoutMillis) {
-        this.sendMsgTimeoutMillis = sendMsgTimeoutMillis;
-    }
-
-    public String getMqType() {
-        return mqType;
-    }
-
-    public void setMqType(String mqType) {
-        this.mqType = mqType;
-    }
-
-    public String getNameServer() {
-        return nameServer;
-    }
-
-    public void setNameServer(String nameServer) {
-        this.nameServer = nameServer;
-    }
-
-    @Override
-    public String toString() {
-        return ReflectionToStringBuilder.toString(this, 
ToStringStyle.DEFAULT_STYLE);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java 
b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
deleted file mode 100644
index 4c809c7..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain;
-
-import com.google.common.base.Preconditions;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.Topic;
-import org.apache.commons.lang.StringUtils;
-
-public class JmsBaseConnection implements Connection {
-    private final AtomicBoolean started = new AtomicBoolean(false);
-    protected String clientID;
-    protected ExceptionListener exceptionListener;
-    protected CommonContext context;
-    protected JmsBaseSession session;
-
-    public JmsBaseConnection(Map<String, String> connectionParams) {
-
-        this.clientID = UUID.randomUUID().toString();
-
-        context = new CommonContext();
-
-        //At lease one should be set
-        context.setProducerId(connectionParams.get(CommonConstant.PRODUCERID));
-        context.setConsumerId(connectionParams.get(CommonConstant.CONSUMERID));
-
-        //optional
-        context.setProvider(connectionParams.get(CommonConstant.PROVIDER));
-
-        String nameServer = connectionParams.get(CommonConstant.NAMESERVER);
-        String consumerThreadNums = 
connectionParams.get(CommonConstant.CONSUME_THREAD_NUMS);
-        String sendMsgTimeoutMillis = 
connectionParams.get(CommonConstant.SEND_TIMEOUT_MILLIS);
-        String instanceName = 
connectionParams.get(CommonConstant.INSTANCE_NAME);
-
-        if (StringUtils.isNotEmpty(nameServer)) {
-            context.setNameServer(nameServer);
-        }
-        if (StringUtils.isNotEmpty(instanceName)) {
-            
context.setInstanceName(connectionParams.get(CommonConstant.INSTANCE_NAME));
-        }
-
-        if (StringUtils.isNotEmpty(consumerThreadNums)) {
-            context.setConsumeThreadNums(Integer.parseInt(consumerThreadNums));
-        }
-        if (StringUtils.isNotEmpty(sendMsgTimeoutMillis)) {
-            
context.setSendMsgTimeoutMillis(Integer.parseInt(sendMsgTimeoutMillis));
-        }
-    }
-
-    @Override
-    public Session createSession(boolean transacted, int acknowledgeMode) 
throws JMSException {
-
-        Preconditions.checkArgument(!transacted, "Not support transaction 
Session !");
-        Preconditions.checkArgument(Session.AUTO_ACKNOWLEDGE == 
acknowledgeMode,
-            "Not support this acknowledge mode: " + acknowledgeMode);
-
-        if (null != this.session) {
-            return this.session;
-        }
-        synchronized (this) {
-            if (null != this.session) {
-                return this.session;
-            }
-            this.session = new JmsBaseSession(this, transacted, 
acknowledgeMode, context);
-            if (isStarted()) {
-                this.session.start();
-            }
-            return this.session;
-        }
-    }
-
-    @Override
-    public String getClientID() throws JMSException {
-        return this.clientID;
-    }
-
-    @Override
-    public void setClientID(String clientID) throws JMSException {
-        this.clientID = clientID;
-    }
-
-    @Override
-    public ConnectionMetaData getMetaData() throws JMSException {
-        return new JmsBaseConnectionMetaData();
-    }
-
-    @Override
-    public ExceptionListener getExceptionListener() throws JMSException {
-        return this.exceptionListener;
-    }
-
-    @Override
-    public void setExceptionListener(ExceptionListener listener) throws 
JMSException {
-        this.exceptionListener = listener;
-    }
-
-    @Override
-    public void start() throws JMSException {
-        if (started.compareAndSet(false, true)) {
-            if (this.session != null) {
-                this.session.start();
-            }
-
-        }
-    }
-
-    @Override
-    public void stop() throws JMSException {
-        //Stop the connection before closing it.
-        //Do nothing here.
-    }
-
-    @Override
-    public void close() throws JMSException {
-        if (started.compareAndSet(true, false)) {
-            if (this.session != null) {
-                this.session.close();
-            }
-
-        }
-    }
-
-    @Override
-    public ConnectionConsumer createConnectionConsumer(Destination destination,
-        String messageSelector,
-        ServerSessionPool sessionPool,
-        int maxMessages) throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 
String subscriptionName,
-        String messageSelector,
-        ServerSessionPool sessionPool,
-        int maxMessages) throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    /**
-     * Whether the connection is started.
-     *
-     * @return whether the connection is started.
-     */
-    public boolean isStarted() {
-        return started.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
 
b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
deleted file mode 100644
index 1b9da06..0000000
--- 
a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain;
-
-import com.google.common.base.Preconditions;
-import java.net.URI;
-import java.util.Map;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import org.apache.rocketmq.jms.util.URISpecParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsBaseConnectionFactory implements ConnectionFactory {
-
-    private static Logger logger = LoggerFactory
-        .getLogger(JmsBaseConnectionFactory.class);
-    /**
-     * Synchronization monitor for the shared Connection
-     */
-    private final Object connectionMonitor = new Object();
-    /**
-     * Can be configured in a consistent way without too much URL hacking.
-     */
-    protected URI connectionUri;
-    /**
-     * Store connection uri query parameters.
-     */
-    protected Map<String, String> connectionParams;
-    /**
-     * Wrapped Connection
-     */
-    protected JmsBaseConnection connection;
-
-    public JmsBaseConnectionFactory() {
-
-    }
-
-    public JmsBaseConnectionFactory(URI connectionUri) {
-        setConnectionUri(connectionUri);
-    }
-
-    public void setConnectionUri(URI connectionUri) {
-        Preconditions.checkNotNull(connectionUri, "Please set URI !");
-        this.connectionUri = connectionUri;
-        this.connectionParams = 
URISpecParser.parseURI(connectionUri.toString());
-
-        if (null != connectionParams) {
-            Preconditions.checkState(null != 
connectionParams.get(CommonConstant.CONSUMERID) ||
-                null != connectionParams.get(CommonConstant.PRODUCERID), 
"Please set consumerId or ProducerId !");
-        }
-
-    }
-
-    @Override
-    public Connection createConnection() throws JMSException {
-        synchronized (this.connectionMonitor) {
-            if (this.connection == null) {
-                initConnection();
-            }
-            return this.connection;
-        }
-    }
-
-    /**
-     * Using userName and Password to create a connection
-     *
-     * @param userName ignored
-     * @param password ignored
-     * @return the new JMS Connection
-     * @throws JMSException
-     */
-    @Override
-    public Connection createConnection(String userName, String password) 
throws JMSException {
-        logger.debug("Using userName and Password to create a connection.");
-        return this.createConnection();
-    }
-
-    /**
-     * Initialize the underlying shared Connection.
-     * <p/>
-     * Closes and reInitializes the Connection if an underlying Connection is 
present already.
-     *
-     * @throws javax.jms.JMSException if thrown by JMS API methods
-     */
-    protected void initConnection() throws JMSException {
-        synchronized (this.connectionMonitor) {
-            if (this.connection != null) {
-                closeConnection(this.connection);
-            }
-            this.connection = doCreateConnection();
-            logger.debug("Established shared JMS Connection: {}", 
this.connection);
-        }
-    }
-
-    /**
-     * Close the given Connection.
-     *
-     * @param con the Connection to close
-     */
-    protected void closeConnection(Connection con) {
-        logger.debug("Closing shared JMS Connection: {}", this.connection);
-        try {
-            try {
-                con.stop();
-            }
-            finally {
-                con.close();
-            }
-        }
-        catch (Throwable ex) {
-            logger.error("Could not close shared JMS Connection.", ex);
-        }
-    }
-
-    /**
-     * Create a JMS Connection
-     *
-     * @return the new JMS Connection
-     * @throws javax.jms.JMSException if thrown by JMS API methods
-     */
-    protected JmsBaseConnection doCreateConnection() throws JMSException {
-        Preconditions.checkState(null != this.connectionParams && 
this.connectionParams.size() > 0,
-            "Connection Parameters can not be null!");
-        this.connection = new JmsBaseConnection(this.connectionParams);
-
-        return connection;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
 
b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
deleted file mode 100644
index ee549aa..0000000
--- 
a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Enumeration;
-import java.util.Properties;
-import java.util.Vector;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.jms.ConnectionMetaData;
-import javax.jms.JMSException;
-
-/**
- * {@link ConnectionMetaData} implementation that reports the JMS API version found on
- * the classpath and the provider version read from the {@code /application.conf}
- * classpath resource.
- * <p>
- * All values are resolved once in the static initializer. When a version cannot be
- * determined, the version string is {@code null} and the major/minor numbers are 0.
- */
-public class JmsBaseConnectionMetaData implements ConnectionMetaData {
-    public static final String JMS_VERSION;
-    public static final int JMS_MAJOR_VERSION;
-    public static final int JMS_MINOR_VERSION;
-
-    public static final String PROVIDER_VERSION;
-    public static final int PROVIDER_MAJOR_VERSION;
-    public static final int PROVIDER_MINOR_VERSION;
-
-    public static final String PROVIDER_NAME = "Apache RocketMQ";
-
-    public static final JmsBaseConnectionMetaData INSTANCE = new JmsBaseConnectionMetaData();
-
-    /**
-     * Stream used to read {@code /application.conf}; {@code null} when the resource is
-     * absent. The field is kept for backward compatibility only: the stream is closed
-     * as soon as the properties have been loaded.
-     */
-    public static InputStream resourceStream;
-
-    static {
-        // Matches "<major>.<minor>" followed by anything, e.g. "1.1" or "4.0.0-SNAPSHOT".
-        Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
-
-        String jmsVersion = null;
-        int jmsMajor = 0;
-        int jmsMinor = 0;
-        try {
-            Package p = Package.getPackage("javax.jms");
-            if (p != null) {
-                jmsVersion = p.getImplementationVersion();
-                // The manifest may not declare an implementation version at all.
-                if (jmsVersion != null) {
-                    Matcher m = pattern.matcher(jmsVersion);
-                    if (m.matches()) {
-                        jmsMajor = Integer.parseInt(m.group(1));
-                        jmsMinor = Integer.parseInt(m.group(2));
-                    }
-                }
-            }
-        }
-        catch (Throwable ignored) {
-            // Best effort: the JMS version is informational, keep the defaults.
-        }
-        JMS_VERSION = jmsVersion;
-        JMS_MAJOR_VERSION = jmsMajor;
-        JMS_MINOR_VERSION = jmsMinor;
-
-        String providerVersion = null;
-        int providerMajor = 0;
-        int providerMinor = 0;
-        Properties properties = new Properties();
-        try {
-            resourceStream = JmsBaseConnectionMetaData.class.getResourceAsStream("/application.conf");
-            // A missing resource must not abort class initialization.
-            if (resourceStream != null) {
-                properties.load(resourceStream);
-                providerVersion = properties.getProperty("version");
-            }
-
-            // The "version" key is optional as well.
-            if (providerVersion != null) {
-                Matcher m = pattern.matcher(providerVersion);
-                if (m.matches()) {
-                    providerMajor = Integer.parseInt(m.group(1));
-                    providerMinor = Integer.parseInt(m.group(2));
-                }
-            }
-        }
-        catch (IOException e) {
-            // No logger is available in this class; keep the original diagnostics.
-            e.printStackTrace();
-        }
-        catch (NumberFormatException e) {
-            // Version component too large for an int: keep whatever was parsed so far.
-            e.printStackTrace();
-        }
-        finally {
-            if (resourceStream != null) {
-                try {
-                    resourceStream.close();
-                }
-                catch (IOException ignored) {
-                    // Nothing useful can be done if closing the classpath resource fails.
-                }
-            }
-        }
-        PROVIDER_VERSION = providerVersion;
-        PROVIDER_MAJOR_VERSION = providerMajor;
-        PROVIDER_MINOR_VERSION = providerMinor;
-
-    }
-
-    /** @return the JMS API version string, or {@code null} if it could not be determined */
-    public String getJMSVersion() throws JMSException {
-        return JMS_VERSION;
-    }
-
-    /** @return the JMS API major version, or 0 if unknown */
-    public int getJMSMajorVersion() throws JMSException {
-        return JMS_MAJOR_VERSION;
-    }
-
-    /** @return the JMS API minor version, or 0 if unknown */
-    public int getJMSMinorVersion() throws JMSException {
-        return JMS_MINOR_VERSION;
-    }
-
-    /** @return the provider name, {@link #PROVIDER_NAME} */
-    public String getJMSProviderName() throws JMSException {
-        return PROVIDER_NAME;
-    }
-
-    /** @return the provider version string, or {@code null} if it could not be determined */
-    public String getProviderVersion() throws JMSException {
-        return PROVIDER_VERSION;
-    }
-
-    /** @return the provider major version, or 0 if unknown */
-    public int getProviderMajorVersion() throws JMSException {
-        return PROVIDER_MAJOR_VERSION;
-    }
-
-    /** @return the provider minor version, or 0 if unknown */
-    public int getProviderMinorVersion() throws JMSException {
-        return PROVIDER_MINOR_VERSION;
-    }
-
-    /**
-     * Lists the names of the JMS-defined (JMSX) properties.
-     * <p>
-     * The names are taken from {@link JmsBaseConstant} so that they always agree with
-     * the property keys used by the rest of the provider.
-     *
-     * @return an enumeration over a fresh list of property names
-     */
-    public Enumeration<?> getJMSXPropertyNames() throws JMSException {
-        Vector<String> jmsxProperties = new Vector<String>();
-        jmsxProperties.add(JmsBaseConstant.JMS_XUSER_ID);
-        jmsxProperties.add(JmsBaseConstant.JMS_XAPP_ID);
-        jmsxProperties.add(JmsBaseConstant.JMS_XGROUP_ID);
-        jmsxProperties.add(JmsBaseConstant.JMS_XGROUP_SEQ);
-        jmsxProperties.add(JmsBaseConstant.JMS_XSTATE);
-        jmsxProperties.add(JmsBaseConstant.JMS_XDELIVERY_COUNT);
-        jmsxProperties.add(JmsBaseConstant.JMS_XPRODUCER_TXID);
-        jmsxProperties.add(JmsBaseConstant.JMS_XCONSUMER_TXID);
-        jmsxProperties.add(JmsBaseConstant.JMS_XRCV_TIMESTAMP);
-        return jmsxProperties.elements();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java 
b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
deleted file mode 100644
index f0bca28..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain;
-
-/**
- * Names of the JMS message headers and JMS-defined properties used as keys by this
- * provider, together with the default values of some headers.
- */
-public interface JmsBaseConstant {
-    //------------------------JMS message header constant---------------------------------
-    String JMS_DESTINATION = "jmsDestination";
-    String JMS_DELIVERY_MODE = "jmsDeliveryMode";
-    String JMS_EXPIRATION = "jmsExpiration";
-    String JMS_DELIVERY_TIME = "jmsDeliveryTime";
-    String JMS_PRIORITY = "jmsPriority";
-    String JMS_MESSAGE_ID = "jmsMessageID";
-    String JMS_TIMESTAMP = "jmsTimestamp";
-    String JMS_CORRELATION_ID = "jmsCorrelationID";
-    String JMS_REPLY_TO = "jmsReplyTo";
-    String JMS_TYPE = "jmsType";
-    String JMS_REDELIVERED = "jmsRedelivered";
-
-    //-------------------------JMS defined properties constant----------------------------
-    /**
-     * The identity of the user sending the message.
-     */
-    String JMS_XUSER_ID = "jmsXUserID";
-    /**
-     * The identity of the application sending the message.
-     */
-    String JMS_XAPP_ID = "jmsXAppID";
-    /**
-     * The number of message delivery attempts.
-     */
-    String JMS_XDELIVERY_COUNT = "jmsXDeliveryCount";
-    /**
-     * The identity of the message group this message is part of.
-     */
-    String JMS_XGROUP_ID = "jmsXGroupID";
-    /**
-     * The sequence number of this message within the group; the first message is 1,
-     * the second 2, and so on.
-     */
-    String JMS_XGROUP_SEQ = "jmsXGroupSeq";
-    /**
-     * The transaction identifier of the transaction within which this message was
-     * produced.
-     */
-    String JMS_XPRODUCER_TXID = "jmsXProducerTXID";
-    /**
-     * The transaction identifier of the transaction within which this message was
-     * consumed.
-     */
-    String JMS_XCONSUMER_TXID = "jmsXConsumerTXID";
-
-    /**
-     * The time JMS delivered the message to the consumer.
-     */
-    String JMS_XRCV_TIMESTAMP = "jmsXRcvTimestamp";
-    /**
-     * Assume there exists a message warehouse that contains a separate copy of each
-     * message sent to each consumer and that these copies exist from the time the
-     * original message was sent. Each copy's state is one of: 1 (waiting), 2 (ready),
-     * 3 (expired) or 4 (retained). Since state is of no interest to producers and
-     * consumers, it is not provided to either. It is only of relevance to messages
-     * looked up in a warehouse, and JMS provides no API for this.
-     */
-    String JMS_XSTATE = "jmsXState";
-
-    //---------------------------JMS Headers' value constant---------------------------
-    /**
-     * Default time to live in milliseconds (3 days). It is added to the current time
-     * in milliseconds to compute the expiration header of a sent message.
-     */
-    long DEFAULT_TIME_TO_LIVE = 3 * 24 * 60 * 60 * 1000;
-
-    /**
-     * Default value of the JMS type header.
-     */
-    String DEFAULT_JMS_TYPE = "rocketmq";
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java 
b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
deleted file mode 100644
index b62e928..0000000
--- 
a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.MapMaker;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.jms.util.ExceptionUtil;
-
-/**
- * {@link MessageConsumer} backed by a RocketMQ push consumer.
- * <p>
- * Push consumers are shared between all instances that use the same consumer id and
- * are reference counted: the underlying consumer is closed when the last
- * {@code JmsBaseMessageConsumer} using it is closed. Only asynchronous delivery through
- * a {@link MessageListener} is supported; the {@code receive} methods are not.
- */
-public class JmsBaseMessageConsumer implements MessageConsumer {
-
-    /** Guards creation and release of the shared consumers held in {@link #consumerMap}. */
-    private static final Object LOCK_OBJECT = new Object();
-    //all shared consumers
-    private static ConcurrentMap<String/**consumerId*/, RMQPushConsumerExt> consumerMap = new MapMaker().makeMap();
-    private final AtomicBoolean closed = new AtomicBoolean(false);
-    private CommonContext context;
-    private Destination destination;
-    private MessageListener messageListener;
-
-    /**
-     * Creates a consumer for the given destination, creating the shared push consumer
-     * for the context's consumer id when it does not exist yet.
-     *
-     * @param destination the destination to consume from
-     * @param commonContext the configuration (consumer id, name server, ...)
-     * @param connection the owning connection; if it is already started, the shared
-     * consumer is started immediately
-     * @throws JMSException if the shared consumer fails to start
-     * @throws NullPointerException if the destination, the context or the consumer id is null
-     */
-    public JmsBaseMessageConsumer(Destination destination, CommonContext commonContext,
-        JmsBaseConnection connection) throws JMSException {
-        synchronized (LOCK_OBJECT) {
-            checkArgs(destination, commonContext);
-
-            final String consumerId = context.getConsumerId();
-            RMQPushConsumerExt consumerExt = consumerMap.get(consumerId);
-            if (null == consumerExt) {
-                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerId);
-                if (context.getConsumeThreadNums() > 0) {
-                    consumer.setConsumeThreadMax(context.getConsumeThreadNums());
-                    consumer.setConsumeThreadMin(context.getConsumeThreadNums());
-                }
-                if (!Strings.isNullOrEmpty(context.getNameServer())) {
-                    consumer.setNamesrvAddr(context.getNameServer());
-                }
-                if (!Strings.isNullOrEmpty(context.getInstanceName())) {
-                    consumer.setInstanceName(context.getInstanceName());
-                }
-                consumer.setConsumeMessageBatchMaxSize(1);
-                //add subscribe?
-                consumerExt = new RMQPushConsumerExt(consumer);
-                consumerMap.putIfAbsent(consumerId, consumerExt);
-            }
-
-            // One more JMS consumer now shares the underlying push consumer.
-            consumerExt.incrementAndGet();
-
-            //If the connection has been started, start the consumer right now.
-            //add start status?
-            if (connection.isStarted()) {
-                try {
-                    consumerExt.start();
-                }
-                catch (MQClientException mqe) {
-                    JMSException jmsException = new JMSException("Start consumer failed " + consumerId);
-                    jmsException.initCause(mqe);
-                    throw jmsException;
-                }
-            }
-        }
-
-    }
-
-    /**
-     * Validates the constructor arguments and stores them in the instance fields.
-     * Each argument is checked before it is dereferenced so that the failure carries
-     * a descriptive message.
-     */
-    private void checkArgs(Destination destination, CommonContext context) throws JMSException {
-        Preconditions.checkNotNull(context, "Context can not be null!");
-        Preconditions.checkNotNull(context.getConsumerId(), "ConsumerId can not be null!");
-        Preconditions.checkNotNull(destination, "Destination can not be null!");
-        Preconditions.checkNotNull(destination.toString(), "Destination can not be null!");
-        this.context = context;
-        this.destination = destination;
-    }
-
-    /** Message selectors are not supported; always returns {@code null}. */
-    @Override
-    public String getMessageSelector() throws JMSException {
-        return null;
-    }
-
-    @Override
-    public MessageListener getMessageListener() throws JMSException {
-        return this.messageListener;
-    }
-
-    /**
-     * Registers the listener and subscribes the shared consumer to the topic and
-     * message type of this consumer's destination. Does nothing when the shared
-     * consumer no longer exists.
-     *
-     * @param listener the listener that receives the messages
-     * @throws JMSException if the subscription fails; the RocketMQ exception is the cause
-     */
-    @Override
-    public void setMessageListener(MessageListener listener) throws JMSException {
-        RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
-        if (null != rocketmqConsumerExt) {
-            try {
-                this.messageListener = listener;
-                JmsBaseTopic topic = (JmsBaseTopic) destination;
-                rocketmqConsumerExt.subscribe(topic.getMessageTopic(), topic.getMessageType(), listener);
-            }
-            catch (MQClientException mqe) {
-                // Keep the original message but preserve the cause for diagnostics.
-                JMSException jmsException = new JMSException(mqe.getMessage());
-                jmsException.initCause(mqe);
-                throw jmsException;
-            }
-
-        }
-
-    }
-
-    /** Synchronous receive is not supported. */
-    @Override
-    public Message receive() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    /** Synchronous receive is not supported. */
-    @Override
-    public Message receive(long timeout) throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    /** Synchronous receive is not supported. */
-    @Override
-    public Message receiveNoWait() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    /**
-     * Releases this consumer's reference to the shared push consumer and closes the
-     * push consumer when this was the last reference. Calling it again has no effect.
-     */
-    @Override
-    public void close() throws JMSException {
-        synchronized (LOCK_OBJECT) {
-            if (closed.compareAndSet(false, true)) {
-                final String consumerId = context.getConsumerId();
-                RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(consumerId);
-                if (null != rocketmqConsumerExt && 0 == rocketmqConsumerExt.decrementAndGet()) {
-                    rocketmqConsumerExt.close();
-                    consumerMap.remove(consumerId);
-                }
-            }
-        }
-    }
-
-    /**
-     * Start the consumer to get message from the Broker.
-     *
-     * @throws JMSException if the shared consumer fails to start
-     */
-    public void startConsumer() throws JMSException {
-        RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
-        if (null != rocketmqConsumerExt) {
-            try {
-                rocketmqConsumerExt.start();
-            }
-            catch (MQClientException mqe) {
-                throw ExceptionUtil.convertToJmsException(mqe, "Start consumer failed");
-            }
-        }
-    }
-
-    public Destination getDestination() throws JMSException {
-        return this.destination;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java 
b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
deleted file mode 100644
index 8dd82f0..0000000
--- 
a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.MapMaker;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentMap;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.MQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.jms.domain.message.JmsBaseMessage;
-import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
-import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
-import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
-import org.apache.rocketmq.jms.util.ExceptionUtil;
-import org.apache.rocketmq.jms.util.MessageConverter;
-import org.apache.rocketmq.jms.util.MsgConvertUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link MessageProducer} backed by a RocketMQ producer.
- * <p>
- * RocketMQ producers are shared between all instances that use the same producer id.
- * Delivery mode, priority and time to live are fixed to their defaults and cannot be
- * changed.
- */
-public class JmsBaseMessageProducer implements MessageProducer {
-
-    /** Guards creation of the shared producers held in {@link #producerMap}. */
-    private static final Object LOCK_OBJECT = new Object();
-    private static ConcurrentMap<String, MQProducer> producerMap = new MapMaker().makeMap();
-    private final Logger logger = LoggerFactory.getLogger(JmsBaseMessageProducer.class);
-    private CommonContext context;
-
-    private Destination destination;
-
-    /**
-     * Creates a producer for the given destination, creating and starting the shared
-     * RocketMQ producer for the context's producer id when it does not exist yet.
-     *
-     * @param destination the default destination of this producer
-     * @param context the configuration (producer id, name server, ...)
-     * @throws JMSException if the RocketMQ producer fails to start
-     * @throws NullPointerException if the destination, the context or the producer id is null
-     */
-    public JmsBaseMessageProducer(Destination destination, CommonContext context) throws JMSException {
-        synchronized (LOCK_OBJECT) {
-            checkArgs(destination, context);
-
-            final String producerId = context.getProducerId();
-            if (null == producerMap.get(producerId)) {
-                DefaultMQProducer producer = new DefaultMQProducer(producerId);
-                if (!Strings.isNullOrEmpty(context.getNameServer())) {
-                    producer.setNamesrvAddr(context.getNameServer());
-                }
-                if (!Strings.isNullOrEmpty(context.getInstanceName())) {
-                    producer.setInstanceName(context.getInstanceName());
-                }
-                if (context.getSendMsgTimeoutMillis() > 0) {
-                    producer.setSendMsgTimeout(context.getSendMsgTimeoutMillis());
-                }
-                try {
-                    producer.start();
-                }
-                catch (MQClientException mqe) {
-                    throw ExceptionUtil.convertToJmsException(mqe, String.format("Start producer failed:%s", producerId));
-                }
-                // Only a successfully started producer is published for sharing.
-                producerMap.putIfAbsent(producerId, producer);
-            }
-
-        }
-    }
-
-    /**
-     * Validates the constructor arguments and stores them in the instance fields.
-     * Each argument is checked before it is dereferenced so that the failure carries
-     * a descriptive message.
-     */
-    private void checkArgs(Destination destination, CommonContext context) throws JMSException {
-        Preconditions.checkNotNull(context, "Context can not be null!");
-        Preconditions.checkNotNull(context.getProducerId(), "ProducerId can not be null!");
-        Preconditions.checkNotNull(destination, "Destination can not be null!");
-        Preconditions.checkNotNull(destination.toString(), "Destination can not be null!");
-        this.context = context;
-        this.destination = destination;
-    }
-
-    @Override
-    public boolean getDisableMessageID() throws JMSException {
-        return false;
-    }
-
-    @Override
-    public void setDisableMessageID(boolean value) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    @Override
-    public boolean getDisableMessageTimestamp() throws JMSException {
-        return false;
-    }
-
-    @Override
-    public void setDisableMessageTimestamp(boolean value) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    @Override
-    public int getDeliveryMode() throws JMSException {
-        return javax.jms.Message.DEFAULT_DELIVERY_MODE;
-    }
-
-    @Override
-    public void setDeliveryMode(int deliveryMode) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    @Override
-    public int getPriority() throws JMSException {
-        return javax.jms.Message.DEFAULT_PRIORITY;
-    }
-
-    @Override
-    public void setPriority(int defaultPriority) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    @Override
-    public long getTimeToLive() throws JMSException {
-        return JmsBaseConstant.DEFAULT_TIME_TO_LIVE;
-    }
-
-    @Override
-    public void setTimeToLive(long timeToLive) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    @Override
-    public Destination getDestination() throws JMSException {
-        return this.destination;
-    }
-
-    /** Does nothing: the underlying RocketMQ producer is shared and stays open. */
-    @Override
-    public void close() throws JMSException {
-        //Nothing to do
-    }
-
-    /** Sends the message to this producer's default destination. */
-    @Override
-    public void send(javax.jms.Message message) throws JMSException {
-        this.send(getDestination(), message);
-    }
-
-    /**
-     * Send the message to the given destination. Returns normally on success and
-     * throws a JMSException on any failure.
-     * <p>
-     * On success the message id header of the message is set to {@code "ID:"} followed
-     * by the id reported by RocketMQ.
-     *
-     * @param destination see <CODE>Destination</CODE>
-     * @param message the message to be sent; must be a {@link JmsBaseMessage}
-     * @throws javax.jms.JMSException if the message could not be sent; the underlying
-     * failure is available both as cause and as linked exception
-     */
-    @Override
-    public void send(Destination destination, javax.jms.Message message) throws JMSException {
-        JmsBaseMessage jmsMsg = (JmsBaseMessage) message;
-        initJMSHeaders(jmsMsg, destination);
-
-        try {
-            if (context == null) {
-                throw new IllegalStateException("Context should be inited");
-            }
-            org.apache.rocketmq.common.message.Message rocketmqMsg = MessageConverter.convert2RMQMessage(jmsMsg);
-
-            MQProducer producer = producerMap.get(context.getProducerId());
-
-            if (producer == null) {
-                throw new IllegalStateException("producer is null ");
-            }
-            SendResult sendResult = producer.send(rocketmqMsg);
-            if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) {
-                throw new IllegalStateException("SendResult is " + (sendResult == null ? "null" : sendResult.toString()));
-            }
-            jmsMsg.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + sendResult.getMsgId());
-        }
-        catch (Exception e) {
-            if (e instanceof InterruptedException) {
-                // Do not swallow the interruption: restore the flag for the caller.
-                Thread.currentThread().interrupt();
-            }
-            logger.error("Send rocketmq message failure !", e);
-            //if fail to send the message, throw out JMSException
-            JMSException jmsException = new JMSException("Send rocketmq message failure!");
-            jmsException.setLinkedException(e);
-            jmsException.initCause(e);
-            throw jmsException;
-        }
-    }
-
-    /** Not supported: delivery mode, priority and time to live cannot be overridden. */
-    @Override
-    public void send(javax.jms.Message message, int deliveryMode, int priority,
-        long timeToLive) throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    /** Not supported: delivery mode, priority and time to live cannot be overridden. */
-    @Override
-    public void send(Destination destination, javax.jms.Message message, int deliveryMode,
-        int priority, long timeToLive) throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    /**
-     * Init the JmsMessage Headers.
-     * <p>
-     * The provider initializes these headers itself; values set by the user are
-     * overwritten.
-     *
-     * @param jmsMsg message
-     * @param destination the destination the message is sent to
-     * @throws javax.jms.JMSException
-     * @see <CODE>Destination</CODE>
-     */
-    private void initJMSHeaders(JmsBaseMessage jmsMsg, Destination destination) throws JMSException {
-        // Read the clock once so that expiration is exactly timestamp + time to live.
-        final long now = System.currentTimeMillis();
-
-        //JMS_DESTINATION default:"topic:message"
-        jmsMsg.setHeader(JmsBaseConstant.JMS_DESTINATION, destination);
-        //JMS_DELIVERY_MODE default : PERSISTENT
-        jmsMsg.setHeader(JmsBaseConstant.JMS_DELIVERY_MODE, javax.jms.Message.DEFAULT_DELIVERY_MODE);
-        //JMS_TIMESTAMP default : current time
-        jmsMsg.setHeader(JmsBaseConstant.JMS_TIMESTAMP, now);
-        //JMS_EXPIRATION default :  3 days
-        //JMS_EXPIRATION = currentTime + time_to_live
-        jmsMsg.setHeader(JmsBaseConstant.JMS_EXPIRATION, now + JmsBaseConstant.DEFAULT_TIME_TO_LIVE);
-        //JMS_PRIORITY default : 4
-        jmsMsg.setHeader(JmsBaseConstant.JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY);
-        //JMS_TYPE default : DEFAULT_JMS_TYPE
-        jmsMsg.setHeader(JmsBaseConstant.JMS_TYPE, JmsBaseConstant.DEFAULT_JMS_TYPE);
-        //JMS_REPLY_TO,JMS_CORRELATION_ID default : null
-        //JMS_MESSAGE_ID is set by sendResult.
-        //JMS_REDELIVERED is set by broker.
-    }
-
-    /**
-     * Build the RocketMQ message properties for a JMS message.
-     * <p>
-     * The result contains the message's user properties, then its JMS headers (a
-     * header overrides a user property of the same name), the message model derived
-     * from the message class, and finally the topic and message type. Entries whose
-     * key or value is {@code null} are skipped.
-     *
-     * @param jmsMsg message
-     * @param topic the RocketMQ topic
-     * @param messageType the message type
-     * @return the properties to attach to the RocketMQ message
-     * @throws javax.jms.JMSException
-     */
-    public static Properties initRocketMQHeaders(JmsBaseMessage jmsMsg,
-        String topic, String messageType) throws JMSException {
-        Properties userProperties = new Properties();
-
-        //Jms userProperties to properties
-        copyEntries(jmsMsg.getProperties(), userProperties);
-        //Jms systemProperties to ROCKETMQ properties
-        copyEntries(jmsMsg.getHeaders(), userProperties);
-
-        //Jms message Model
-        if (jmsMsg instanceof JmsBytesMessage) {
-            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_BYTES);
-        }
-        else if (jmsMsg instanceof JmsObjectMessage) {
-            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_OBJ);
-        }
-        else if (jmsMsg instanceof JmsTextMessage) {
-            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT);
-        }
-
-        //message topic and tag
-        userProperties.setProperty(MsgConvertUtil.MSG_TOPIC, topic);
-        userProperties.setProperty(MsgConvertUtil.MSG_TYPE, messageType);
-
-        return userProperties;
-    }
-
-    /**
-     * Copies every entry of {@code source} into {@code target} as a string, skipping
-     * entries with a null key or value because {@link Properties} rejects them.
-     */
-    private static void copyEntries(Map<String, Object> source, Properties target) {
-        if (source == null) {
-            return;
-        }
-        Iterator<Map.Entry<String, Object>> entries = source.entrySet().iterator();
-        while (entries.hasNext()) {
-            Map.Entry<String, Object> entry = entries.next();
-            if (entry.getKey() != null && entry.getValue() != null) {
-                target.setProperty(entry.getKey(), entry.getValue().toString());
-            }
-        }
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java 
b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
deleted file mode 100644
index 5bf7005..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain;
-
-import com.google.common.base.Preconditions;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
-import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
-import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
-import org.apache.rocketmq.jms.util.ExceptionUtil;
-
-public class JmsBaseSession implements Session {
-    protected CommonContext context;
-    protected JmsBaseConnection connection;
-    protected CopyOnWriteArrayList<JmsBaseMessageConsumer> consumerList =
-        new CopyOnWriteArrayList<JmsBaseMessageConsumer>();
-    private boolean transacted = true;
-    private int acknowledgeMode = AUTO_ACKNOWLEDGE;
-
-    public JmsBaseSession(JmsBaseConnection connection, boolean transacted,
-        int acknowledgeMode, CommonContext context) {
-        this.context = context;
-        this.acknowledgeMode = acknowledgeMode;
-        this.transacted = transacted;
-        this.connection = connection;
-    }
-
-    @Override
-    public BytesMessage createBytesMessage() throws JMSException {
-        return new JmsBytesMessage();
-    }
-
-    @Override
-    public MapMessage createMapMessage() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public Message createMessage() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public ObjectMessage createObjectMessage() throws JMSException {
-        return new JmsObjectMessage();
-    }
-
-    @Override
-    public ObjectMessage createObjectMessage(Serializable object) throws 
JMSException {
-        return new JmsObjectMessage(object);
-    }
-
-    @Override
-    public StreamMessage createStreamMessage() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public TextMessage createTextMessage() throws JMSException {
-        return new JmsTextMessage();
-    }
-
-    @Override
-    public TextMessage createTextMessage(String text) throws JMSException {
-        return new JmsTextMessage(text);
-    }
-
-    @Override
-    public boolean getTransacted() throws JMSException {
-        return this.transacted;
-    }
-
-    @Override
-    public int getAcknowledgeMode() {
-        return this.acknowledgeMode;
-    }
-
-    @Override
-    public void commit() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public void rollback() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public void close() throws JMSException {
-        for (JmsBaseMessageConsumer messageConsumer : consumerList) {
-            messageConsumer.close();
-        }
-        consumerList.clear();
-    }
-
-    @Override
-    public void recover() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public MessageListener getMessageListener() throws JMSException {
-        return null;
-    }
-
-    @Override
-    public void setMessageListener(MessageListener listener) throws 
JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    @Override
-    public void run() {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public MessageProducer createProducer(Destination destination) throws 
JMSException {
-        return new JmsBaseMessageProducer(destination, context);
-    }
-
-    /**
-     * Create a MessageConsumer.
-     * <p/>
-     * <P>Create a durable consumer to the specified destination
-     *
-     * @param destination Equals to Topic:MessageType in ROCKETMQ
-     * @throws javax.jms.JMSException
-     * @see <CODE>Destination</CODE>
-     */
-    @Override
-    public MessageConsumer createConsumer(Destination destination) throws 
JMSException {
-        JmsBaseMessageConsumer messageConsumer = new
-            JmsBaseMessageConsumer(destination, this.context, this.connection);
-        this.consumerList.addIfAbsent(messageConsumer);
-        return messageConsumer;
-    }
-
-    /**
-     * Create a MessageConsumer with messageSelector.
-     * <p/>
-     * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages
-     *
-     * @param destination Equals to Topic:MessageType in ROCKETMQ
-     * @param messageSelector For filtering messages
-     * @throws javax.jms.JMSException
-     * @see <CODE>Destination</CODE>
-     */
-    @Override
-    public MessageConsumer createConsumer(Destination destination, String 
messageSelector)
-        throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-
-    }
-
-    /**
-     * Create a MessageConsumer with messageSelector.
-     * <p/>
-     * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages 
and do not support this mechanism to reject
-     * messages from localhost.
-     *
-     * @param destination Equals to Topic:MessageType in ROCKETMQ
-     * @param messageSelector For filtering messages
-     * @param noLocal If true: reject messages from localhost
-     * @throws javax.jms.JMSException
-     * @see <CODE>Destination</CODE>
-     */
-    @Override
-    public MessageConsumer createConsumer(Destination destination, String 
messageSelector,
-        boolean noLocal) throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public Queue createQueue(String queueName) throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public Topic createTopic(String topicName) throws JMSException {
-        Preconditions.checkNotNull(topicName);
-        List<String> msgTuple = Arrays.asList(topicName.split(":"));
-
-        Preconditions.checkState(msgTuple.size() >= 1 && msgTuple.size() <= 2,
-            "Destination must match messageTopic:messageType !");
-
-        //If messageType is null, use * instead.
-        if (1 == msgTuple.size()) {
-            return new JmsBaseTopic(msgTuple.get(0), "*");
-        }
-        return new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1));
-    }
-
-    /**
-     * Create a MessageConsumer with durable subscription.
-     * <p/>
-     * <P>When using <CODE>createConsumer(Destination)</CODE> method, one 
creates a MessageConsumer with a durable
-     * subscription. So use <CODE>createConsumer(Destination)</CODE> instead 
of these method.
-     *
-     * @param topic destination
-     * @throws javax.jms.JMSException
-     * @see <CODE>Topic</CODE>
-     */
-    @Override
-    public TopicSubscriber createDurableSubscriber(Topic topic, String name)
-        throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    /**
-     * Create a MessageConsumer with durable subscription.
-     * <p/>
-     * <P>When using <CODE>createConsumer(Destination)</CODE> method, one 
creates a MessageConsumer with a durable
-     * subscription. So use <CODE>createConsumer(Destination)</CODE> instead 
of these method.
-     *
-     * @param topic destination
-     * @throws javax.jms.JMSException
-     * @see <CODE>Topic</CODE>
-     */
-    @Override
-    public TopicSubscriber createDurableSubscriber(Topic topic, String name,
-        String messageSelector,
-        boolean noLocal) throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public QueueBrowser createBrowser(Queue queue) throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public QueueBrowser createBrowser(Queue queue, String messageSelector) 
throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public TemporaryQueue createTemporaryQueue() throws JMSException {
-        return new TemporaryQueue() {
-            public void delete() throws JMSException {
-            }
-
-            public String getQueueName() throws JMSException {
-                return UUID.randomUUID().toString();
-            }
-        };
-    }
-
-    @Override
-    public TemporaryTopic createTemporaryTopic() throws JMSException {
-        return new TemporaryTopic() {
-            public void delete() throws JMSException {
-            }
-
-            public String getTopicName() throws JMSException {
-                return UUID.randomUUID().toString();
-            }
-        };
-    }
-
-    @Override
-    public void unsubscribe(String name) throws JMSException {
-        throw new UnsupportedOperationException("Unsupported");
-    }
-
-    public void start() throws JMSException {
-        for (JmsBaseMessageConsumer messageConsumer : consumerList) {
-            messageConsumer.startConsumer();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java 
b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
deleted file mode 100644
index b7e2fab..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import javax.jms.JMSException;
-import javax.jms.Topic;
-
-public class JmsBaseTopic implements Topic {
-
-    private String messageTopic;
-    private String messageType;
-
-    public JmsBaseTopic(String messageTopic, String messageType) {
-        Preconditions.checkNotNull(messageTopic);
-        Preconditions.checkNotNull(messageType);
-
-        this.messageTopic = messageTopic;
-        this.messageType = messageType;
-    }
-
-    public String getTopicName() throws JMSException {
-        return this.toString();
-    }
-
-    public String toString() {
-        return Joiner.on(":").join(this.getMessageTopic(), 
this.getMessageType());
-    }
-
-    public String getMessageTopic() {
-        return messageTopic;
-    }
-
-    public String getMessageType() {
-        return messageType;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java 
b/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
deleted file mode 100644
index 7a8a9f7..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.rocketmq.client.consumer.MQPushConsumer;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.jms.util.MessageConverter;
-
-public class RMQPushConsumerExt {
-    private final MQPushConsumer consumer;
-    private final ConcurrentHashMap<String/* Topic */, 
javax.jms.MessageListener> subscribeTable = new ConcurrentHashMap<String, 
javax.jms.MessageListener>();
-
-    private AtomicInteger referenceCount = new AtomicInteger(0);
-    private AtomicBoolean started = new AtomicBoolean(false);
-
-    public RMQPushConsumerExt(MQPushConsumer consumer) {
-        this.consumer = consumer;
-    }
-
-    public MQPushConsumer getConsumer() {
-        return consumer;
-    }
-
-    public int incrementAndGet() {
-        return referenceCount.incrementAndGet();
-    }
-
-    public int decrementAndGet() {
-        return referenceCount.decrementAndGet();
-    }
-
-    public int getReferenceCount() {
-        return referenceCount.get();
-    }
-    public void start() throws MQClientException {
-        if (consumer == null) {
-            throw new MQClientException(-1, "consumer is null");
-        }
-
-        if (this.started.compareAndSet(false, true)) {
-            this.consumer.registerMessageListener(new MessageListenerImpl());
-            this.consumer.start();
-        }
-    }
-
-
-    public void close() {
-        if (this.started.compareAndSet(true, false)) {
-            this.consumer.shutdown();
-        }
-    }
-
-    public void subscribe(String topic, String subExpression, 
javax.jms.MessageListener listener) throws MQClientException {
-        if (null == topic) {
-            throw new MQClientException(-1, "topic is null");
-        }
-
-        if (null == listener) {
-            throw new MQClientException(-1, "listener is null");
-        }
-
-        try {
-            this.subscribeTable.put(topic, listener);
-            this.consumer.subscribe(topic, subExpression);
-        } catch (MQClientException e) {
-            throw new MQClientException("consumer subscribe exception", e);
-        }
-    }
-
-    public void unsubscribe(String topic) {
-        if (null != topic) {
-            this.consumer.unsubscribe(topic);
-        }
-    }
-
-    class MessageListenerImpl implements MessageListenerConcurrently {
-
-        @Override
-        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgsRMQList, ConsumeConcurrentlyContext contextRMQ) {
-            MessageExt msgRMQ = msgsRMQList.get(0);
-            javax.jms.MessageListener listener = 
RMQPushConsumerExt.this.subscribeTable.get(msgRMQ.getTopic());
-            if (null == listener) {
-                throw new RuntimeException("MessageListener is null");
-            }
-
-            try {
-                
listener.onMessage(MessageConverter.convert2JMSMessage(msgRMQ));
-            }
-            catch (Exception e) {
-                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-            }
-            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-        }
-    }
-
-
-    public boolean isStarted() {
-        return started.get();
-    }
-
-
-    public boolean isClosed() {
-        return !isStarted();
-    }
-}

Reply via email to