Fix error, refactor serialize and MessageConverter further. Unit tests will be 
added in the next commit


Project: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/464cbc1d
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/464cbc1d
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/464cbc1d

Branch: refs/heads/jms-dev-1.1.0
Commit: 464cbc1d89ba67d61371e398e74fe78025a71c24
Parents: 37576dd
Author: zhangke <zhangke_beij...@qq.com>
Authored: Thu Feb 23 22:21:25 2017 +0800
Committer: zhangke <zhangke_beij...@qq.com>
Committed: Thu Feb 23 22:21:25 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/jms/DeliverMessageService.java     |   4 +-
 .../org/apache/rocketmq/jms/JMSHeaderEnum.java  |  11 +-
 .../rocketmq/jms/JMSMessageModelEnum.java       |   7 +-
 .../apache/rocketmq/jms/RocketMQProducer.java   | 121 ++++----------
 .../apache/rocketmq/jms/RocketMQSession.java    |  11 +-
 .../apache/rocketmq/jms/SendMessageHook.java    |  27 ----
 .../rocketmq/jms/hook/SendMessageHook.java      |  68 ++++++++
 .../apache/rocketmq/jms/msg/JMSMapMessage.java  |   2 +-
 .../rocketmq/jms/msg/JMSObjectMessage.java      |   8 +-
 .../apache/rocketmq/jms/msg/JMSTextMessage.java |   3 +-
 .../jms/msg/convert/JMS2RMQMessageConvert.java  |  62 ++++++++
 .../jms/msg/convert/RMQ2JMSMessageConvert.java  |  94 +++++++++++
 .../jms/msg/serialize/MapSerialize.java         |  12 +-
 .../jms/msg/serialize/ObjectSerialize.java      |  11 +-
 .../rocketmq/jms/msg/serialize/Serialize.java   |   2 -
 .../jms/msg/serialize/StringSerialize.java      |  13 +-
 .../rocketmq/jms/support/MessageConverter.java  | 157 -------------------
 .../jms/msg/RocketMQBytesMessageTest.java       |   6 +-
 .../jms/support/MessageConvertTest.java         |  74 ---------
 19 files changed, 310 insertions(+), 383 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java 
b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
index da8196f..bcfc680 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
@@ -38,8 +38,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.jms.msg.convert.RMQ2JMSMessageConvert;
 import org.apache.rocketmq.jms.support.JmsHelper;
