http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java 
b/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
new file mode 100644
index 0000000..1043f5d
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.jms.exception.MessageExpiredException;
+import org.apache.rocketmq.jms.hook.ReceiveMessageHook;
+import org.apache.rocketmq.jms.msg.convert.RMQ2JMSMessageConvert;
+import org.apache.rocketmq.jms.support.JMSUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.String.format;
+
+/**
+ * Service thread that pulls messages from RocketMQ on behalf of one
+ * {@link RocketMQConsumer} and delivers them synchronously or asynchronously,
+ * depending on the configured {@link ConsumeModel}.
+ */
+public class DeliverMessageService extends ServiceThread {
+
+    private static final Logger log = LoggerFactory.getLogger(DeliverMessageService.class);
+    private static final AtomicLong COUNTER = new AtomicLong(0L);
+    private static final int PULL_BATCH_SIZE = 100;
+
+    private RocketMQConsumer consumer;
+    private DefaultMQPullConsumer rocketMQPullConsumer;
+    private Destination destination;
+    private String consumerGroup;
+    private String topicName;
+
+    /** Volatile because it is set through a public setter and read inside the pulling loop. */
+    private volatile ConsumeModel consumeModel = ConsumeModel.SYNC;
+
+    /** only support RMQ subExpression */
+    private String messageSelector;
+    private ReceiveMessageHook hook = new ReceiveMessageHook();
+
+    /**
+     * If durable is true, consume message from the offset consumed last time.
+     * Otherwise consume from the max offset
+     */
+    private boolean durable = false;
+    private boolean shared = false;
+
+    /** Buffer between the pulling loop and {@link #poll()} when the model is {@link ConsumeModel#SYNC}. */
+    private BlockingQueue<MessageWrapper> msgQueue = new ArrayBlockingQueue<MessageWrapper>(PULL_BATCH_SIZE);
+    private volatile boolean pause = true;
+    private final long index = COUNTER.incrementAndGet();
+
+    /** Next offset to pull from, keyed by message queue. */
+    private Map<MessageQueue, Long> offsetMap = new HashMap<MessageQueue, Long>();
+
+    /**
+     * Creates the service, starts the underlying RocketMQ pull consumer and
+     * un-pauses delivery if the owning connection is already started.
+     *
+     * @param consumer JMS consumer this service delivers to
+     * @param destination JMS destination; its name is used as the RocketMQ topic
+     * @param consumerGroup RocketMQ consumer group of the pull consumer
+     * @param messageSelector RMQ sub-expression passed to every pull
+     * @param durable see {@link #durable}
+     * @param shared stored as given
+     * @throws JMSRuntimeException if the pull consumer fails to start
+     */
+    public DeliverMessageService(RocketMQConsumer consumer, Destination destination, String consumerGroup,
+        String messageSelector, boolean durable, boolean shared) {
+        this.consumer = consumer;
+        this.destination = destination;
+        this.consumerGroup = consumerGroup;
+        this.messageSelector = messageSelector;
+        this.durable = durable;
+        this.shared = shared;
+
+        this.topicName = JMSUtils.getDestinationName(destination);
+
+        createAndStartRocketMQPullConsumer();
+
+        if (this.consumer.getSession().getConnection().isStarted()) {
+            this.recover();
+        }
+        else {
+            this.pause();
+        }
+    }
+
+    /**
+     * Builds a {@link DefaultMQPullConsumer} from the connection's client
+     * configuration, registers {@link #topicName} on it and starts it.
+     */
+    private void createAndStartRocketMQPullConsumer() {
+        final ClientConfig clientConfig = this.consumer.getSession().getConnection().getClientConfig();
+        this.rocketMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
+        this.rocketMQPullConsumer.setNamesrvAddr(clientConfig.getNamesrvAddr());
+        this.rocketMQPullConsumer.setInstanceName(clientConfig.getInstanceName());
+        this.rocketMQPullConsumer.setRegisterTopics(new HashSet<String>(Arrays.asList(this.topicName)));
+
+        try {
+            this.rocketMQPullConsumer.start();
+        }
+        catch (MQClientException e) {
+            // keep the original message text, but also chain the cause
+            throw new JMSRuntimeException(
+                format("Fail to start RocketMQ pull consumer, error msg:%s", ExceptionUtils.getStackTrace(e)),
+                null, e);
+        }
+    }
+
+    @Override
+    public String getServiceName() {
+        return DeliverMessageService.class.getSimpleName() + "-" + this.index;
+    }
+
+    /**
+     * Pulling loop: while paused, waits and re-checks; otherwise pulls one
+     * round of messages. Failures are logged and the loop continues until the
+     * service is stopped.
+     */
+    @Override
+    public void run() {
+        while (!isStopped()) {
+            if (pause) {
+                this.waitForRunning(1000);
+                continue;
+            }
+
+            try {
+                pullMessage();
+            }
+            catch (InterruptedException e) {
+                // the loop condition decides whether the service exits
+                log.debug("Pulling messages service has been interrupted");
+            }
+            catch (Exception e) {
+                log.error("Error during pulling messages", e);
+            }
+        }
+    }
+
+    /**
+     * Pulls at most {@link #PULL_BATCH_SIZE} messages from every queue of the
+     * topic and hands each message to {@link #handleMessage}.
+     *
+     * @throws Exception on pull failure, interruption, or an illegal offset
+     */
+    private void pullMessage() throws Exception {
+        Set<MessageQueue> mqs = getMessageQueues();
+
+        for (MessageQueue mq : mqs) {
+            Long offset = offsetMap.get(mq);
+            if (offset == null) {
+                offset = beginOffset(mq);
+            }
+            PullResult pullResult = this.rocketMQPullConsumer.pullBlockIfNotFound(mq, this.messageSelector, offset, PULL_BATCH_SIZE);
+
+            // Continue from where this pull ended. The queue's max offset must not be
+            // used here: it would skip every message beyond the current batch.
+            offsetMap.put(mq, pullResult.getNextBeginOffset());
+
+            switch (pullResult.getPullStatus()) {
+                case FOUND:
+                    List<MessageExt> msgs = pullResult.getMsgFoundList();
+                    for (MessageExt msg : msgs) {
+                        handleMessage(msg, mq);
+                    }
+                    log.debug("Pull {} messages from topic:{},broker:{},queueId:{}", msgs.size(), mq.getTopic(), mq.getBrokerName(), mq.getQueueId());
+                    break;
+                case NO_NEW_MSG:
+                case NO_MATCHED_MSG:
+                    break;
+                case OFFSET_ILLEGAL:
+                    throw new JMSException("Error during pull message[reason:OFFSET_ILLEGAL]");
+            }
+        }
+    }
+
+    private Set<MessageQueue> getMessageQueues() throws MQClientException {
+        return this.rocketMQPullConsumer.fetchSubscribeMessageQueues(this.topicName);
+    }
+
+    /**
+     * Refer to {@link #durable}.
+     *
+     * @param mq message queue
+     * @return offset
+     * @throws MQClientException
+     */
+    private Long beginOffset(MessageQueue mq) throws MQClientException {
+        return this.durable ? this.rocketMQPullConsumer.fetchConsumeOffset(mq, false) : this.rocketMQPullConsumer.maxOffset(mq);
+    }
+
+    /**
+     * If {@link #consumeModel} is {@link ConsumeModel#ASYNC}, messages pulled from broker
+     * are handled in {@link ConsumeMessageService} owned by its session.
+     *
+     * If {@link #consumeModel} is {@link ConsumeModel#SYNC}, messages pulled from broker are put
+     * into a memory blocking queue, waiting for the {@link MessageConsumer#receive()}
+     * using {@link BlockingQueue#poll()} to handle messages synchronous.
+     *
+     * A message rejected by the receive hook as expired is not delivered.
+     *
+     * @param msg to handle message
+     * @param mq queue the message was pulled from
+     * @throws InterruptedException
+     * @throws JMSException
+     */
+    private void handleMessage(MessageExt msg, MessageQueue mq) throws InterruptedException, JMSException {
+        Message jmsMessage = RMQ2JMSMessageConvert.convert(msg);
+
+        try {
+            hook.before(jmsMessage);
+        }
+        catch (MessageExpiredException e) {
+            // skip delivery instead of handing an expired message to the application
+            log.debug(e.getMessage());
+            return;
+        }
+
+        final MessageWrapper wrapper = new MessageWrapper(jmsMessage, this.consumer, mq, msg.getQueueOffset());
+
+        switch (this.consumeModel) {
+            case SYNC:
+                this.msgQueue.put(wrapper);
+                break;
+            case ASYNC:
+                this.consumer.getSession().getConsumeMessageService().put(wrapper);
+                break;
+            default:
+                throw new JMSException(format("Unsupported consume model[%s]", this.consumeModel));
+        }
+    }
+
+    /**
+     * Records {@code offset} as the consume offset of {@code mq}.
+     *
+     * @param mq message queue
+     * @param offset offset to record
+     * @throws JMSException if the offset cannot be updated; the cause is linked
+     */
+    public void ack(MessageQueue mq, Long offset) throws JMSException {
+        try {
+            this.rocketMQPullConsumer.updateConsumeOffset(mq, offset);
+        }
+        catch (MQClientException e) {
+            throw newJMSException(format("Fail to ack offset[mq:%s,offset:%s]", mq, offset), e);
+        }
+    }
+
+    /**
+     * Blocks until a message is available in the local queue.
+     *
+     * @return the next message
+     * @throws JMSException if the waiting thread is interrupted
+     */
+    public MessageWrapper poll() throws JMSException {
+        try {
+            return this.msgQueue.take();
+        }
+        catch (InterruptedException e) {
+            // restore the flag so callers further up can still observe the interrupt
+            Thread.currentThread().interrupt();
+            throw newJMSException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Waits up to the given time for a message in the local queue.
+     *
+     * @param timeout how long to wait
+     * @param timeUnit unit of {@code timeout}
+     * @return the next message, or {@code null} if the wait timed out
+     * @throws JMSException if the waiting thread is interrupted
+     */
+    public MessageWrapper poll(long timeout, TimeUnit timeUnit) throws JMSException {
+        try {
+            return this.msgQueue.poll(timeout, timeUnit);
+        }
+        catch (InterruptedException e) {
+            // restore the flag so callers further up can still observe the interrupt
+            Thread.currentThread().interrupt();
+            throw newJMSException(e.getMessage(), e);
+        }
+    }
+
+    /** Builds a {@link JMSException} that carries {@code cause} as linked exception and cause. */
+    private static JMSException newJMSException(String reason, Exception cause) {
+        JMSException jmsException = new JMSException(reason);
+        jmsException.setLinkedException(cause);
+        jmsException.initCause(cause);
+        return jmsException;
+    }
+
+    public void pause() {
+        this.pause = true;
+    }
+
+    public void recover() {
+        this.pause = false;
+    }
+
+    /** Stops the pulling loop, shuts down the pull consumer and the service thread. */
+    public void close() {
+
+        this.stop();
+
+        this.rocketMQPullConsumer.shutdown();
+
+        this.shutdown(true);
+
+        log.debug("Success to close message delivery service:{}", getServiceName());
+    }
+
+    public void setConsumeModel(ConsumeModel consumeModel) {
+        this.consumeModel = consumeModel;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java 
b/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java
new file mode 100644
index 0000000..e322e5b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import javax.jms.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * Immutable holder tying a JMS {@link Message} to the consumer it is meant for
+ * and to the RocketMQ queue and offset it was pulled from.
+ */
+public class MessageWrapper {
+
+    private final Message message;
+    private final RocketMQConsumer consumer;
+    private final MessageQueue mq;
+    private final long offset;
+
+    /**
+     * @param message the JMS message
+     * @param consumer consumer the message is delivered to
+     * @param mq RocketMQ queue the message was pulled from
+     * @param offset offset of the message within {@code mq}
+     */
+    public MessageWrapper(Message message, RocketMQConsumer consumer, MessageQueue mq, long offset) {
+        this.message = message;
+        this.consumer = consumer;
+        this.mq = mq;
+        this.offset = offset;
+    }
+
+    public Message getMessage() {
+        return message;
+    }
+
+    public RocketMQConsumer getConsumer() {
+        return consumer;
+    }
+
+    public MessageQueue getMq() {
+        return mq;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java 
b/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java
new file mode 100644
index 0000000..e32a382
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.String.format;
+import static javax.jms.Session.AUTO_ACKNOWLEDGE;
+import static javax.jms.Session.SESSION_TRANSACTED;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+import static org.apache.commons.lang.exception.ExceptionUtils.getStackTrace;
+
+/**
+ * JMS {@link Connection} backed by a RocketMQ {@link MQClientInstance}.
+ *
+ * <P>Only non-transacted, {@code AUTO_ACKNOWLEDGE} sessions are supported;
+ * connection consumers and exception listeners are not supported yet.
+ */
+public class RocketMQConnection implements Connection {
+
+    private static final Logger log = LoggerFactory.getLogger(RocketMQConnection.class);
+
+    private String clientID;
+    private final ClientConfig clientConfig;
+    private MQClientInstance clientInstance;
+    private final String userName;
+    private final String password;
+
+    // Sessions may be created while start()/stop()/close() iterate the list from
+    // another thread, so a concurrent list is used.
+    private final List<RocketMQSession> sessionList = new java.util.concurrent.CopyOnWriteArrayList<RocketMQSession>();
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    /**
+     * Creates the connection and immediately starts the RocketMQ client instance.
+     *
+     * @param nameServerAddress RocketMQ name server address
+     * @param clientID JMS client identifier, may be blank
+     * @param instanceName RocketMQ client instance name
+     * @param userName stored as given
+     * @param password stored as given
+     * @throws JMSRuntimeException if the client instance fails to start
+     */
+    public RocketMQConnection(String nameServerAddress, String clientID, String instanceName, String userName,
+        String password) {
+        this.clientID = clientID;
+        this.userName = userName;
+        this.password = password;
+
+        this.clientConfig = new ClientConfig();
+        this.clientConfig.setNamesrvAddr(nameServerAddress);
+        this.clientConfig.setInstanceName(instanceName);
+
+        startClientInstance();
+    }
+
+    private void startClientInstance() {
+        try {
+            // create a tcp connection to broker and some other background thread
+            this.clientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.clientConfig);
+            clientInstance.start();
+        }
+        catch (MQClientException e) {
+            // keep the original message text, but also chain the cause
+            throw new JMSRuntimeException(format("Fail to startClientInstance connection object[namesrvAddr:%s,instanceName:%s]. Error message:%s",
+                this.clientConfig.getNamesrvAddr(), this.clientConfig.getInstanceName(), getStackTrace(e)),
+                null, e);
+        }
+    }
+
+    @Override
+    public Session createSession() throws JMSException {
+        return createSession(false, AUTO_ACKNOWLEDGE);
+    }
+
+    @Override
+    public Session createSession(int sessionMode) throws JMSException {
+        if (sessionMode == SESSION_TRANSACTED) {
+            return createSession(true, Session.AUTO_ACKNOWLEDGE);
+        }
+        else {
+            return createSession(false, sessionMode);
+        }
+    }
+
+    /**
+     * Creates a session and registers it with this connection.
+     *
+     * @throws JMSException if {@code transacted} is true or the acknowledge
+     * mode is anything other than {@code AUTO_ACKNOWLEDGE}
+     */
+    @Override
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        //todo: support transacted and more acknowledge mode
+        if (transacted) {
+            throw new JMSException("Not support local transaction session");
+        }
+        if (acknowledgeMode != AUTO_ACKNOWLEDGE) {
+            throw new JMSException("Only support AUTO_ACKNOWLEDGE mode now");
+        }
+
+        RocketMQSession session = new RocketMQSession(this, acknowledgeMode, transacted);
+        this.sessionList.add(session);
+
+        return session;
+    }
+
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Destination destination,
+        String messageSelector,
+        ServerSessionPool sessionPool,
+        int maxMessages) throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
+        String messageSelector,
+        ServerSessionPool sessionPool,
+        int maxMessages) throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName,
+        String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName,
+        String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public String getClientID() throws JMSException {
+        return this.clientID;
+    }
+
+    /**
+     * Sets the client identifier, which is only allowed while none is configured.
+     *
+     * @throws IllegalStateException if a non-blank client identifier is already set
+     */
+    @Override
+    public void setClientID(String clientID) throws JMSException {
+        if (isNotBlank(this.clientID)) {
+            // NOTE(review): this is java.lang.IllegalStateException; the JMS API documents
+            // javax.jms.IllegalStateException for this case - confirm before changing the type.
+            throw new IllegalStateException("administratively client identifier has been configured.");
+        }
+        this.clientID = clientID;
+    }
+
+    @Override
+    public ConnectionMetaData getMetaData() throws JMSException {
+        return RocketMQConnectionMetaData.instance();
+    }
+
+    @Override
+    public ExceptionListener getExceptionListener() throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public void setExceptionListener(ExceptionListener listener) throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    /** Resumes message delivery of every consumer of every session; no-op if already started. */
+    @Override
+    public void start() throws JMSException {
+        if (this.started.compareAndSet(false, true)) {
+            for (RocketMQSession session : sessionList) {
+                for (RocketMQConsumer consumer : session.getConsumerList()) {
+                    consumer.getDeliverMessageService().recover();
+                }
+            }
+            log.debug("Start connection successfully:{}", toString());
+        }
+    }
+
+    /** Pauses message delivery of every consumer of every session; no-op if not started. */
+    @Override
+    public void stop() throws JMSException {
+        if (this.started.compareAndSet(true, false)) {
+            for (RocketMQSession session : sessionList) {
+                for (RocketMQConsumer consumer : session.getConsumerList()) {
+                    consumer.getDeliverMessageService().pause();
+                }
+            }
+            log.debug("Stop connection successfully:{}", toString());
+        }
+    }
+
+    /**
+     * Closes every session and shuts down the client instance. The client
+     * instance is shut down even if closing a session fails.
+     */
+    @Override
+    public void close() throws JMSException {
+        try {
+            for (RocketMQSession session : sessionList) {
+                session.close();
+            }
+        }
+        finally {
+            // never leak the client instance because one session failed to close
+            this.clientInstance.shutdown();
+        }
+
+        log.info("Success to close connection:{}", toString());
+    }
+
+    public boolean isStarted() {
+        return started.get();
+    }
+
+    public ClientConfig getClientConfig() {
+        return clientConfig;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    @Override public String toString() {
+        return new ToStringBuilder(this)
+            .append("nameServerAddress", this.clientConfig.getNamesrvAddr())
+            .append("instanceName", this.clientConfig.getInstanceName())
+            .append("clientIdentifier", this.clientID)
+            .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java 
b/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java
new file mode 100644
index 0000000..c81e8b5
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.jms.support.JMSUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link ConnectionFactory} using the RocketMQ client.
+ *
+ * <P>In RocketMQ, all producers and consumers interact with the broker
+ * through an {@link MQClientInstance} object, which encapsulates the tcp connection,
+ * scheduled tasks and so on. The best way to control the behavior of producers/consumers
+ * derived from a connection is to manipulate the {@link MQClientInstance} directly.
+ *
+ * <P>However, this object is not easy to access as it is maintained within the RocketMQ client.
+ * Fortunately another equivalent identifier called "instanceName" is provided.
+ * The "instanceName" maps one-to-one to an {@link MQClientInstance} object,
+ * as if there were a hash map with "instanceName" as the key and the {@link MQClientInstance}
+ * object as the value. So the essential keyword passed through all objects created by a
+ * connection is "instanceName".
+ */
+public class RocketMQConnectionFactory implements ConnectionFactory {
+
+    private static final Logger log = LoggerFactory.getLogger(RocketMQConnectionFactory.class);
+
+    private String nameServerAddress;
+
+    private String clientId;
+
+    /**
+     * Creates a factory with a generated client identifier.
+     *
+     * @param nameServerAddress RocketMQ name server address
+     */
+    public RocketMQConnectionFactory(String nameServerAddress) {
+        this.nameServerAddress = nameServerAddress;
+        this.clientId = JMSUtils.uuid();
+    }
+
+    /**
+     * @param nameServerAddress RocketMQ name server address
+     * @param clientId client identifier handed to every created connection
+     */
+    public RocketMQConnectionFactory(String nameServerAddress, String clientId) {
+        this.nameServerAddress = nameServerAddress;
+        this.clientId = clientId;
+    }
+
+    @Override
+    public Connection createConnection() throws JMSException {
+        return createConnection(null, null);
+    }
+
+    /**
+     * Using userName and Password to register a connection. Now access RMQ broker
+     * is anonymous and any userName/password is legal.
+     *
+     * @param userName ignored
+     * @param password ignored
+     * @return the new JMS Connection
+     * @throws JMSException
+     */
+    @Override
+    public Connection createConnection(String userName, String password) throws JMSException {
+        return createRocketMQConnection(userName, password);
+    }
+
+    /** Creates a connection under a freshly generated instance name. */
+    private Connection createRocketMQConnection(String userName, String password) throws JMSException {
+        final String instanceName = JMSUtils.uuid();
+        RocketMQConnection connection = new RocketMQConnection(this.nameServerAddress, this.clientId, instanceName, userName, password);
+
+        log.info("Create a connection successfully[instanceName:{},clientIdentifier:{},userName:{}]", instanceName, clientId, userName);
+        return connection;
+    }
+
+    /**
+     * Not supported yet.
+     *
+     * @throws javax.jms.JMSRuntimeException always
+     */
+    @Override
+    public JMSContext createContext() {
+        //todo:
+        throw new javax.jms.JMSRuntimeException("Not support yet");
+    }
+
+    /**
+     * Not supported yet.
+     *
+     * @throws javax.jms.JMSRuntimeException always
+     */
+    @Override
+    public JMSContext createContext(String userName, String password) {
+        //todo:
+        throw new javax.jms.JMSRuntimeException("Not support yet");
+    }
+
+    /**
+     * Not supported yet.
+     *
+     * @throws javax.jms.JMSRuntimeException always
+     */
+    @Override
+    public JMSContext createContext(String userName, String password, int sessionMode) {
+        //todo:
+        throw new javax.jms.JMSRuntimeException("Not support yet");
+    }
+
+    /**
+     * Not supported yet.
+     *
+     * @throws javax.jms.JMSRuntimeException always
+     */
+    @Override
+    public JMSContext createContext(int sessionMode) {
+        //todo:
+        throw new javax.jms.JMSRuntimeException("Not support yet");
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public String getNameServerAddress() {
+        return nameServerAddress;
+    }
+
+    public void setNameServerAddress(String nameServerAddress) {
+        this.nameServerAddress = nameServerAddress;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java 
b/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java
new file mode 100644
index 0000000..e4353e1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import java.util.Enumeration;
+import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.jms.ConnectionMetaData;
+import javax.jms.JMSException;
+import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum;
+import org.apache.rocketmq.jms.support.ProviderVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Singleton {@link ConnectionMetaData} describing the JMS API version found on
+ * the classpath and the version of this provider.
+ */
+public class RocketMQConnectionMetaData implements ConnectionMetaData {
+
+    private static final Logger log = LoggerFactory.getLogger(RocketMQConnectionMetaData.class);
+    private static final String PROVIDER_NAME = "Apache RocketMQ";
+
+    private final String jmsVersion;
+    private final int jmsMajorVersion;
+    private final int jmsMinorVersion;
+
+    private final String providerVersion;
+    private final int providerMajorVersion;
+    private final int providerMinorVersion;
+
+    // must stay below the static fields the constructor uses (log)
+    private static final RocketMQConnectionMetaData metaData = new RocketMQConnectionMetaData();
+
+    /**
+     * Reads the implementation version of the {@code javax.jms} package and
+     * parses its leading "major.minor" part. When the version is missing or
+     * does not match, the version string is kept as found (possibly
+     * {@code null}) and major/minor stay 0.
+     */
+    private RocketMQConnectionMetaData() {
+        Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
+
+        String jmsVersion = null;
+        int jmsMajor = 0;
+        int jmsMinor = 0;
+        try {
+            Package p = ConnectionMetaData.class.getPackage();
+            if (p != null) {
+                jmsVersion = p.getImplementationVersion();
+                // the manifest may not declare an implementation version
+                if (jmsVersion != null) {
+                    Matcher m = pattern.matcher(jmsVersion);
+                    if (m.matches()) {
+                        jmsMajor = Integer.parseInt(m.group(1));
+                        jmsMinor = Integer.parseInt(m.group(2));
+                    }
+                }
+            }
+        }
+        catch (RuntimeException e) {
+            log.error("Error during getting jms version", e);
+        }
+
+        this.jmsVersion = jmsVersion;
+        this.jmsMajorVersion = jmsMajor;
+        this.jmsMinorVersion = jmsMinor;
+
+        this.providerVersion = ProviderVersion.CURRENT_VERSION.name();
+        // NOTE(review): major and minor are both taken from the same getValue() - confirm this is intended
+        this.providerMinorVersion = ProviderVersion.CURRENT_VERSION.getValue();
+        this.providerMajorVersion = ProviderVersion.CURRENT_VERSION.getValue();
+    }
+
+    public static RocketMQConnectionMetaData instance() {
+        return metaData;
+    }
+
+    @Override
+    public String getJMSVersion() throws JMSException {
+        return jmsVersion;
+    }
+
+    @Override
+    public int getJMSMajorVersion() throws JMSException {
+        return jmsMajorVersion;
+    }
+
+    @Override
+    public int getJMSMinorVersion() throws JMSException {
+        return jmsMinorVersion;
+    }
+
+    @Override
+    public String getJMSProviderName() throws JMSException {
+        return PROVIDER_NAME;
+    }
+
+    @Override
+    public String getProviderVersion() throws JMSException {
+        return providerVersion;
+    }
+
+    @Override
+    public int getProviderMajorVersion() throws JMSException {
+        return providerMajorVersion;
+    }
+
+    @Override
+    public int getProviderMinorVersion() throws JMSException {
+        return providerMinorVersion;
+    }
+
+    /** Returns the names of all {@link JMSPropertiesEnum} constants. */
+    @Override
+    public Enumeration<?> getJMSXPropertyNames() throws JMSException {
+        Vector<String> result = new Vector<String>();
+        for (JMSPropertiesEnum e : JMSPropertiesEnum.values()) {
+            result.add(e.name());
+        }
+        return result.elements();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java 
b/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java
new file mode 100644
index 0000000..af147e0
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import org.apache.rocketmq.jms.support.JMSUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQConsumer implements MessageConsumer {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RocketMQConsumer.class);
+    private RocketMQSession session;
+    private Destination destination;
+    private String messageSelector;
+    private MessageListener messageListener;
+    private String subscriptionName;
+    private boolean durable;
+    private boolean shared;
+
+    private DeliverMessageService deliverMessageService;
+
+    public RocketMQConsumer(RocketMQSession session, Destination destination,
+        String messageSelector,
+        boolean durable, boolean shared) {
+        this(session, destination, messageSelector, 
UUID.randomUUID().toString(), durable, shared);
+    }
+
+    public RocketMQConsumer(RocketMQSession session, Destination destination,
+        String messageSelector,
+        String subscriptionName, boolean durable, boolean shared) {
+        this.session = session;
+        this.destination = destination;
+        this.messageSelector = messageSelector;
+        this.subscriptionName = subscriptionName;
+        this.durable = durable;
+        this.shared = shared;
+
+        String consumerGroup = JMSUtils.getConsumerGroup(this);
+        this.deliverMessageService = new DeliverMessageService(this, 
this.destination, consumerGroup,
+            this.messageSelector, this.durable, this.shared);
+        this.deliverMessageService.start();
+    }
+
+    @Override
+    public String getMessageSelector() throws JMSException {
+        return messageSelector;
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return this.messageListener;
+    }
+
+    @Override
+    public void setMessageListener(MessageListener listener) throws 
JMSException {
+        if (this.session.isSyncModel()) {
+            throw new JMSException("A asynchronously call is not permitted 
when a session is being used synchronously");
+        }
+
+        this.messageListener = listener;
+        this.deliverMessageService.setConsumeModel(ConsumeModel.ASYNC);
+        this.session.addAsyncConsumer(this);
+    }
+
+    @Override
+    public Message receive() throws JMSException {
+        return this.receive(0);
+    }
+
+    @Override
+    public Message receive(long timeout) throws JMSException {
+        if (this.session.isAsyncModel()) {
+            throw new JMSException("A synchronous call is not permitted when a 
session is being used asynchronously.");
+        }
+
+        this.session.addSyncConsumer(this);
+
+        if (timeout == 0) {
+            MessageWrapper wrapper = this.deliverMessageService.poll();
+            
wrapper.getConsumer().getDeliverMessageService().ack(wrapper.getMq(), 
wrapper.getOffset());
+            return wrapper.getMessage();
+        }
+        else {
+            MessageWrapper wrapper = this.deliverMessageService.poll(timeout, 
TimeUnit.MILLISECONDS);
+            if (wrapper == null) {
+                return null;
+            }
+            
wrapper.getConsumer().getDeliverMessageService().ack(wrapper.getMq(), 
wrapper.getOffset());
+            return wrapper.getMessage();
+        }
+    }
+
+    @Override
+    public Message receiveNoWait() throws JMSException {
+        return receive(1);
+    }
+
+    @Override
+    public void close() throws JMSException {
+        this.deliverMessageService.close();
+    }
+
+    public void start() {
+        this.deliverMessageService.recover();
+    }
+
+    public void stop() {
+        this.deliverMessageService.pause();
+    }
+
+    public DeliverMessageService getDeliverMessageService() {
+        return deliverMessageService;
+    }
+
+    public RocketMQSession getSession() {
+        return session;
+    }
+
+    public String getSubscriptionName() {
+        return subscriptionName;
+    }
+
+    public boolean isDurable() {
+        return durable;
+    }
+
+    public boolean isShared() {
+        return shared;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java 
b/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
new file mode 100644
index 0000000..f5aa2d4
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import java.util.UUID;
+import javax.jms.CompletionListener;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.exception.UnsupportDeliveryModelException;
+import org.apache.rocketmq.jms.hook.SendMessageHook;
+import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
+import org.apache.rocketmq.jms.msg.convert.JMS2RMQMessageConvert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.String.format;
+import static org.apache.commons.lang.exception.ExceptionUtils.getStackTrace;
+import static 
org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE;
+import static 
org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_TIME_DEFAULT_VALUE;
+import static 
org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_PRIORITY_DEFAULT_VALUE;
+import static 
org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_TIME_TO_LIVE_DEFAULT_VALUE;
+import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2Object;
+
+public class RocketMQProducer implements MessageProducer {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RocketMQProducer.class);
+    private RocketMQSession session;
+    private DefaultMQProducer rocketMQProducer;
+    private Destination destination;
+
+    private boolean disableMessageID;
+    private boolean disableMessageTimestamp;
+    private long timeToLive = JMS_TIME_TO_LIVE_DEFAULT_VALUE;
+    private int deliveryMode = JMS_DELIVERY_MODE_DEFAULT_VALUE;
+    private int priority = JMS_PRIORITY_DEFAULT_VALUE;
+    private long deliveryDelay = JMS_DELIVERY_TIME_DEFAULT_VALUE;
+
+    private SendMessageHook sendMessageHook;
+
+    public RocketMQProducer() {
+    }
+
+    public RocketMQProducer(RocketMQSession session, Destination destination) {
+        this.session = session;
+        this.destination = destination;
+
+        this.rocketMQProducer = new 
DefaultMQProducer(UUID.randomUUID().toString());
+        ClientConfig clientConfig = 
this.session.getConnection().getClientConfig();
+        this.rocketMQProducer.setNamesrvAddr(clientConfig.getNamesrvAddr());
+        this.rocketMQProducer.setInstanceName(clientConfig.getInstanceName());
+        try {
+            this.rocketMQProducer.start();
+        }
+        catch (MQClientException e) {
+            throw new JMSRuntimeException(format("Fail to start producer, 
error msg:%s", getStackTrace(e)));
+        }
+
+        this.sendMessageHook = new SendMessageHook(this);
+    }
+
+    @Override
+    public void setDisableMessageID(boolean value) throws JMSException {
+        this.disableMessageID = value;
+    }
+
+    @Override
+    public boolean getDisableMessageID() throws JMSException {
+        return this.disableMessageID;
+    }
+
+    @Override
+    public void setDisableMessageTimestamp(boolean value) throws JMSException {
+        this.disableMessageTimestamp = value;
+    }
+
+    @Override
+    public boolean getDisableMessageTimestamp() throws JMSException {
+        return this.disableMessageTimestamp;
+    }
+
+    @Override
+    public void setDeliveryMode(int deliveryMode) throws JMSException {
+        throw new UnsupportDeliveryModelException();
+    }
+
+    @Override
+    public int getDeliveryMode() throws JMSException {
+        return this.deliveryMode;
+    }
+
+    @Override
+    public void setPriority(int priority) throws JMSException {
+        this.priority = priority;
+    }
+
+    @Override
+    public int getPriority() throws JMSException {
+        return this.priority;
+    }
+
+    @Override
+    public void setTimeToLive(long timeToLive) throws JMSException {
+        this.timeToLive = timeToLive;
+    }
+
+    @Override
+    public long getTimeToLive() throws JMSException {
+        return this.timeToLive;
+    }
+
+    @Override
+    public void setDeliveryDelay(long deliveryDelay) throws JMSException {
+        this.deliveryDelay = deliveryDelay;
+    }
+
+    @Override
+    public long getDeliveryDelay() throws JMSException {
+        return this.deliveryDelay;
+    }
+
+    @Override
+    public Destination getDestination() throws JMSException {
+        return this.destination;
+    }
+
+    @Override
+    public void close() throws JMSException {
+        this.rocketMQProducer.shutdown();
+    }
+
+    @Override
+    public void send(Message message) throws JMSException {
+        this.send(this.destination, message);
+    }
+
+    @Override
+    public void send(Message message, int deliveryMode, int priority, long 
timeToLive) throws JMSException {
+        this.send(this.destination, message, deliveryMode, priority, 
timeToLive);
+    }
+
+    @Override
+    public void send(Destination destination, Message message) throws 
JMSException {
+        this.send(destination, message, getDeliveryMode(), getPriority(), 
getTimeToLive());
+    }
+
+    @Override
+    public void send(Destination destination, Message message, int 
deliveryMode, int priority,
+        long timeToLive) throws JMSException {
+
+        sendMessageHook.before(message, destination, deliveryMode, priority, 
timeToLive);
+
+        MessageExt rmqMsg = createRocketMQMessage(message);
+
+        SendResult sendResult = sendSync(rmqMsg);
+        if (sendResult != null && sendResult.getSendStatus() == 
SendStatus.SEND_OK) {
+            log.debug("Success to send message[key={}]", rmqMsg.getKeys());
+            return;
+        }
+        else {
+            throw new JMSException(format("Sending message error with result 
status:%s", sendResult.getSendStatus().name()));
+        }
+    }
+
+    private SendResult sendSync(org.apache.rocketmq.common.message.Message 
rmqMsg) throws JMSException {
+
+        try {
+            return rocketMQProducer.send(rmqMsg);
+        }
+        catch (Exception e) {
+            throw new JMSException(format("Fail to send message. Error: %s", 
getStackTrace(e)));
+        }
+    }
+
+    private void sendAsync(org.apache.rocketmq.common.message.Message rmqMsg,
+        CompletionListener completionListener) throws JMSException {
+        try {
+            rocketMQProducer.send(rmqMsg, new 
SendCompletionListener(completionListener));
+        }
+        catch (Exception e) {
+            throw new JMSException(format("Fail to send message. Error: %s", 
getStackTrace(e)));
+        }
+    }
+
+    private MessageExt createRocketMQMessage(Message jmsMsg) throws 
JMSException {
+        AbstractJMSMessage abstractJMSMessage = cast2Object(jmsMsg, 
AbstractJMSMessage.class);
+        try {
+            return JMS2RMQMessageConvert.convert(abstractJMSMessage);
+        }
+        catch (Exception e) {
+            throw new JMSException(format("Fail to convert to RocketMQ jmsMsg. 
Error: %s", getStackTrace(e)));
+        }
+    }
+
+    @Override
+    public void send(Message message, CompletionListener completionListener) 
throws JMSException {
+        this.send(this.destination, message, getDeliveryMode(), getPriority(), 
getTimeToLive(), completionListener);
+    }
+
+    @Override
+    public void send(Message message, int deliveryMode, int priority, long 
timeToLive,
+        CompletionListener completionListener) throws JMSException {
+        this.send(this.destination, message, deliveryMode, priority, 
timeToLive, completionListener);
+    }
+
+    @Override
+    public void send(Destination destination, Message message,
+        CompletionListener completionListener) throws JMSException {
+        this.send(destination, message, getDeliveryMode(), getPriority(), 
getTimeToLive(), completionListener);
+    }
+
+    @Override
+    public void send(Destination destination, Message message, int 
deliveryMode, int priority, long timeToLive,
+        CompletionListener completionListener) throws JMSException {
+
+        sendMessageHook.before(message, destination, deliveryMode, priority, 
timeToLive);
+
+        MessageExt rmqMsg = createRocketMQMessage(message);
+
+        sendAsync(rmqMsg, completionListener);
+    }
+
+    public RocketMQSession getSession() {
+        return session;
+    }
+
+    public void setSession(RocketMQSession session) {
+        this.session = session;
+    }
+
+    public void setRocketMQProducer(DefaultMQProducer rocketMQProducer) {
+        this.rocketMQProducer = rocketMQProducer;
+    }
+
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
+    public void setSendMessageHook(SendMessageHook sendMessageHook) {
+        this.sendMessageHook = sendMessageHook;
+    }
+
+    public String getUserName() {
+        return this.session.getConnection().getUserName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java 
b/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
new file mode 100644
index 0000000..0094c47
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.jms.admin.AdminFactory;
+import org.apache.rocketmq.jms.destination.RocketMQQueue;
+import org.apache.rocketmq.jms.destination.RocketMQTopic;
+import org.apache.rocketmq.jms.exception.DuplicateSubscriptionException;
+import org.apache.rocketmq.jms.msg.JMSBytesMessage;
+import org.apache.rocketmq.jms.msg.JMSMapMessage;
+import org.apache.rocketmq.jms.msg.JMSObjectMessage;
+import org.apache.rocketmq.jms.msg.JMSTextMessage;
+import org.apache.rocketmq.jms.support.JMSUtils;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.jms.Constant.DEFAULT_DURABLE;
+import static org.apache.rocketmq.jms.Constant.DEFAULT_NO_LOCAL;
+import static org.apache.rocketmq.jms.Constant.NO_MESSAGE_SELECTOR;
+
+/**
+ * Implement of {@link Session}.
+ */
+public class RocketMQSession implements Session {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RocketMQSession.class);
+
+    private RocketMQConnection connection;
+
+    private int acknowledgeMode;
+
+    private boolean transacted;
+
+    private ConsumeMessageService consumeMessageService;
+
+    private final List<RocketMQProducer> producerList = new ArrayList();
+
+    private final List<RocketMQConsumer> consumerList = new ArrayList();
+
+    private final Set<RocketMQConsumer> asyncConsumerSet = new HashSet();
+
+    private final Set<RocketMQConsumer> syncConsumerSet = new HashSet();
+
+    public RocketMQSession(RocketMQConnection connection, int acknowledgeMode, 
boolean transacted) {
+        this.connection = connection;
+        this.acknowledgeMode = acknowledgeMode;
+        this.transacted = transacted;
+
+        this.consumeMessageService = new ConsumeMessageService(this);
+        this.consumeMessageService.start();
+    }
+
+    @Override
+    public BytesMessage createBytesMessage() throws JMSException {
+        return new JMSBytesMessage();
+    }
+
+    @Override
+    public MapMessage createMapMessage() throws JMSException {
+        return new JMSMapMessage();
+    }
+
+    @Override
+    public Message createMessage() throws JMSException {
+        return new JMSBytesMessage();
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return new JMSObjectMessage();
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage(Serializable serializable) throws 
JMSException {
+        return new JMSObjectMessage(serializable);
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public TextMessage createTextMessage() throws JMSException {
+        return new JMSTextMessage();
+    }
+
+    @Override
+    public TextMessage createTextMessage(String text) throws JMSException {
+        return new JMSTextMessage(text);
+    }
+
+    @Override
+    public boolean getTransacted() throws JMSException {
+        return this.transacted;
+    }
+
+    @Override
+    public int getAcknowledgeMode() throws JMSException {
+        return this.acknowledgeMode;
+    }
+
+    @Override
+    public void commit() throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public void rollback() throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public void close() throws JMSException {
+        for (RocketMQProducer producer : this.producerList) {
+            producer.close();
+        }
+        for (RocketMQConsumer consumer : this.consumerList) {
+            consumer.close();
+        }
+    }
+
+    @Override
+    public void recover() throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public void setMessageListener(MessageListener listener) throws 
JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public void run() {
+        //todo
+        throw new JMSRuntimeException("Not support yet");
+    }
+
+    @Override
+    public MessageProducer createProducer(Destination destination) throws 
JMSException {
+        RocketMQProducer producer = new RocketMQProducer(this, destination);
+        this.producerList.add(producer);
+        return producer;
+    }
+
+    @Override
+    public MessageConsumer createConsumer(Destination destination) throws 
JMSException {
+        return createConsumer(destination, NO_MESSAGE_SELECTOR);
+    }
+
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String 
messageSelector) throws JMSException {
+        return createConsumer(destination, messageSelector, DEFAULT_NO_LOCAL);
+    }
+
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String 
messageSelector,
+        boolean noLocal) throws JMSException {
+
+        // ignore noLocal param as RMQ not support
+        RocketMQConsumer consumer = new RocketMQConsumer(this, destination, 
messageSelector, DEFAULT_DURABLE, false);
+        this.consumerList.add(consumer);
+
+        return consumer;
+    }
+
+    @Override
+    public MessageConsumer createSharedConsumer(Topic topic, String 
sharedSubscriptionName) throws JMSException {
+        return createSharedConsumer(topic, sharedSubscriptionName, 
NO_MESSAGE_SELECTOR);
+    }
+
+    @Override
+    public MessageConsumer createSharedConsumer(Topic topic, String 
sharedSubscriptionName,
+        String messageSelector) throws JMSException {
+        RocketMQConsumer consumer = new RocketMQConsumer(this, topic, 
messageSelector, sharedSubscriptionName, DEFAULT_DURABLE, true);
+        this.consumerList.add(consumer);
+
+        return consumer;
+    }
+
+    @Override
+    public Queue createQueue(String queueName) throws JMSException {
+        return new RocketMQQueue(queueName);
+    }
+
+    @Override
+    public Topic createTopic(String topicName) throws JMSException {
+        Preconditions.checkNotNull(topicName);
+
+        return new RocketMQTopic(topicName);
+    }
+
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) 
throws JMSException {
+        return createDurableSubscriber(topic, name, NO_MESSAGE_SELECTOR, 
DEFAULT_NO_LOCAL);
+    }
+
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, 
String messageSelector,
+        boolean noLocal) throws JMSException {
+        RocketMQTopicSubscriber subscriber = new RocketMQTopicSubscriber(this, 
topic, messageSelector, name, true, true);
+        this.consumerList.add(subscriber);
+
+        return subscriber;
+    }
+
+    @Override
+    public MessageConsumer createDurableConsumer(Topic topic, String name) 
throws JMSException {
+        return createDurableConsumer(topic, name, NO_MESSAGE_SELECTOR, true);
+    }
+
+    @Override
+    public MessageConsumer createDurableConsumer(Topic topic, String name, 
String messageSelector,
+        boolean noLocal) throws JMSException {
+        DefaultMQAdminExt admin = 
AdminFactory.getAdmin(this.getConnection().getClientConfig().getNamesrvAddr());
+        try {
+            GroupList groupList = 
admin.queryTopicConsumeByWho(topic.getTopicName());
+            if 
(groupList.getGroupList().contains(JMSUtils.getConsumerGroup(name, 
this.getConnection().getClientID(), false))) {
+                throw new DuplicateSubscriptionException("The same 
subscription( join subscriptionName with clientID) has existed, so couldn't 
create consumer on them again ");
+            }
+        }
+        catch (InterruptedException | MQBrokerException | RemotingException | 
MQClientException e) {
+            throw new JMSException(ExceptionUtils.getStackTrace(e));
+        }
+        RocketMQConsumer consumer = new RocketMQConsumer(this, topic, 
messageSelector, name, true, false);
+        this.consumerList.add(consumer);
+
+        return consumer;
+    }
+
+    @Override
+    public MessageConsumer createSharedDurableConsumer(Topic topic, String 
name) throws JMSException {
+        return createSharedDurableConsumer(topic, name, NO_MESSAGE_SELECTOR);
+    }
+
+    @Override
+    public MessageConsumer createSharedDurableConsumer(Topic topic, String 
name,
+        String messageSelector) throws JMSException {
+        RocketMQConsumer consumer = new RocketMQConsumer(this, topic, 
messageSelector, name, true, true);
+        this.consumerList.add(consumer);
+
+        return consumer;
+    }
+
+    @Override
+    public QueueBrowser createBrowser(Queue queue) throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) 
throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    @Override
+    public void unsubscribe(String name) throws JMSException {
+        //todo
+        throw new JMSException("Not support yet");
+    }
+
+    public List<RocketMQProducer> getProducerList() {
+        return producerList;
+    }
+
+    public List<RocketMQConsumer> getConsumerList() {
+        return consumerList;
+    }
+
+    public RocketMQConnection getConnection() {
+        return connection;
+    }
+
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    public void addSyncConsumer(RocketMQConsumer consumer) {
+        this.syncConsumerSet.add(consumer);
+    }
+
+    public void addAsyncConsumer(RocketMQConsumer consumer) {
+        this.asyncConsumerSet.add(consumer);
+    }
+
+    public boolean isAsyncModel() {
+        return !this.asyncConsumerSet.isEmpty();
+    }
+
+    public boolean isSyncModel() {
+        return !this.syncConsumerSet.isEmpty();
+    }
+
+    public ConsumeMessageService getConsumeMessageService() {
+        return consumeMessageService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java 
b/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java
new file mode 100644
index 0000000..51b732b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+/**
+ * {@link TopicSubscriber} flavour of {@link RocketMQConsumer} that additionally remembers the
+ * topic it was created for.
+ */
+public class RocketMQTopicSubscriber extends RocketMQConsumer implements TopicSubscriber {
+
+    private Topic topic;
+
+    /**
+     * Passes all arguments to the {@link RocketMQConsumer} constructor, using the topic as the
+     * destination, and keeps a reference to the topic.
+     */
+    public RocketMQTopicSubscriber(RocketMQSession session, Topic topic, String messageSelector,
+        String sharedSubscriptionName, boolean durable, boolean shared) {
+        super(session, topic, messageSelector, sharedSubscriptionName, durable, shared);
+        this.topic = topic;
+    }
+
+    /**
+     * Returns the topic handed to the constructor.
+     */
+    @Override
+    public Topic getTopic() throws JMSException {
+        return topic;
+    }
+
+    /**
+     * Always reports {@code false}.
+     */
+    @Override
+    public boolean getNoLocal() throws JMSException {
+        //todo: not inhibit now
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java 
b/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java
new file mode 100644
index 0000000..c2ea161
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import javax.jms.CompletionListener;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+
+public class SendCompletionListener implements SendCallback {
+
+    private CompletionListener completionListener;
+
+    public SendCompletionListener(CompletionListener completionListener) {
+        this.completionListener = completionListener;
+    }
+
+    @Override
+    public void onSuccess(SendResult sendResult) {
+        //todo: how to transmit message into
+        this.completionListener.onCompletion(null);
+    }
+
+    @Override
+    public void onException(Throwable e) {
+        //todo: how to transmit message into
+        this.completionListener.onException(null, new Exception(e));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java 
b/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java
new file mode 100644
index 0000000..c551a22
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.admin;
+
+import java.util.concurrent.ConcurrentHashMap;
+import javax.jms.JMSException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.jms.exception.JMSClientException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+public class AdminFactory {
+
+    private static ConcurrentHashMap<String/*nameServerAddress*/, 
DefaultMQAdminExt> admins = new ConcurrentHashMap();
+
+    public static DefaultMQAdminExt getAdmin(String nameServerAddress) throws 
JMSException {
+        if (nameServerAddress == null) {
+            throw new IllegalArgumentException("NameServerAddress could be 
null");
+        }
+
+        DefaultMQAdminExt admin = admins.get(nameServerAddress);
+        if (admin != null) {
+            return admin;
+        }
+
+        admin = new DefaultMQAdminExt(nameServerAddress);
+        try {
+            admin.start();
+        }
+        catch (MQClientException e) {
+            throw new JMSClientException("Error during starting admin client");
+        }
+        DefaultMQAdminExt old = admins.putIfAbsent(nameServerAddress, admin);
+        if (old != null) {
+            admin.shutdown();
+            return old;
+        }
+
+        return admin;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/destination/RocketMQQueue.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/rocketmq/jms/destination/RocketMQQueue.java 
b/src/main/java/org/apache/rocketmq/jms/destination/RocketMQQueue.java
new file mode 100644
index 0000000..d7d5b84
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/destination/RocketMQQueue.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.destination;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
+/**
+ * Minimal {@link Queue} implementation that only carries the queue name.
+ */
+public class RocketMQQueue implements Queue {
+
+    /** Queue name supplied at construction time. */
+    private String name;
+
+    /**
+     * @param name queue name, reported by {@link #getQueueName()} and {@link #toString()}
+     */
+    public RocketMQQueue(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the name given to the constructor
+     */
+    @Override
+    public String getQueueName() throws JMSException {
+        return name;
+    }
+
+    /**
+     * @return the queue name, exactly as given to the constructor
+     */
+    @Override
+    public String toString() {
+        return name;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/destination/RocketMQTopic.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/rocketmq/jms/destination/RocketMQTopic.java 
b/src/main/java/org/apache/rocketmq/jms/destination/RocketMQTopic.java
new file mode 100644
index 0000000..3214b4c
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/destination/RocketMQTopic.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.destination;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Minimal {@link Topic} implementation that only carries the topic name.
+ */
+public class RocketMQTopic implements Topic {
+
+    // Not referenced anywhere in this class at the moment.
+    private static final Logger log = LoggerFactory.getLogger(RocketMQTopic.class);
+
+    /** Topic name supplied at construction time. */
+    private String name;
+
+    /**
+     * @param name topic name, reported by {@link #getTopicName()} and {@link #toString()}
+     */
+    public RocketMQTopic(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the name given to the constructor
+     */
+    @Override
+    public String getTopicName() throws JMSException {
+        return name;
+    }
+
+    /**
+     * @return the topic name, exactly as given to the constructor
+     */
+    @Override
+    public String toString() {
+        return name;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java
 
b/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java
new file mode 100644
index 0000000..51ce57d
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.exception;
+
+import javax.jms.JMSException;
+
+/**
+ * {@link JMSException} subtype; it adds no state or behavior of its own and
+ * only forwards its arguments to the parent constructors.
+ */
+public class DuplicateSubscriptionException extends JMSException {
+
+    /**
+     * @param reason description passed to {@link JMSException#JMSException(String)}
+     */
+    public DuplicateSubscriptionException(String reason) {
+        super(reason);
+    }
+
+    /**
+     * @param reason description passed to the parent constructor
+     * @param errorCode error code passed to the parent constructor
+     */
+    public DuplicateSubscriptionException(String reason, String errorCode) {
+        super(reason, errorCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java 
b/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java
new file mode 100644
index 0000000..1335eb9
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.exception;
+
+import javax.jms.JMSException;
+
+/**
+ * {@link JMSException} subtype; it adds no state or behavior of its own and
+ * only forwards its arguments to the parent constructors.
+ */
+public class JMSClientException extends JMSException {
+
+    /**
+     * @param reason description passed to {@link JMSException#JMSException(String)}
+     */
+    public JMSClientException(String reason) {
+        super(reason);
+    }
+
+    /**
+     * @param reason description passed to the parent constructor
+     * @param errorCode error code passed to the parent constructor
+     */
+    public JMSClientException(String reason, String errorCode) {
+        super(reason, errorCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/exception/MessageExpiredException.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/rocketmq/jms/exception/MessageExpiredException.java 
b/src/main/java/org/apache/rocketmq/jms/exception/MessageExpiredException.java
new file mode 100644
index 0000000..d35b7b1
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/jms/exception/MessageExpiredException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.exception;
+
+import javax.jms.JMSException;
+
+/**
+ * {@link JMSException} subtype; it adds no state or behavior of its own and
+ * only forwards its arguments to the parent constructors.
+ */
+public class MessageExpiredException extends JMSException {
+
+    /**
+     * @param reason description passed to {@link JMSException#JMSException(String)}
+     */
+    public MessageExpiredException(String reason) {
+        super(reason);
+    }
+
+    /**
+     * @param reason description passed to the parent constructor
+     * @param errorCode error code passed to the parent constructor
+     */
+    public MessageExpiredException(String reason, String errorCode) {
+        super(reason, errorCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/exception/UnsupportDeliveryModelException.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/rocketmq/jms/exception/UnsupportDeliveryModelException.java
 
b/src/main/java/org/apache/rocketmq/jms/exception/UnsupportDeliveryModelException.java
new file mode 100644
index 0000000..903b75f
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/jms/exception/UnsupportDeliveryModelException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.exception;
+
+import javax.jms.JMSRuntimeException;
+
+/**
+ * Unchecked JMS exception that always carries the same fixed message.
+ */
+public class UnsupportDeliveryModelException extends JMSRuntimeException {
+
+    /** Fixed detail message used by the only constructor. */
+    private static final String MESSAGE = "Only support PERSISTENT model, and guarantee at-least-once";
+
+    /**
+     * Creates the exception with the fixed detail message.
+     */
+    public UnsupportDeliveryModelException() {
+        super(MESSAGE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/hook/ReceiveMessageHook.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/rocketmq/jms/hook/ReceiveMessageHook.java 
b/src/main/java/org/apache/rocketmq/jms/hook/ReceiveMessageHook.java
new file mode 100644
index 0000000..2bf9bd3
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/hook/ReceiveMessageHook.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.hook;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import org.apache.rocketmq.jms.exception.MessageExpiredException;
+import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum;
+
+/**
+ * Hook applied to a message on the receiving side: rejects expired messages
+ * and then stamps the receive time on the message.
+ */
+public class ReceiveMessageHook {
+
+    /**
+     * Validates the message and then sets the provider properties on it.
+     *
+     * @param message received message
+     * @throws JMSException if the message has expired or a header/property access fails
+     */
+    public void before(Message message) throws JMSException {
+        validate(message);
+        setProviderProperties(message);
+    }
+
+    /**
+     * Throws when the message carries a non-zero expiration that lies in the past.
+     * An expiration of 0 is never treated as expired.
+     */
+    private void validate(Message message) throws JMSException {
+        final long expiration = message.getJMSExpiration();
+        if (expiration == 0) {
+            return;
+        }
+        if (System.currentTimeMillis() > expiration) {
+            throw new MessageExpiredException(
+                String.format("This message[id=%s] has been expired", message.getJMSMessageID()));
+        }
+    }
+
+    /**
+     * Stores the current time as the {@code JMSXRcvTimestamp} long property.
+     *
+     * @param message message to stamp
+     * @throws JMSException if the property cannot be set
+     */
+    public void setProviderProperties(Message message) throws JMSException {
+        final String receiveTimestampKey = JMSPropertiesEnum.JMSXRcvTimestamp.name();
+        message.setLongProperty(receiveTimestampKey, System.currentTimeMillis());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java 
b/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java
new file mode 100644
index 0000000..0429973
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.hook;
+
+import java.util.UUID;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import org.apache.rocketmq.jms.RocketMQProducer;
+import org.apache.rocketmq.jms.exception.UnsupportDeliveryModelException;
+import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum;
+
+import static org.apache.rocketmq.jms.Constant.MESSAGE_ID_PREFIX;
+import static org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum.JMSXUserID;
+
+/**
+ * Hook that executes before sending message: validates the delivery mode and
+ * fills in the provider-set headers and properties.
+ *
+ * <p>The producer is dereferenced without a null check, so an instance created
+ * with the no-arg constructor needs {@link #setProducer(RocketMQProducer)}
+ * before {@link #before} or {@link #setProviderProperties} is used.
+ */
+public class SendMessageHook {
+
+    /** Producer whose settings (delivery delay, disable flags, user name) drive the headers. */
+    private RocketMQProducer producer;
+
+    public SendMessageHook() {
+    }
+
+    public SendMessageHook(RocketMQProducer producer) {
+        this.producer = producer;
+    }
+
+    /**
+     * Validates the delivery mode, then sets provider headers and properties.
+     *
+     * @param message message about to be sent
+     * @param destination destination stored in the JMSDestination header
+     * @param deliveryMode delivery mode; must equal
+     *        {@link JMSHeaderEnum#JMS_DELIVERY_MODE_DEFAULT_VALUE}
+     * @param priority value stored in the JMSPriority header
+     * @param timeToLive lifetime in milliseconds; 0 means the message never expires
+     * @throws UnsupportDeliveryModelException if the delivery mode is not accepted
+     * @throws JMSException if a header or property cannot be set
+     */
+    public void before(Message message, Destination destination, int deliveryMode, int priority,
+        long timeToLive) throws JMSException {
+
+        validate(deliveryMode);
+
+        setProviderHeader(message, destination, deliveryMode, priority, timeToLive);
+
+        setProviderProperties(message);
+    }
+
+    /**
+     * Sets destination, delivery mode, expiration, timestamp, delivery time,
+     * priority and message ID on the message.
+     */
+    private void setProviderHeader(Message message, Destination destination, int deliveryMode, int priority,
+        long timeToLive) throws JMSException {
+        // One clock reading keeps expiration, timestamp and delivery time consistent.
+        final long now = System.currentTimeMillis();
+
+        // destination
+        message.setJMSDestination(destination);
+
+        // delivery mode
+        message.setJMSDeliveryMode(deliveryMode);
+
+        // expiration (0 means "never expires")
+        if (timeToLive != 0) {
+            message.setJMSExpiration(now + timeToLive);
+        }
+        else {
+            message.setJMSExpiration(0L);
+        }
+
+        // timestamp
+        if (!this.producer.getDisableMessageTimestamp()) {
+            message.setJMSTimestamp(now);
+        }
+
+        // delivery time: based on the send time rather than on message.getJMSTimestamp(),
+        // which was previously read before this method had set it (and is left untouched
+        // when timestamps are disabled).
+        message.setJMSDeliveryTime(now + this.producer.getDeliveryDelay());
+
+        // priority
+        message.setJMSPriority(priority);
+
+        // messageID is also required in async model, so {@link MessageExt#getMsgId()} can't be used.
+        if (!this.producer.getDisableMessageID()) {
+            message.setJMSMessageID(MESSAGE_ID_PREFIX + UUID.randomUUID().toString());
+        }
+    }
+
+    /**
+     * Rejects every delivery mode other than
+     * {@link JMSHeaderEnum#JMS_DELIVERY_MODE_DEFAULT_VALUE}.
+     */
+    private void validate(int deliveryMode) {
+        if (deliveryMode != JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE) {
+            throw new UnsupportDeliveryModelException();
+        }
+    }
+
+    /**
+     * Copies the producer's user name, when present, into the
+     * {@code JMSXUserID} string property.
+     *
+     * @param message message to decorate
+     * @throws JMSException if the property cannot be set
+     */
+    public void setProviderProperties(Message message) throws JMSException {
+        // JMSXUserID
+        if (this.producer.getUserName() != null) {
+            message.setStringProperty(JMSXUserID.name(), this.producer.getUserName());
+        }
+    }
+
+    public void setProducer(RocketMQProducer producer) {
+        this.producer = producer;
+    }
+}

Reply via email to