-import org.apache.rocketmq.jms.support.MessageConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -183,7 +183,7 @@ public class DeliverMessageService extends ServiceThread {
      * @throws JMSException
      */
     private void handleMessage(MessageExt msg, MessageQueue mq) throws 
InterruptedException, JMSException {
-        Message jmsMessage = MessageConverter.convert2JMSMessage(msg);
+        Message jmsMessage = RMQ2JMSMessageConvert.convert(msg);
         if (jmsMessage.getJMSExpiration() != 0 && System.currentTimeMillis() > 
jmsMessage.getJMSExpiration()) {
             log.debug("The message[id={}] has been expired", msg.getMsgId());
             return;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java 
b/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java
index a9c758e..4979f88 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.jms;
 
+import javax.jms.Message;
+
 public enum JMSHeaderEnum {
 
     JMSDestination,
@@ -31,11 +33,12 @@ public enum JMSHeaderEnum {
     JMSPriority,
     JMSDeliveryTime;
 
-    public static final int JMS_DELIVERY_MODE_DEFAULT_VALUE = 0;
-    public static final int JMS_TIMESTAMP_DEFAULT_VALUE = 0;
+    public static final int JMS_DELIVERY_MODE_DEFAULT_VALUE = 
Message.DEFAULT_DELIVERY_MODE;
+    public static final long JMS_TIME_TO_LIVE_DEFAULT_VALUE = 
Message.DEFAULT_TIME_TO_LIVE;
+    public static final int JMS_PRIORITY_DEFAULT_VALUE = 
Message.DEFAULT_PRIORITY;
+    public static final long JMS_DELIVERY_TIME_DEFAULT_VALUE = 
Message.DEFAULT_DELIVERY_DELAY;
     public static final boolean JMS_REDELIVERED_DEFAULT_VALUE = false;
+    public static final int JMS_TIMESTAMP_DEFAULT_VALUE = 0;
     public static final int JMS_EXPIRATION_DEFAULT_VALUE = 0;
-    public static final int JMS_PRIORITY_DEFAULT_VALUE = 5;
-    public static final int JMS_DELIVERY_TIME_DEFAULT_VALUE = 0;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java 
b/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java
index 0659f92..feee4e3 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.jms;
 
+import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
 import org.apache.rocketmq.jms.msg.JMSBytesMessage;
 import org.apache.rocketmq.jms.msg.JMSMapMessage;
 import org.apache.rocketmq.jms.msg.JMSObjectMessage;
@@ -36,14 +37,14 @@ public enum JMSMessageModelEnum {
         this.jmsClass = jmsClass;
     }
 
-    public static JMSMessageModelEnum toMsgModelEnum(Class clazz) {
+    public static JMSMessageModelEnum toMsgModelEnum(AbstractJMSMessage 
jmsMsg) {
         for (JMSMessageModelEnum e : values()) {
-            if (e.getJmsClass() == clazz) {
+            if (e.getJmsClass().isInstance(jmsMsg)) {
                 return e;
             }
         }
 
-        throw new IllegalArgumentException(String.format("Not supported 
class[%s]", clazz));
+        throw new IllegalArgumentException(String.format("Not supported 
class[%s]", jmsMsg.getClass().getSimpleName()));
     }
 
     public Class getJmsClass() {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
index 8cc5903..109f3bb 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
@@ -30,24 +30,18 @@ import 
org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.hook.SendMessageHook;
 import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
-import org.apache.rocketmq.jms.support.MessageConverter;
+import org.apache.rocketmq.jms.msg.convert.JMS2RMQMessageConvert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static java.lang.String.format;
-import static javax.jms.Message.DEFAULT_DELIVERY_MODE;
-import static javax.jms.Message.DEFAULT_PRIORITY;
-import static javax.jms.Message.DEFAULT_TIME_TO_LIVE;
 import static org.apache.commons.lang.exception.ExceptionUtils.getStackTrace;
-import static org.apache.rocketmq.jms.Constant.DEFAULT_JMS_TYPE;
-import static org.apache.rocketmq.jms.Constant.JMS_DELIVERY_MODE;
-import static org.apache.rocketmq.jms.Constant.JMS_DESTINATION;
-import static org.apache.rocketmq.jms.Constant.JMS_EXPIRATION;
-import static org.apache.rocketmq.jms.Constant.JMS_PRIORITY;
-import static org.apache.rocketmq.jms.Constant.JMS_TIMESTAMP;
-import static org.apache.rocketmq.jms.Constant.JMS_TYPE;
-import static org.apache.rocketmq.jms.Constant.MESSAGE_ID_PREFIX;
+import static 
org.apache.rocketmq.jms.JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE;
+import static 
org.apache.rocketmq.jms.JMSHeaderEnum.JMS_DELIVERY_TIME_DEFAULT_VALUE;
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMS_PRIORITY_DEFAULT_VALUE;
+import static 
org.apache.rocketmq.jms.JMSHeaderEnum.JMS_TIME_TO_LIVE_DEFAULT_VALUE;
 import static 
org.apache.rocketmq.jms.support.DirectTypeConverter.convert2Object;
 
 public class RocketMQProducer implements MessageProducer {
@@ -59,7 +53,12 @@ public class RocketMQProducer implements MessageProducer {
 
     private boolean disableMessageID;
     private boolean disableMessageTimestamp;
-    private long timeToLive;
+    private long timeToLive = JMS_TIME_TO_LIVE_DEFAULT_VALUE;
+    private int deliveryMode = JMS_DELIVERY_MODE_DEFAULT_VALUE;
+    private int priority = JMS_PRIORITY_DEFAULT_VALUE;
+    private long deliveryDelay = JMS_DELIVERY_TIME_DEFAULT_VALUE;
+
+    private SendMessageHook sendMessageHook;
 
     public RocketMQProducer(RocketMQSession session, Destination destination) {
         this.session = session;
@@ -75,6 +74,8 @@ public class RocketMQProducer implements MessageProducer {
         catch (MQClientException e) {
             throw new JMSRuntimeException(format("Fail to start producer, 
error msg:%s", getStackTrace(e)));
         }
+
+        this.sendMessageHook = new SendMessageHook(this);
     }
 
     @Override
@@ -99,24 +100,22 @@ public class RocketMQProducer implements MessageProducer {
 
     @Override
     public void setDeliveryMode(int deliveryMode) throws JMSException {
-        //todo
+        this.deliveryMode = deliveryMode;
     }
 
     @Override
     public int getDeliveryMode() throws JMSException {
-        //todo
-        return 0;
+        return this.deliveryMode;
     }
 
     @Override
-    public void setPriority(int defaultPriority) throws JMSException {
-        //todo
+    public void setPriority(int priority) throws JMSException {
+        this.priority = priority;
     }
 
     @Override
     public int getPriority() throws JMSException {
-        //todo
-        return 0;
+        return this.priority;
     }
 
     @Override
@@ -126,24 +125,22 @@ public class RocketMQProducer implements MessageProducer {
 
     @Override
     public long getTimeToLive() throws JMSException {
-        return this.getTimeToLive();
+        return this.timeToLive;
     }
 
     @Override
     public void setDeliveryDelay(long deliveryDelay) throws JMSException {
-        //todo
+        this.deliveryDelay = deliveryDelay;
     }
 
     @Override
     public long getDeliveryDelay() throws JMSException {
-        //todo
-        return 0;
+        return this.deliveryDelay;
     }
 
     @Override
     public Destination getDestination() throws JMSException {
-        //todo
-        return null;
+        return this.destination;
     }
 
     @Override
@@ -163,15 +160,14 @@ public class RocketMQProducer implements MessageProducer {
 
     @Override
     public void send(Destination destination, Message message) throws 
JMSException {
-        //todo: DEFAULT_TIME_TO_LIVE is zero which means message never 
expires. This feature is not supported by RMQ now.
-        this.send(destination, message, DEFAULT_DELIVERY_MODE, 
DEFAULT_PRIORITY, DEFAULT_TIME_TO_LIVE);
+        this.send(destination, message, getDeliveryMode(), getPriority(), 
getTimeToLive());
     }
 
     @Override
     public void send(Destination destination, Message message, int 
deliveryMode, int priority,
         long timeToLive) throws JMSException {
 
-        before(message);
+        sendMessageHook.before(message, destination, deliveryMode, priority, 
timeToLive);
 
         MessageExt rmqMsg = createRocketMQMessage(message);
 
@@ -205,93 +201,42 @@ public class RocketMQProducer implements MessageProducer {
         }
     }
 
-    private MessageExt createRocketMQMessage(Message message) throws 
JMSException {
-        AbstractJMSMessage jmsMsg = convert2Object(message, 
AbstractJMSMessage.class);
-        initJMSHeaders(jmsMsg, destination);
+    private MessageExt createRocketMQMessage(Message jmsMsg) throws 
JMSException {
+        AbstractJMSMessage abstractJMSMessage = convert2Object(jmsMsg, 
AbstractJMSMessage.class);
         try {
-            return MessageConverter.convert2RMQMessage(jmsMsg);
+            return JMS2RMQMessageConvert.convert(abstractJMSMessage);
         }
         catch (Exception e) {
-            throw new JMSException(format("Fail to convert to RocketMQ 
message. Error: %s", getStackTrace(e)));
+            throw new JMSException(format("Fail to convert to RocketMQ jmsMsg. 
Error: %s", getStackTrace(e)));
         }
     }
 
-    /**
-     * Init the JmsMessage Headers.
-     * <p/>
-     * <P>JMS providers init message's headers. Do not allow user to set these 
by yourself.
-     *
-     * @param jmsMsg message
-     * @param destination
-     * @throws javax.jms.JMSException
-     * @see <CODE>Destination</CODE>
-     */
-    private void initJMSHeaders(AbstractJMSMessage jmsMsg, Destination 
destination) throws JMSException {
-
-        //JMS_DESTINATION default:"topic:message"
-        jmsMsg.setHeader(JMS_DESTINATION, destination);
-        //JMS_DELIVERY_MODE default : PERSISTENT
-        jmsMsg.setHeader(JMS_DELIVERY_MODE, 
javax.jms.Message.DEFAULT_DELIVERY_MODE);
-        //JMS_TIMESTAMP default : current time
-        jmsMsg.setHeader(JMS_TIMESTAMP, System.currentTimeMillis());
-        //JMS_EXPIRATION default :  3 days
-        //JMS_EXPIRATION = currentTime + time_to_live
-        jmsMsg.setHeader(JMS_EXPIRATION, System.currentTimeMillis() + 
DEFAULT_TIME_TO_LIVE);
-        //JMS_PRIORITY default : 4
-        jmsMsg.setHeader(JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY);
-        //JMS_TYPE default : ons(open notification service)
-        jmsMsg.setHeader(JMS_TYPE, DEFAULT_JMS_TYPE);
-        //JMS_REPLY_TO,JMS_CORRELATION_ID default : null
-        //JMS_MESSAGE_ID is set by sendResult.
-        //JMS_REDELIVERED is set by broker.
-    }
-
     @Override
     public void send(Message message, CompletionListener completionListener) 
throws JMSException {
-        this.send(this.destination, message, DEFAULT_DELIVERY_MODE, 
DEFAULT_PRIORITY, DEFAULT_TIME_TO_LIVE, completionListener);
+        this.send(this.destination, message, getDeliveryMode(), getPriority(), 
getTimeToLive(), completionListener);
     }
 
     @Override
     public void send(Message message, int deliveryMode, int priority, long 
timeToLive,
         CompletionListener completionListener) throws JMSException {
-        this.send(this.destination, message, DEFAULT_DELIVERY_MODE, 
DEFAULT_PRIORITY, DEFAULT_TIME_TO_LIVE, completionListener);
+        this.send(this.destination, message, deliveryMode, priority, 
timeToLive, completionListener);
     }
 
     @Override
     public void send(Destination destination, Message message,
         CompletionListener completionListener) throws JMSException {
-        this.send(destination, message, DEFAULT_DELIVERY_MODE, 
DEFAULT_PRIORITY, DEFAULT_TIME_TO_LIVE, completionListener);
+        this.send(destination, message, getDeliveryMode(), getPriority(), 
getTimeToLive(), completionListener);
     }
 
     @Override
     public void send(Destination destination, Message message, int 
deliveryMode, int priority, long timeToLive,
         CompletionListener completionListener) throws JMSException {
 
-        before(message);
+        sendMessageHook.before(message, destination, deliveryMode, priority, 
timeToLive);
 
         MessageExt rmqMsg = createRocketMQMessage(message);
 
         sendAsync(rmqMsg, completionListener);
     }
 
-    private void before(Message message) throws JMSException {
-        // timestamp
-        if (!getDisableMessageTimestamp()) {
-            message.setJMSTimestamp(System.currentTimeMillis());
-        }
-
-        // messageID is also required in async model, so {@link 
MessageExt#getMsgId()} can't be used.
-        if (!getDisableMessageID()) {
-            message.setJMSMessageID(new 
StringBuffer(MESSAGE_ID_PREFIX).append(UUID.randomUUID().getLeastSignificantBits()).toString());
-        }
-
-        // expiration
-        if (getTimeToLive() != 0) {
-            message.setJMSExpiration(System.currentTimeMillis() + 
getTimeToLive());
-        }
-        else {
-            message.setJMSExpiration(0l);
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
index c14c85d..d5b64d1 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
@@ -20,7 +20,6 @@ package org.apache.rocketmq.jms;
 import com.google.common.base.Preconditions;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -229,16 +228,8 @@ public class RocketMQSession implements Session {
     @Override
     public Topic createTopic(String topicName) throws JMSException {
         Preconditions.checkNotNull(topicName);
-        List<String> msgTuple = Arrays.asList(topicName.split(":"));
 
-        Preconditions.checkState(msgTuple.size() >= 1 && msgTuple.size() <= 2,
-            "Destination must match messageTopic:messageType !");
-
-        //If messageType is null, use * instead.
-        if (1 == msgTuple.size()) {
-            return new RocketMQTopic(topicName);
-        }
-        return new RocketMQTopic(msgTuple.get(0), msgTuple.get(1));
+        return new RocketMQTopic(topicName);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java 
b/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java
deleted file mode 100644
index 0dee423..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import javax.jms.Message;
-
-public class SendMessageHook {
-
-    public void before(Message message) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java 
b/core/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java
new file mode 100644
index 0000000..2e33cd8
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.hook;
+
+import java.util.UUID;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import org.apache.rocketmq.jms.RocketMQProducer;
+
+import static org.apache.rocketmq.jms.Constant.MESSAGE_ID_PREFIX;
+
+public class SendMessageHook {
+
+    private RocketMQProducer producer;
+
+    public SendMessageHook(RocketMQProducer producer) {
+        this.producer = producer;
+    }
+
+    public void before(Message message, Destination destination, int 
deliveryMode, int priority,
+        long timeToLive) throws JMSException {
+        // destination
+        message.setJMSDestination(destination);
+
+        // delivery mode
+        message.setJMSDeliveryMode(deliveryMode);
+
+        // expiration
+        if (timeToLive != 0) {
+            message.setJMSExpiration(System.currentTimeMillis() + timeToLive);
+        }
+        else {
+            message.setJMSExpiration(0L);
+        }
+
+        // delivery time
+        message.setJMSDeliveryTime(message.getJMSTimestamp() + 
this.producer.getDeliveryDelay());
+
+        // priority
+        message.setJMSPriority(priority);
+
+        // messageID is also required in async model, so {@link 
MessageExt#getMsgId()} can't be used.
+        if (!this.producer.getDisableMessageID()) {
+            message.setJMSMessageID(new 
StringBuffer(MESSAGE_ID_PREFIX).append(UUID.randomUUID().getLeastSignificantBits()).toString());
+        }
+
+        // timestamp
+        if (!this.producer.getDisableMessageTimestamp()) {
+            message.setJMSTimestamp(System.currentTimeMillis());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java
index d1dd15d..d10247f 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java
@@ -66,7 +66,7 @@ public class JMSMapMessage extends AbstractJMSMessage 
implements MapMessage {
     }
 
     @Override public byte[] getBody() throws JMSException {
-        return new MapSerialize().serialize(this.map);
+        return MapSerialize.instance().serialize(this.map);
     }
 
     @Override public boolean isBodyAssignableTo(Class c) throws JMSException {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java
index 239ecc7..4f29d33 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.jms.msg;
 
-import java.io.IOException;
 import java.io.Serializable;
 import javax.jms.JMSException;
 import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize;
@@ -39,12 +38,7 @@ public class JMSObjectMessage extends AbstractJMSMessage 
implements javax.jms.Ob
     }
 
     @Override public byte[] getBody() throws JMSException {
-        try {
-            return ObjectSerialize.serialize(body);
-        }
-        catch (IOException e) {
-            throw new JMSException(e.getMessage());
-        }
+        return ObjectSerialize.instance().serialize(body);
     }
 
     @Override public boolean isBodyAssignableTo(Class c) throws JMSException {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java
index 13e344d..5fd67a3 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.jms.msg;
 
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
+import org.apache.rocketmq.jms.msg.serialize.StringSerialize;
 
 import static java.lang.String.format;
 
@@ -43,7 +44,7 @@ public class JMSTextMessage extends AbstractJMSMessage 
implements javax.jms.Text
     }
 
     @Override public byte[] getBody() throws JMSException {
-        return new byte[0];
+        return StringSerialize.instance().serialize(this.text);
     }
 
     @Override public boolean isBodyAssignableTo(Class c) throws JMSException {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java
 
b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java
new file mode 100644
index 0000000..cf7d975
--- /dev/null
+++ 
b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.convert;
+
+import java.util.Map;
+import javax.jms.JMSException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
+
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSExpiration;
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSMessageID;
+import static org.apache.rocketmq.jms.JMSMessageModelEnum.MSG_MODEL_NAME;
+import static org.apache.rocketmq.jms.JMSMessageModelEnum.toMsgModelEnum;
+
+public class JMS2RMQMessageConvert {
+
+    public static MessageExt convert(AbstractJMSMessage jmsMsg) throws 
Exception {
+        MessageExt rmqMsg = new MessageExt();
+
+        handleHeader(jmsMsg, rmqMsg);
+
+        handleBody(jmsMsg, rmqMsg);
+
+        handleProperties(jmsMsg, rmqMsg);
+
+        return rmqMsg;
+    }
+
+    private static void handleHeader(AbstractJMSMessage jmsMsg, MessageExt 
rmqMsg) {
+        rmqMsg.putUserProperty(JMSMessageID.name(), jmsMsg.getJMSMessageID());
+        rmqMsg.setBornTimestamp(jmsMsg.getJMSTimestamp());
+        rmqMsg.putUserProperty(JMSExpiration.name(), 
String.valueOf(jmsMsg.getJMSExpiration()));
+        rmqMsg.setKeys(jmsMsg.getJMSMessageID());
+    }
+
+    private static void handleProperties(AbstractJMSMessage jmsMsg, MessageExt 
rmqMsg) {
+        Map<String, Object> userProps = jmsMsg.getProperties();
+        for (Map.Entry<String, Object> entry : userProps.entrySet()) {
+            rmqMsg.putUserProperty(entry.getKey(), 
entry.getValue().toString());
+        }
+    }
+
+    private static void handleBody(AbstractJMSMessage jmsMsg, MessageExt 
rmqMsg) throws JMSException {
+        rmqMsg.putUserProperty(MSG_MODEL_NAME, toMsgModelEnum(jmsMsg).name());
+        rmqMsg.setBody(jmsMsg.getBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java
 
b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java
new file mode 100644
index 0000000..4fa197b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.convert;
+
+import java.util.Map;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.JMSHeaderEnum;
+import org.apache.rocketmq.jms.JMSMessageModelEnum;
+import org.apache.rocketmq.jms.RocketMQTopic;
+import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
+import org.apache.rocketmq.jms.msg.JMSBytesMessage;
+import org.apache.rocketmq.jms.msg.JMSMapMessage;
+import org.apache.rocketmq.jms.msg.JMSObjectMessage;
+import org.apache.rocketmq.jms.msg.JMSTextMessage;
+import org.apache.rocketmq.jms.msg.serialize.MapSerialize;
+import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize;
+import org.apache.rocketmq.jms.msg.serialize.StringSerialize;
+
+import static java.lang.String.format;
+import static org.apache.rocketmq.jms.JMSMessageModelEnum.MSG_MODEL_NAME;
+
+public class RMQ2JMSMessageConvert {
+
+    public static Message convert(MessageExt rmqMsg) throws JMSException {
+        if (rmqMsg == null) {
+            return null;
+        }
+
+        AbstractJMSMessage jmsMsg = 
newAbstractJMSMessage(rmqMsg.getUserProperty(MSG_MODEL_NAME), rmqMsg.getBody());
+
+        setHeader(rmqMsg, jmsMsg);
+
+        setProperties(rmqMsg, jmsMsg);
+
+        return jmsMsg;
+    }
+
+    private static AbstractJMSMessage newAbstractJMSMessage(String msgModel, 
byte[] body) throws JMSException {
+        AbstractJMSMessage message;
+        switch (JMSMessageModelEnum.valueOf(msgModel)) {
+            case BYTE:
+                return new JMSBytesMessage(body);
+            case MAP:
+                message = new 
JMSMapMessage(MapSerialize.instance().deserialize(body));
+                break;
+            case OBJECT:
+                message = new 
JMSObjectMessage(ObjectSerialize.instance().deserialize(body));
+                break;
+            case STRING:
+                message = new 
JMSTextMessage(StringSerialize.instance().deserialize(body));
+                break;
+            default:
+                throw new JMSException(format("The type[%s] is not supported", 
msgModel));
+        }
+
+        return message;
+    }
+
+    private static void setHeader(MessageExt rmqMsg, AbstractJMSMessage 
jmsMsg) {
+        
jmsMsg.setJMSMessageID(rmqMsg.getUserProperty(JMSHeaderEnum.JMSMessageID.name()));
+        jmsMsg.setJMSTimestamp(rmqMsg.getBornTimestamp());
+        
jmsMsg.setJMSExpiration(Long.valueOf(rmqMsg.getUserProperty(JMSHeaderEnum.JMSExpiration.name())));
+        jmsMsg.setJMSRedelivered(rmqMsg.getReconsumeTimes() > 0 ? true : 
false);
+        //todo: what about Queue?
+        jmsMsg.setJMSDestination(new RocketMQTopic(rmqMsg.getTopic()));
+    }
+
+    private static void setProperties(MessageExt rmqMsg, AbstractJMSMessage 
jmsMsg) {
+        Map<String, String> propertiesMap = rmqMsg.getProperties();
+        if (propertiesMap != null) {
+            for (String properName : propertiesMap.keySet()) {
+                String properValue = propertiesMap.get(properName);
+                jmsMsg.setStringProperty(properName, properValue);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java
index b979d74..7c7f1ea 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java
@@ -18,16 +18,26 @@
 package org.apache.rocketmq.jms.msg.serialize;
 
 import com.alibaba.fastjson.JSON;
+import java.util.HashMap;
 import java.util.Map;
 import javax.jms.JMSException;
 
 public class MapSerialize implements Serialize<Map> { // Map <-> JSON byte[] codec built on fastjson's JSON helper
 
+    private static MapSerialize ins = new MapSerialize(); // the one shared instance, created at class initialization
+
+    public static MapSerialize instance() { // accessor for the shared instance
+        return ins;
+    }
+
     @Override public byte[] serialize(Map map) throws JMSException { // encodes the map as JSON bytes
         return JSON.toJSONBytes(map);
     }
 
+    private MapSerialize() { // private: within this class only the static field above constructs it
+    }
+
     @Override public Map deserialize(byte[] bytes) throws JMSException { // decodes JSON bytes back into a map
-        return JSON.parseObject(bytes, Map.class);
+        return JSON.parseObject(bytes, HashMap.class); // requests a concrete HashMap rather than the Map interface
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java
index 38ffa2d..34e9c22 100644
--- 
a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java
+++ 
b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java
@@ -27,6 +27,15 @@ import javax.jms.JMSException;
 
 public class ObjectSerialize implements Serialize<Object> {
 
+    private static ObjectSerialize ins = new ObjectSerialize();
+
+    public static ObjectSerialize instance() {
+        return ins;
+    }
+
+    private ObjectSerialize() {
+    }
+
     public byte[] serialize(Object object) throws JMSException {
         try {
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -47,7 +56,7 @@ public class ObjectSerialize implements Serialize<Object> {
             ObjectInputStream ois = new ObjectInputStream(bais);
             ois.close();
             bais.close();
-            return (Serializable)ois.readObject();
+            return (Serializable) ois.readObject();
         }
         catch (IOException e) {
             throw new JMSException(e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java
index 8e4224f..78a499c 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java
@@ -21,8 +21,6 @@ import javax.jms.JMSException;
 
 public interface Serialize<T> {
 
-    static final byte[] EMPTY_BYTES = new byte[0];
-
     byte[] serialize(T t) throws JMSException;
 
     T deserialize(byte[] bytes) throws JMSException;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java
index a6dca20..9ee0d3b 100644
--- 
a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java
+++ 
b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java
@@ -23,8 +23,17 @@ import javax.jms.JMSException;
 
 public class StringSerialize implements Serialize<String> {
 
-    public static final String EMPTY_STRING = "";
-    public static final Charset DEFAULT_CHARSET = Charsets.UTF_8;
+    private static final String EMPTY_STRING = "";
+    private static final byte[] EMPTY_BYTES = new byte[0];
+    private static final Charset DEFAULT_CHARSET = Charsets.UTF_8;
+    private static StringSerialize ins = new StringSerialize();
+
+    public static StringSerialize instance() {
+        return ins;
+    }
+
+    private StringSerialize() {
+    }
 
     @Override public byte[] serialize(String s) throws JMSException {
         if (null == s) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/support/MessageConverter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/support/MessageConverter.java 
b/core/src/main/java/org/apache/rocketmq/jms/support/MessageConverter.java
deleted file mode 100644
index 5c86237..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/support/MessageConverter.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.support;
-
-import com.alibaba.fastjson.JSON;
-import com.google.common.base.Charsets;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.StreamMessage;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.jms.JMSHeaderEnum;
-import org.apache.rocketmq.jms.JMSMessageModelEnum;
-import org.apache.rocketmq.jms.RocketMQTopic;
-import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
-import org.apache.rocketmq.jms.msg.JMSBytesMessage;
-import org.apache.rocketmq.jms.msg.JMSMapMessage;
-import org.apache.rocketmq.jms.msg.JMSObjectMessage;
-import org.apache.rocketmq.jms.msg.JMSTextMessage;
-
-import static java.lang.String.format;
-import static org.apache.rocketmq.jms.JMSMessageModelEnum.MSG_MODEL_NAME;
-
-public class MessageConverter {
-    public static final String EMPTY_STRING = "";
-
-    public static Object getBodyFromJMSMessage(javax.jms.Message jmsMessage) 
throws JMSException {
-        if (jmsMessage == null) {
-            return null;
-        }
-
-        if (StreamMessage.class.isInstance(jmsMessage)) {
-            throw new 
UnsupportedOperationException(StreamMessage.class.getSimpleName() + " is not 
supported");
-        }
-        return jmsMessage.getBody(Object.class);
-    }
-
-    public static Message convert2JMSMessage(MessageExt msg) throws Exception {
-        if (msg == null) {
-            return null;
-        }
-
-        AbstractJMSMessage message;
-        final String msgModel = msg.getUserProperty(MSG_MODEL_NAME);
-        switch (JMSMessageModelEnum.valueOf(msgModel)) {
-            case BYTE:
-                message = new JMSBytesMessage(msg.getBody());
-                break;
-            case MAP:
-                message = new JMSMapMessage(JSON.parseObject(new 
String(msg.getBody()), HashMap.class));
-                break;
-            case OBJECT:
-                message = new 
JMSObjectMessage(objectDeserialize(msg.getBody()));
-                break;
-            case STRING:
-                message = new JMSTextMessage(bytes2String(msg.getBody(), 
Charsets.UTF_8));
-                break;
-            default:
-                throw new JMSException(format("The type[%s] is not supported", 
msgModel));
-        }
-
-        //-------------------------set headers-------------------------
-        
message.setJMSMessageID(msg.getUserProperty(JMSHeaderEnum.JMSMessageID.name()));
-        message.setJMSTimestamp(msg.getBornTimestamp());
-        
message.setJMSExpiration(Long.valueOf(msg.getUserProperty(JMSHeaderEnum.JMSExpiration.name())));
-        message.setJMSRedelivered(msg.getReconsumeTimes() > 0 ? true : false);
-        //todo: what about Queue?
-        message.setJMSDestination(new RocketMQTopic(msg.getTopic()));
-
-        Map<String, String> propertiesMap = msg.getProperties();
-        if (propertiesMap != null) {
-            for (String properName : propertiesMap.keySet()) {
-                String properValue = propertiesMap.get(properName);
-                message.setStringProperty(properName, properValue);
-            }
-        }
-
-        return message;
-    }
-
-    public static final String bytes2String(byte[] bs, Charset charset) {
-        if (null == bs) {
-            return EMPTY_STRING;
-        }
-        String s = null;
-        try {
-            s = new String(bs, charset);
-        }
-        catch (Exception e) {
-            // ignore
-        }
-        return s;
-    }
-
-    public static MessageExt convert2RMQMessage(AbstractJMSMessage jmsMsg) 
throws Exception {
-        MessageExt rmqMsg = new MessageExt();
-
-        rmqMsg.putUserProperty(JMSHeaderEnum.JMSMessageID.name(), 
jmsMsg.getJMSMessageID());
-        rmqMsg.setBornTimestamp(jmsMsg.getJMSTimestamp());
-        rmqMsg.putUserProperty(JMSHeaderEnum.JMSExpiration.name(), 
String.valueOf(jmsMsg.getJMSExpiration()));
-        rmqMsg.setKeys(jmsMsg.getJMSMessageID());
-
-        // 1. Transform message body
-        rmqMsg.setBody(MessageConverter.getBodyFromJMSMessage(jmsMsg));
-
-        // 2. Transform message properties
-        Properties properties = getAllProperties(jmsMsg);
-        for (String name : properties.stringPropertyNames()) {
-            String value = properties.getProperty(name);
-            rmqMsg.putUserProperty(name, value);
-        }
-
-        return rmqMsg;
-    }
-
-    private static Properties getAllProperties(AbstractJMSMessage jmsMsg) 
throws JMSException {
-        Properties userProperties = new Properties();
-
-        Map<String, Object> userProps = jmsMsg.getProperties();
-        Iterator<Map.Entry<String, Object>> userPropsIter = 
userProps.entrySet().iterator();
-        while (userPropsIter.hasNext()) {
-            Map.Entry<String, Object> entry = userPropsIter.next();
-            userProperties.setProperty(entry.getKey(), 
entry.getValue().toString());
-        }
-
-        //Jms message Model
-        userProperties.setProperty(MSG_MODEL_NAME, 
JMSMessageModelEnum.toMsgModelEnum(jmsMsg.getClass()).name())
-
-        return userProperties;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java 
b/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java
index 7664c08..723ca1a 100644
--- 
a/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java
+++ 
b/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java
@@ -32,11 +32,11 @@ public class RocketMQBytesMessageTest {
     @Test
     public void testGetData() throws Exception {
         JMSBytesMessage readMessage = new JMSBytesMessage(receiveData);
-        assertThat(new String(receiveData), is(new 
String(readMessage.getData())));
+        assertThat(new String(receiveData), is(new 
String(readMessage.getBody())));
 
         JMSBytesMessage sendMessage = new JMSBytesMessage();
         sendMessage.writeBytes(sendData, 0, sendData.length);
-        assertThat(new String(sendData), is(new 
String(sendMessage.getData())));
+        assertThat(new String(sendData), is(new 
String(sendMessage.getBody())));
     }
 
     @Test
@@ -72,7 +72,7 @@ public class RocketMQBytesMessageTest {
     public void testWriteBytes() throws Exception {
         JMSBytesMessage msg = new JMSBytesMessage();
         msg.writeBytes(sendData);
-        assertThat(new String(msg.getData()), is(new String(sendData)));
+        assertThat(new String(msg.getBody()), is(new String(sendData)));
     }
 
     @Test(expected = MessageNotReadableException.class)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/test/java/org/apache/rocketmq/jms/support/MessageConvertTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/rocketmq/jms/support/MessageConvertTest.java 
b/core/src/test/java/org/apache/rocketmq/jms/support/MessageConvertTest.java
deleted file mode 100644
index 6ac7acd..0000000
--- a/core/src/test/java/org/apache/rocketmq/jms/support/MessageConvertTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.jms.support;
-
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.jms.RocketMQTopic;
-import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
-import org.apache.rocketmq.jms.msg.JMSTextMessage;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.rocketmq.jms.Constant.JMS_DESTINATION;
-import static org.apache.rocketmq.jms.Constant.JMS_MESSAGE_ID;
-import static org.apache.rocketmq.jms.Constant.JMS_REDELIVERED;
-import static org.apache.rocketmq.jms.support.MessageConverter.JMS_MSGMODEL;
-import static org.apache.rocketmq.jms.support.MessageConverter.MSGMODEL_TEXT;
-import static org.apache.rocketmq.jms.support.MessageConverter.MSG_TOPIC;
-import static org.apache.rocketmq.jms.support.MessageConverter.MSG_TYPE;
-
-public class MessageConvertTest {
-    @Test
-    public void testCovert2RMQ() throws Exception {
-        //build RmqJmsMessage
-        String topic = "TestTopic";
-        String messageType = "TagA";
-
-        AbstractJMSMessage rmqJmsMessage = new JMSTextMessage("testText");
-        rmqJmsMessage.setHeader(JMS_DESTINATION, new RocketMQTopic(topic, 
messageType));
-        rmqJmsMessage.setHeader(JMS_MESSAGE_ID, "ID:null");
-        rmqJmsMessage.setHeader(JMS_REDELIVERED, Boolean.FALSE);
-
-        rmqJmsMessage.setObjectProperty(JMS_MSGMODEL, MSGMODEL_TEXT);
-        rmqJmsMessage.setObjectProperty(MSG_TOPIC, topic);
-        rmqJmsMessage.setObjectProperty(MSG_TYPE, messageType);
-        rmqJmsMessage.setObjectProperty(MessageConst.PROPERTY_TAGS, 
messageType);
-        rmqJmsMessage.setObjectProperty(MessageConst.PROPERTY_KEYS, 
messageType);
-
-        //convert to RMQMessagemiz
-        MessageExt message = 
(MessageExt)MessageConverter.convert2RMQMessage(rmqJmsMessage);
-
-        //then convert back to RmqJmsMessage
-        AbstractJMSMessage RmqJmsMessageBack = 
MessageConverter.convert2JMSMessage(message);
-
-        JMSTextMessage jmsTextMessage = (JMSTextMessage) rmqJmsMessage;
-        JMSTextMessage jmsTextMessageBack = (JMSTextMessage) RmqJmsMessageBack;
-
-        Assert.assertEquals(jmsTextMessage.getText(), 
jmsTextMessageBack.getText());
-        Assert.assertEquals(jmsTextMessage.getJMSDestination().toString(), 
jmsTextMessageBack.getJMSDestination().toString());
-        Assert.assertEquals(jmsTextMessage.getJMSMessageID(), 
jmsTextMessageBack.getJMSMessageID());
-        Assert.assertEquals(jmsTextMessage.getJMSRedelivered(), 
jmsTextMessageBack.getJMSRedelivered());
-        Assert.assertEquals(jmsTextMessage.getHeaders().get(JMS_MSGMODEL), 
jmsTextMessageBack.getHeaders().get(JMS_MSGMODEL));
-        Assert.assertEquals(jmsTextMessage.getHeaders().get(MSG_TOPIC), 
jmsTextMessageBack.getHeaders().get(MSG_TOPIC));
-        Assert.assertEquals(jmsTextMessage.getHeaders().get(MSG_TYPE), 
jmsTextMessageBack.getHeaders().get(MSG_TYPE));
-        
Assert.assertEquals(jmsTextMessage.getHeaders().get(MessageConst.PROPERTY_TAGS),
 jmsTextMessageBack.getHeaders().get(MessageConst.PROPERTY_TAGS));
-        
Assert.assertEquals(jmsTextMessage.getHeaders().get(MessageConst.PROPERTY_KEYS),
 jmsTextMessageBack.getHeaders().get(MessageConst.PROPERTY_KEYS));
-
-    }
-}

Reply via email to