http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java
new file mode 100644
index 0000000..4bdf58b
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import org.apache.rocketmq.jms.support.PrimitiveTypeCast;
+
+import static java.lang.String.format;
+
+/**
+ * RocketMQ implementation of the JMS {@code BytesMessage}.
+ *
+ * <p>A message built from a byte array starts out readable; a message built with the no-arg constructor
+ * starts out writable and buffers everything written to it in memory.
+ */
+public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.BytesMessage {
+
+    /** Bytes backing the read side; {@code null} while there is nothing to read. */
+    private byte[] bytesIn;
+    private DataInputStream dataAsInput;
+
+    /** In-memory buffer behind the write side; recreated lazily after {@link #clearBody()}. */
+    private ByteArrayOutputStream bytesOut;
+    private DataOutputStream dataAsOutput;
+
+    protected boolean readOnly;
+
+    /**
+     * Message created for reading
+     *
+     * @param data to construct this object; {@code null} is treated as an empty body
+     */
+    public JMSBytesMessage(byte[] data) {
+        // A null payload would otherwise fail with a NullPointerException on data.length.
+        final byte[] content = data != null ? data : new byte[0];
+        this.bytesIn = content;
+        this.dataAsInput = new DataInputStream(new ByteArrayInputStream(content, 0, content.length));
+        this.readOnly = true;
+        this.writeOnly = false;
+    }
+
+    /**
+     * Message created to be sent
+     */
+    public JMSBytesMessage() {
+        this.bytesOut = new ByteArrayOutputStream();
+        this.dataAsOutput = new DataOutputStream(this.bytesOut);
+        this.readOnly = false;
+        this.writeOnly = true;
+    }
+
+    /**
+     * Returns a copy of the whole body and then calls {@link #reset()}.
+     *
+     * @param clazz requested body type; must be a type a {@code byte[]} can be assigned to
+     * @return a copy of the body, empty when nothing has been written or the body was cleared
+     * @throws MessageFormatException if {@code clazz} cannot hold a {@code byte[]}
+     * @throws IllegalStateRuntimeException if the message is neither write-only nor read-only
+     */
+    @Override public byte[] getBody(Class clazz) throws JMSException {
+        if (!isBodyAssignableTo(clazz)) {
+            throw new MessageFormatException(format("The type[%s] can't be casted to byte[]", clazz.toString()));
+        }
+
+        final byte[] result;
+        if (isWriteOnly()) {
+            // bytesOut is null after clearBody() until the next write; that is an empty body.
+            result = bytesOut == null ? new byte[0] : bytesOut.toByteArray();
+        }
+        else if (isReadOnly()) {
+            result = bytesIn == null ? new byte[0] : Arrays.copyOf(bytesIn, bytesIn.length);
+        }
+        else {
+            throw new IllegalStateRuntimeException("Message must be in write only or read only status");
+        }
+
+        this.reset();
+        return result;
+    }
+
+    @Override public byte[] getBody() throws JMSException {
+        return getBody(byte[].class);
+    }
+
+    @Override public boolean isBodyAssignableTo(Class c) throws JMSException {
+        return byte[].class.isAssignableFrom(c);
+    }
+
+    /**
+     * Returns the number of bytes in the body.
+     *
+     * @return bytes written so far for a write-only message, otherwise the size of the readable body
+     * @throws IllegalStateRuntimeException if the message is neither write-only nor read-only
+     */
+    @Override public long getBodyLength() throws JMSException {
+        if (isWriteOnly()) {
+            return bytesOut == null ? 0 : bytesOut.size();
+        }
+        else if (isReadOnly()) {
+            return bytesIn == null ? 0 : bytesIn.length;
+        }
+        else {
+            throw new IllegalStateRuntimeException("Message must be in write only or read only status");
+        }
+    }
+
+    public boolean readBoolean() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readBoolean();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    /**
+     * Verifies that the message can be read from.
+     *
+     * @throws MessageNotReadableException if the message is not read-only or has no input stream
+     */
+    private void checkIsReadOnly() throws MessageNotReadableException {
+        if (!isReadOnly()) {
+            throw new MessageNotReadableException("Not readable");
+        }
+        if (dataAsInput == null) {
+            throw new MessageNotReadableException("No data to read");
+        }
+    }
+
+    public byte readByte() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readByte();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public int readUnsignedByte() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readUnsignedByte();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public short readShort() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readShort();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public int readUnsignedShort() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readUnsignedShort();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public char readChar() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readChar();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public int readInt() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readInt();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public long readLong() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readLong();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public float readFloat() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readFloat();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public double readDouble() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readDouble();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public String readUTF() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readUTF();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public int readBytes(byte[] value) throws JMSException {
+        checkIsReadOnly();
+
+        return readBytes(value, value.length);
+    }
+
+    /**
+     * Reads up to {@code length} bytes from the current position into {@code value}.
+     *
+     * @param value destination buffer
+     * @param length maximum number of bytes to read
+     * @return the number of bytes read, or -1 if bytes were requested but the end of the body was reached
+     * @throws IndexOutOfBoundsException if {@code length} is negative or larger than {@code value.length}
+     * @throws MessageNotReadableException if the message cannot be read from
+     */
+    public int readBytes(byte[] value, int length) throws JMSException {
+        checkIsReadOnly();
+
+        // A negative length used to fall through and be reported as end-of-stream (-1).
+        if (length < 0 || length > value.length) {
+            throw new IndexOutOfBoundsException("length must not be negative or larger than the length of value");
+        }
+        try {
+            int offset = 0;
+            while (offset < length) {
+                int read = dataAsInput.read(value, offset, length - offset);
+                if (read < 0) {
+                    break;
+                }
+                offset += read;
+            }
+
+            if (offset == 0 && length != 0) {
+                return -1;
+            }
+            else {
+                return offset;
+            }
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+
+    }
+
+    public void writeBoolean(boolean value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeBoolean(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    /** Recreates the output buffer and stream if they are missing, e.g. after {@link #clearBody()}. */
+    private void initializeWriteIfNecessary() {
+        if (bytesOut == null) {
+            bytesOut = new ByteArrayOutputStream();
+        }
+        if (dataAsOutput == null) {
+            dataAsOutput = new DataOutputStream(bytesOut);
+        }
+    }
+
+    public void writeByte(byte value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeByte(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeShort(short value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeShort(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeChar(char value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeChar(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeInt(int value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeInt(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeLong(long value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeLong(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeFloat(float value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeFloat(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeDouble(double value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeDouble(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeUTF(String value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeUTF(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeBytes(byte[] value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        // Defensive only: initializeWriteIfNecessary() has just created the stream if it was missing.
+        if (dataAsOutput == null) {
+            throw new MessageNotWriteableException("Message is not writable! ");
+        }
+        try {
+            dataAsOutput.write(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        // Defensive only: initializeWriteIfNecessary() has just created the stream if it was missing.
+        if (dataAsOutput == null) {
+            throw new MessageNotWriteableException("Message is not writable! ");
+        }
+        try {
+            dataAsOutput.write(value, offset, length);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    /**
+     * Writes a boxed primitive, a {@code String} or a {@code byte[]} using the same encoding as the matching
+     * typed {@code writeXxx} method, so the value can be read back with the matching {@code readXxx} method.
+     *
+     * <p>Previously the low byte of each character of {@code String.valueOf(value)} was written, which no
+     * typed reader could decode.
+     *
+     * @param value the object to write
+     * @throws JMSException if {@code PrimitiveTypeCast.isPrimitiveType} rejects the value
+     * @throws MessageFormatException if the value passes that check but has no typed writer here
+     */
+    public void writeObject(Object value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        if (!PrimitiveTypeCast.isPrimitiveType(value)) {
+            throw new JMSException("Object must be primitive type");
+        }
+
+        if (value instanceof Boolean) {
+            writeBoolean((Boolean) value);
+        }
+        else if (value instanceof Byte) {
+            writeByte((Byte) value);
+        }
+        else if (value instanceof Short) {
+            writeShort((Short) value);
+        }
+        else if (value instanceof Character) {
+            writeChar((Character) value);
+        }
+        else if (value instanceof Integer) {
+            writeInt((Integer) value);
+        }
+        else if (value instanceof Long) {
+            writeLong((Long) value);
+        }
+        else if (value instanceof Float) {
+            writeFloat((Float) value);
+        }
+        else if (value instanceof Double) {
+            writeDouble((Double) value);
+        }
+        else if (value instanceof String) {
+            writeUTF((String) value);
+        }
+        else if (value instanceof byte[]) {
+            writeBytes((byte[]) value);
+        }
+        else {
+            throw new MessageFormatException("Object must be primitive type");
+        }
+    }
+
+    /**
+     * Marks the message readable and repositions the read stream at the first byte of the body.
+     *
+     * <p>If anything has been written, the written bytes become the readable body. The write buffer is kept
+     * intact; it used to be emptied here, which made {@link #getBody()} destroy the payload it had just
+     * returned.
+     *
+     * <p>NOTE(review): the inherited {@code writeOnly} flag is left untouched, as before, so whether writes
+     * are still accepted after a reset depends on {@code checkIsWriteOnly()} - confirm the intended behaviour.
+     */
+    public void reset() throws JMSException {
+        if (bytesOut != null) {
+            this.bytesIn = bytesOut.toByteArray();
+        }
+        if (this.bytesIn != null) {
+            // A fresh stream over the same bytes is equivalent to rewinding the old one to offset 0.
+            this.dataAsInput = new DataInputStream(new ByteArrayInputStream(bytesIn, 0, bytesIn.length));
+        }
+
+        this.readOnly = true;
+    }
+
+    /**
+     * Drops the body and puts the message back into the state set up by the no-arg constructor, apart from
+     * the output streams, which are recreated on the next write.
+     */
+    @Override public void clearBody() {
+        super.clearBody();
+        this.bytesOut = null;
+        this.dataAsOutput = null;
+        this.dataAsInput = null;
+        this.bytesIn = null;
+        // Same flag values as the no-arg constructor; readOnly used to stay set on a received message.
+        this.readOnly = false;
+        this.writeOnly = true;
+    }
+
+    /** Wraps a write failure in a {@link JMSException} that keeps the original exception as its cause. */
+    private JMSException handleOutputException(final IOException e) {
+        JMSException ex = new JMSException(e.getMessage());
+        ex.initCause(e);
+        ex.setLinkedException(e);
+        return ex;
+    }
+
+    /**
+     * Wraps a read failure: {@link EOFException} becomes {@link MessageEOFException}, any other
+     * {@link IOException} becomes {@link MessageFormatException}. The original exception is kept as the cause.
+     */
+    private JMSException handleInputException(final IOException e) {
+        JMSException ex;
+        if (e instanceof EOFException) {
+            ex = new MessageEOFException(e.getMessage());
+        }
+        else {
+            ex = new MessageFormatException(e.getMessage());
+        }
+        ex.initCause(e);
+        ex.setLinkedException(e);
+        return ex;
+    }
+
+    protected boolean isReadOnly() {
+        return readOnly;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java
new file mode 100644
index 0000000..3524d50
--- /dev/null
+++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg;
+
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.jms.msg.serialize.MapSerialize;
+
+import static java.lang.String.format;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Boolean;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Byte;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2ByteArray;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Char;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Double;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Float;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Int;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Long;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Short;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2String;
+
+/**
+ * RocketMQ implementation of the JMS {@code MapMessage}, backed by an in-memory map of name/value pairs.
+ *
+ * <p>Message can only be accessed by a thread at a time.
+ */
+public class JMSMapMessage extends AbstractJMSMessage implements MapMessage {
+
+    private Map<String, Object> map;
+
+    protected boolean readOnly;
+
+    /**
+     * Creates a message around an existing map. The map is used directly, not copied.
+     *
+     * @param map initial content; {@code null} is treated as an empty map
+     */
+    public JMSMapMessage(Map<String, Object> map) {
+        // A null map would otherwise make every accessor fail with a NullPointerException.
+        this.map = map != null ? map : new HashMap<String, Object>();
+    }
+
+    public JMSMapMessage() {
+        this.map = new HashMap<String, Object>();
+    }
+
+    /**
+     * Returns the backing map itself.
+     *
+     * @param clazz requested body type; must be a type assignable to {@link Map}
+     * @throws MessageFormatException if {@code clazz} is not assignable to {@link Map}
+     */
+    @Override public Map<String, Object> getBody(Class clazz) throws JMSException {
+        if (isBodyAssignableTo(clazz)) {
+            return this.map;
+        }
+
+        throw new MessageFormatException(format("The type[%s] can't be casted to Map", clazz.toString()));
+    }
+
+    /** Returns the map serialized through {@link MapSerialize}. */
+    @Override public byte[] getBody() throws JMSException {
+        return MapSerialize.instance().serialize(this.map);
+    }
+
+    @Override public boolean isBodyAssignableTo(Class c) throws JMSException {
+        return Map.class.isAssignableFrom(c);
+    }
+
+    @Override public boolean getBoolean(String name) throws JMSException {
+        checkName(name);
+
+        return cast2Boolean(map.get(name));
+    }
+
+    /**
+     * Rejects blank entry names.
+     *
+     * @throws JMSException if {@code name} is null, empty or whitespace only
+     */
+    private void checkName(String name) throws JMSException {
+        if (StringUtils.isBlank(name)) {
+            throw new JMSException("Name is required");
+        }
+    }
+
+    @Override public byte getByte(String name) throws JMSException {
+        checkName(name);
+
+        return cast2Byte(map.get(name));
+    }
+
+    @Override public short getShort(String name) throws JMSException {
+        checkName(name);
+
+        return cast2Short(map.get(name));
+    }
+
+    @Override public char getChar(String name) throws JMSException {
+        checkName(name);
+
+        return cast2Char(map.get(name));
+    }
+
+    @Override public int getInt(String name) throws JMSException {
+        checkName(name);
+
+        return cast2Int(map.get(name));
+    }
+
+    @Override public long getLong(String name) throws JMSException {
+        checkName(name);
+
+        return cast2Long(map.get(name));
+    }
+
+    @Override public float getFloat(String name) throws JMSException {
+        checkName(name);
+
+        return cast2Float(map.get(name));
+    }
+
+    @Override public double getDouble(String name) throws JMSException {
+        checkName(name);
+
+        return cast2Double(map.get(name));
+    }
+
+    @Override public String getString(String name) throws JMSException {
+        checkName(name);
+
+        return cast2String(map.get(name));
+    }
+
+    @Override public byte[] getBytes(String name) throws JMSException {
+        checkName(name);
+
+        return cast2ByteArray(map.get(name));
+    }
+
+    @Override public Object getObject(String name) throws JMSException {
+        checkName(name);
+
+        return map.get(name);
+    }
+
+    @Override public Enumeration getMapNames() throws JMSException {
+        return Collections.enumeration(map.keySet());
+    }
+
+    @Override public void setBoolean(String name, boolean value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    /**
+     * Stores one entry after checking that the message is writable and the name is not blank.
+     *
+     * @throws MessageNotWriteableException if the message is read-only
+     * @throws JMSException if {@code name} is blank
+     */
+    private void putProperty(String name, Object obj) throws JMSException {
+        if (isReadOnly()) {
+            throw new MessageNotWriteableException("Message is not writable");
+        }
+
+        checkName(name);
+
+        map.put(name, obj);
+    }
+
+    @Override public void setByte(String name, byte value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setShort(String name, short value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setChar(String name, char value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setInt(String name, int value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setLong(String name, long value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setFloat(String name, float value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setDouble(String name, double value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setString(String name, String value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setBytes(String name, byte[] value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    /**
+     * Stores the {@code length} bytes of {@code value} that start at {@code offset}.
+     *
+     * <p>The slice is copied; the whole array used to be stored and {@code offset}/{@code length} ignored.
+     *
+     * @throws IndexOutOfBoundsException if the range does not fit inside {@code value}
+     * @throws NegativeArraySizeException if {@code length} is negative
+     */
+    @Override public void setBytes(String name, byte[] value, int offset, int length) throws JMSException {
+        if (value == null) {
+            putProperty(name, null);
+            return;
+        }
+
+        final byte[] slice = new byte[length];
+        System.arraycopy(value, offset, slice, 0, length);
+        putProperty(name, slice);
+    }
+
+    @Override public void setObject(String name, Object value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public boolean itemExists(String name) throws JMSException {
+        checkName(name);
+
+        return map.containsKey(name);
+    }
+
+    /** Removes every entry and makes the message writable again. */
+    @Override public void clearBody() {
+        super.clearBody();
+        this.map.clear();
+        this.readOnly = false;
+    }
+
+    protected boolean isReadOnly() {
+        return this.readOnly;
+    }
+
+    public void setReadOnly(boolean readOnly) {
+        this.readOnly = readOnly;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java
new file mode 100644
index 0000000..4f29d33
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg;
+
+import java.io.Serializable;
+import javax.jms.JMSException;
+import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize;
+
+/**
+ * RocketMQ implementation of the JMS {@code ObjectMessage}; the body is a single {@link Serializable}.
+ */
+public class JMSObjectMessage extends AbstractJMSMessage implements javax.jms.ObjectMessage {
+
+    private Serializable body;
+
+    public JMSObjectMessage(Serializable object) {
+        this.body = object;
+    }
+
+    public JMSObjectMessage() {
+
+    }
+
+    /**
+     * Returns the body after checking that it fits the requested type.
+     *
+     * @param clazz requested body type
+     * @return the body, or {@code null} when the message has none
+     * @throws javax.jms.MessageFormatException if the body is not an instance of {@code clazz}
+     */
+    @Override public Serializable getBody(Class clazz) throws JMSException {
+        if (!isBodyAssignableTo(clazz)) {
+            throw new javax.jms.MessageFormatException(
+                String.format("The type[%s] can't be casted to %s", clazz.toString(), body.getClass().getName()));
+        }
+        return body;
+    }
+
+    /** Returns the body serialized through {@link ObjectSerialize}. */
+    @Override public byte[] getBody() throws JMSException {
+        return ObjectSerialize.instance().serialize(body);
+    }
+
+    /**
+     * Reports whether the body can be handed out as {@code c}.
+     *
+     * @return {@code true} when there is no body or the body is an instance of {@code c}; this used to be
+     * {@code true} for every type
+     */
+    @Override public boolean isBodyAssignableTo(Class c) throws JMSException {
+        return body == null || c.isInstance(body);
+    }
+
+    /** Drops the object, in line with the text and map messages which also discard their content here. */
+    @Override public void clearBody() {
+        super.clearBody();
+        this.body = null;
+    }
+
+    public Serializable getObject() throws JMSException {
+        return this.body;
+    }
+
+    public void setObject(Serializable object) throws JMSException {
+        this.body = object;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java
new file mode 100644
index 0000000..5fd67a3
--- /dev/null
+++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import org.apache.rocketmq.jms.msg.serialize.StringSerialize;
+
+import static java.lang.String.format;
+
+/**
+ * RocketMQ implementation of the JMS {@code TextMessage}; the body is a single {@link String}.
+ */
+public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage {
+
+    private String text;
+
+    public JMSTextMessage() {
+
+    }
+
+    public JMSTextMessage(String text) {
+        // Assigned directly so the constructor does not call the overridable setText().
+        this.text = text;
+    }
+
+    /**
+     * Returns the text after checking that it fits the requested type.
+     *
+     * @param clazz requested body type; must be a type assignable to {@link String}
+     * @return the text, possibly {@code null}
+     * @throws MessageFormatException if {@code clazz} is not assignable to {@link String}
+     */
+    @Override public String getBody(Class clazz) throws JMSException {
+        if (isBodyAssignableTo(clazz)) {
+            return text;
+        }
+
+        throw new MessageFormatException(format("The type[%s] can't be casted to String", clazz.toString()));
+    }
+
+    /** Returns the text serialized through {@link StringSerialize}. */
+    @Override public byte[] getBody() throws JMSException {
+        return StringSerialize.instance().serialize(this.text);
+    }
+
+    @Override public boolean isBodyAssignableTo(Class c) throws JMSException {
+        return String.class.isAssignableFrom(c);
+    }
+
+    /** Drops the text. */
+    @Override public void clearBody() {
+        super.clearBody();
+        this.text = null;
+    }
+
+    public String getText() throws JMSException {
+        return this.text;
+    }
+
+    public void setText(String text) {
+        this.text = text;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java
 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java
new file mode 100644
index 0000000..ca2cbed
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.convert;
+
+import java.util.Map;
+import javax.jms.JMSException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
+import org.apache.rocketmq.jms.support.JMSUtils;
+
+import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSExpiration;
+import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSMessageID;
+import static 
org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.MSG_MODEL_NAME;
+import static 
org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.toMsgModelEnum;
+
+/**
+ * Converts a JMS message into a RocketMQ {@link MessageExt}.
+ */
+public class JMS2RMQMessageConvert {
+
+    /** Prefix put in front of every JMS user property name when it is copied to the RocketMQ message. */
+    public static final String USER_PROPERTY_PREFIX = "USER:";
+
+    /**
+     * Builds a RocketMQ message from the headers, body and user properties of a JMS message.
+     *
+     * @param jmsMsg the message to convert
+     * @return a new RocketMQ message
+     * @throws Exception if reading the JMS message fails
+     */
+    public static MessageExt convert(AbstractJMSMessage jmsMsg) throws Exception {
+        MessageExt rmqMsg = new MessageExt();
+
+        handleHeader(jmsMsg, rmqMsg);
+
+        handleBody(jmsMsg, rmqMsg);
+
+        handleProperties(jmsMsg, rmqMsg);
+
+        return rmqMsg;
+    }
+
+    /** Copies destination, message id, timestamp and expiration; the message id is also used as the key. */
+    private static void handleHeader(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) {
+        rmqMsg.setTopic(JMSUtils.getDestinationName(jmsMsg.getJMSDestination()));
+        rmqMsg.putUserProperty(JMSMessageID.name(), jmsMsg.getJMSMessageID());
+        rmqMsg.setBornTimestamp(jmsMsg.getJMSTimestamp());
+        rmqMsg.putUserProperty(JMSExpiration.name(), String.valueOf(jmsMsg.getJMSExpiration()));
+        rmqMsg.setKeys(jmsMsg.getJMSMessageID());
+    }
+
+    /**
+     * Copies every user property, as a string, under its name prefixed with {@link #USER_PROPERTY_PREFIX}.
+     *
+     * <p>Properties whose value is {@code null} are skipped; they used to abort the conversion with a
+     * {@code NullPointerException}.
+     */
+    private static void handleProperties(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) {
+        Map<String, Object> userProps = jmsMsg.getProperties();
+        if (userProps == null) {
+            return;
+        }
+
+        for (Map.Entry<String, Object> entry : userProps.entrySet()) {
+            Object value = entry.getValue();
+            if (value == null) {
+                continue;
+            }
+            rmqMsg.putUserProperty(USER_PROPERTY_PREFIX + entry.getKey(), value.toString());
+        }
+    }
+
+    /** Records the JMS message model as a user property and copies the serialized body. */
+    private static void handleBody(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) throws JMSException {
+        rmqMsg.putUserProperty(MSG_MODEL_NAME, toMsgModelEnum(jmsMsg).name());
+        rmqMsg.setBody(jmsMsg.getBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java
 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java
new file mode 100644
index 0000000..4adb692
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.convert;
+
+import java.util.Map;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.destination.RocketMQTopic;
+import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
+import org.apache.rocketmq.jms.msg.JMSBytesMessage;
+import org.apache.rocketmq.jms.msg.JMSMapMessage;
+import org.apache.rocketmq.jms.msg.JMSObjectMessage;
+import org.apache.rocketmq.jms.msg.JMSTextMessage;
+import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum;
+import org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum;
+import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum;
+import org.apache.rocketmq.jms.msg.serialize.MapSerialize;
+import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize;
+import org.apache.rocketmq.jms.msg.serialize.StringSerialize;
+
+import static java.lang.String.format;
+import static 
org.apache.rocketmq.jms.msg.convert.JMS2RMQMessageConvert.USER_PROPERTY_PREFIX;
+import static 
org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.MSG_MODEL_NAME;
+
+/**
+ * Converts a RocketMQ {@link MessageExt} into a JMS message: the body is
+ * decoded according to the {@code MSG_MODEL_NAME} user property, header values
+ * are restored, and user properties carrying {@code USER_PROPERTY_PREFIX} are
+ * copied back as JMS string properties.
+ */
+public class RMQ2JMSMessageConvert {
+
+    /**
+     * Builds the JMS message that corresponds to {@code rmqMsg}.
+     *
+     * @param rmqMsg the RocketMQ message to convert
+     * @return the converted JMS message
+     * @throws IllegalArgumentException if {@code rmqMsg} or its body is null
+     * @throws JMSException if the message model is missing or unsupported, or
+     * the body cannot be deserialized
+     */
+    public static Message convert(MessageExt rmqMsg) throws JMSException {
+        if (rmqMsg == null) {
+            throw new IllegalArgumentException("RocketMQ message could not be null");
+        }
+        if (rmqMsg.getBody() == null) {
+            throw new IllegalArgumentException("RocketMQ message body could not be null");
+        }
+
+        AbstractJMSMessage jmsMsg = newAbstractJMSMessage(rmqMsg.getUserProperty(MSG_MODEL_NAME), rmqMsg.getBody());
+
+        setHeader(rmqMsg, jmsMsg);
+
+        setProperties(rmqMsg, jmsMsg);
+
+        return jmsMsg;
+    }
+
+    /**
+     * Creates the JMS message implementation named by {@code msgModel} and
+     * fills it with the deserialized {@code body}.
+     *
+     * @throws JMSException if {@code msgModel} is null or is not the name of a
+     * {@link JMSMessageModelEnum} constant
+     */
+    private static AbstractJMSMessage newAbstractJMSMessage(String msgModel, byte[] body) throws JMSException {
+        // Enum.valueOf throws NullPointerException for null and
+        // IllegalArgumentException for unknown names; report both as the
+        // JMSException this method declares.
+        if (msgModel == null) {
+            throw new JMSException(format("The type[%s] is not supported", msgModel));
+        }
+        JMSMessageModelEnum model;
+        try {
+            model = JMSMessageModelEnum.valueOf(msgModel);
+        }
+        catch (IllegalArgumentException e) {
+            throw new JMSException(format("The type[%s] is not supported", msgModel));
+        }
+
+        switch (model) {
+            case BYTE:
+                return new JMSBytesMessage(body);
+            case MAP:
+                return new JMSMapMessage(MapSerialize.instance().deserialize(body));
+            case OBJECT:
+                return new JMSObjectMessage(ObjectSerialize.instance().deserialize(body));
+            case STRING:
+                return new JMSTextMessage(StringSerialize.instance().deserialize(body));
+            default:
+                throw new JMSException(format("The type[%s] is not supported", msgModel));
+        }
+    }
+
+    /**
+     * Restores the JMS header values of {@code jmsMsg} from {@code rmqMsg}.
+     */
+    private static void setHeader(MessageExt rmqMsg, AbstractJMSMessage jmsMsg) {
+        jmsMsg.setJMSMessageID(rmqMsg.getUserProperty(JMSHeaderEnum.JMSMessageID.name()));
+        jmsMsg.setJMSTimestamp(rmqMsg.getBornTimestamp());
+
+        // A message without the expiration property falls back to the default
+        // instead of failing in Long.valueOf(null).
+        String expiration = rmqMsg.getUserProperty(JMSHeaderEnum.JMSExpiration.name());
+        if (expiration == null) {
+            jmsMsg.setJMSExpiration(JMSHeaderEnum.JMS_EXPIRATION_DEFAULT_VALUE);
+        }
+        else {
+            jmsMsg.setJMSExpiration(Long.parseLong(expiration));
+        }
+
+        jmsMsg.setJMSRedelivered(rmqMsg.getReconsumeTimes() > 0);
+        //todo: what about Queue?
+        jmsMsg.setJMSDestination(new RocketMQTopic(rmqMsg.getTopic()));
+    }
+
+    /**
+     * Sets {@code JMSXDeliveryCount} to the reconsume count plus one and copies
+     * every property of {@code rmqMsg} whose name starts with
+     * {@code USER_PROPERTY_PREFIX} onto {@code jmsMsg} as a string property,
+     * with the prefix removed.
+     */
+    private static void setProperties(MessageExt rmqMsg, AbstractJMSMessage jmsMsg) {
+        jmsMsg.setIntProperty(JMSPropertiesEnum.JMSXDeliveryCount.name(), rmqMsg.getReconsumeTimes() + 1);
+
+        Map<String, String> propertiesMap = rmqMsg.getProperties();
+        if (propertiesMap == null) {
+            return;
+        }
+        for (Map.Entry<String, String> property : propertiesMap.entrySet()) {
+            String properName = property.getKey();
+            if (properName.startsWith(USER_PROPERTY_PREFIX)) {
+                jmsMsg.setStringProperty(properName.substring(USER_PROPERTY_PREFIX.length()), property.getValue());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java
 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java
new file mode 100644
index 0000000..cb27675
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.enums;
+
+import javax.jms.Message;
+
+public enum JMSHeaderEnum { // Names of the JMS message header fields, plus default values for some of them.
+
+    JMSDestination,
+    JMSDeliveryMode,
+    JMSMessageID,
+    JMSTimestamp,
+    JMSCorrelationID,
+    JMSReplyTo,
+    JMSRedelivered,
+    JMSType,
+    JMSExpiration,
+    JMSPriority,
+    JMSDeliveryTime;
+
+    public static final int JMS_DELIVERY_MODE_DEFAULT_VALUE = 
Message.DEFAULT_DELIVERY_MODE; // taken from javax.jms.Message
+    public static final long JMS_TIME_TO_LIVE_DEFAULT_VALUE = 
Message.DEFAULT_TIME_TO_LIVE; // taken from javax.jms.Message
+    public static final int JMS_PRIORITY_DEFAULT_VALUE = 
Message.DEFAULT_PRIORITY; // taken from javax.jms.Message
+    public static final long JMS_DELIVERY_TIME_DEFAULT_VALUE = 
Message.DEFAULT_DELIVERY_DELAY; // taken from javax.jms.Message
+    public static final boolean JMS_REDELIVERED_DEFAULT_VALUE = false;
+    public static final int JMS_TIMESTAMP_DEFAULT_VALUE = 0; // NOTE(review): declared int while the timestamp header is fed from a long elsewhere in this patch - confirm the type is intended
+    public static final int JMS_EXPIRATION_DEFAULT_VALUE = 0; // NOTE(review): declared int while the expiration header is handled as a long elsewhere in this patch - confirm the type is intended
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java
 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java
new file mode 100644
index 0000000..f7dc15a
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.enums;
+
+import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
+import org.apache.rocketmq.jms.msg.JMSBytesMessage;
+import org.apache.rocketmq.jms.msg.JMSMapMessage;
+import org.apache.rocketmq.jms.msg.JMSObjectMessage;
+import org.apache.rocketmq.jms.msg.JMSTextMessage;
+
+/**
+ * The message body models supported by the converters, each tied to the JMS
+ * message class that represents it. The constant name is what gets stored
+ * under {@link #MSG_MODEL_NAME}.
+ */
+public enum JMSMessageModelEnum {
+    BYTE(JMSBytesMessage.class),
+    MAP(JMSMapMessage.class),
+    OBJECT(JMSObjectMessage.class),
+    STRING(JMSTextMessage.class);
+
+    /** Name of the user property that carries the model of a message. */
+    public static final String MSG_MODEL_NAME = "MsgModel";
+
+    private final Class<? extends AbstractJMSMessage> jmsClass;
+
+    JMSMessageModelEnum(Class<? extends AbstractJMSMessage> jmsClass) {
+        this.jmsClass = jmsClass;
+    }
+
+    /**
+     * Finds the model whose JMS class {@code jmsMsg} is an instance of.
+     * Constants are tried in declaration order and the first match wins.
+     *
+     * @param jmsMsg the message to classify
+     * @return the matching model
+     * @throws IllegalArgumentException if {@code jmsMsg} is null or matches no
+     * model
+     */
+    public static JMSMessageModelEnum toMsgModelEnum(AbstractJMSMessage jmsMsg) {
+        for (JMSMessageModelEnum e : values()) {
+            if (e.jmsClass.isInstance(jmsMsg)) {
+                return e;
+            }
+        }
+
+        // A null message matches nothing; avoid the NPE from calling getClass() on it.
+        String className = jmsMsg == null ? "null" : jmsMsg.getClass().getSimpleName();
+        throw new IllegalArgumentException(String.format("Not supported class[%s]", className));
+    }
+
+    /**
+     * @return the JMS message class associated with this model
+     */
+    public Class<? extends AbstractJMSMessage> getJmsClass() {
+        return jmsClass;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java
 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java
new file mode 100644
index 0000000..dd5955b
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.enums;
+
+public enum JMSPropertiesEnum { // Names of the JMSX-prefixed message properties; name() of a constant serves as the property key (e.g. JMSXDeliveryCount in RMQ2JMSMessageConvert).
+    JMSXUserID,
+    JMSXDeliveryCount,
+    JMSXGroupID,
+    JMSXGroupSeq,
+    JMSXRcvTimestamp
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java
 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java
new file mode 100644
index 0000000..7c7f1ea
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.serialize;
+
+import com.alibaba.fastjson.JSON;
+import java.util.HashMap;
+import java.util.Map;
+import javax.jms.JMSException;
+
+/**
+ * {@link Serialize} implementation for {@link Map} values that encodes them
+ * as JSON bytes with fastjson and decodes them into a {@link HashMap}.
+ */
+public class MapSerialize implements Serialize<Map> {
+
+    private static MapSerialize singleton = new MapSerialize();
+
+    private MapSerialize() {
+    }
+
+    /**
+     * @return the shared instance
+     */
+    public static MapSerialize instance() {
+        return singleton;
+    }
+
+    /**
+     * @param map the map to encode
+     * @return the JSON bytes of {@code map}
+     */
+    @Override public byte[] serialize(Map map) throws JMSException {
+        byte[] json = JSON.toJSONBytes(map);
+        return json;
+    }
+
+    /**
+     * @param bytes JSON bytes to decode
+     * @return the map parsed from {@code bytes}
+     */
+    @Override public Map deserialize(byte[] bytes) throws JMSException {
+        Map parsed = JSON.parseObject(bytes, HashMap.class);
+        return parsed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java
 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java
new file mode 100644
index 0000000..a685808
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.serialize;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import javax.jms.JMSException;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+/**
+ * {@link Serialize} implementation based on Java object serialization.
+ *
+ * NOTE(review): {@link #deserialize(byte[])} runs Java-native deserialization
+ * on whatever bytes it is given. If message bodies can come from untrusted
+ * producers this is a known attack surface - consider restricting the
+ * accepted classes.
+ */
+public class ObjectSerialize implements Serialize<Object> {
+
+    private static ObjectSerialize ins = new ObjectSerialize();
+
+    /**
+     * @return the shared instance
+     */
+    public static ObjectSerialize instance() {
+        return ins;
+    }
+
+    private ObjectSerialize() {
+    }
+
+    /**
+     * Serializes {@code object} with an {@link ObjectOutputStream}.
+     *
+     * @param object the object to serialize
+     * @return the serialized bytes
+     * @throws JMSException if writing fails; the message is the stack trace
+     * of the I/O error, which is also attached as linked exception and cause
+     */
+    @Override public byte[] serialize(Object object) throws JMSException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = null;
+        try {
+            oos = new ObjectOutputStream(baos);
+            oos.writeObject(object);
+            // Push buffered data into baos before taking the snapshot.
+            oos.flush();
+            return baos.toByteArray();
+        }
+        catch (IOException e) {
+            throw newJMSException(ExceptionUtils.getStackTrace(e), e);
+        }
+        finally {
+            closeQuietly(oos);
+        }
+    }
+
+    /**
+     * Reads one object back from {@code bytes}.
+     *
+     * @param bytes bytes previously produced by Java object serialization
+     * @return the deserialized object
+     * @throws JMSException if the bytes cannot be read or the class of the
+     * object cannot be found; the original error is attached as linked
+     * exception and cause
+     */
+    @Override public Serializable deserialize(byte[] bytes) throws JMSException {
+        ObjectInputStream ois = null;
+        try {
+            ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+            // Read first, close afterwards (in finally).
+            return (Serializable) ois.readObject();
+        }
+        catch (IOException e) {
+            throw newJMSException(e.getMessage(), e);
+        }
+        catch (ClassNotFoundException e) {
+            throw newJMSException(e.getMessage(), e);
+        }
+        finally {
+            closeQuietly(ois);
+        }
+    }
+
+    /** Creates a JMSException with the given reason that keeps {@code cause} attached. */
+    private static JMSException newJMSException(String reason, Exception cause) {
+        JMSException jmsException = new JMSException(reason);
+        jmsException.setLinkedException(cause);
+        jmsException.initCause(cause);
+        return jmsException;
+    }
+
+    /** Closes {@code stream} if it was opened; a failure on close is ignored because the result is already decided. */
+    private static void closeQuietly(java.io.Closeable stream) {
+        if (stream == null) {
+            return;
+        }
+        try {
+            stream.close();
+        }
+        catch (IOException ignored) {
+            // nothing useful can be done about a failed close here
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java
 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java
new file mode 100644
index 0000000..78a499c
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.serialize;
+
+import javax.jms.JMSException;
+
+public interface Serialize<T> { // Converts values of type T to and from their byte[] form.
+
+    byte[] serialize(T t) throws JMSException; // encodes t as bytes
+
+    T deserialize(byte[] bytes) throws JMSException; // rebuilds a value of type T from bytes
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java
 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java
new file mode 100644
index 0000000..b6119a5
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.serialize;
+
+import javax.jms.JMSException;
+import org.apache.rocketmq.jms.support.JMSUtils;
+
+/**
+ * {@link Serialize} implementation for strings that delegates the actual
+ * conversion to {@link JMSUtils}. A null input maps to the empty value of the
+ * target type.
+ */
+public class StringSerialize implements Serialize<String> {
+
+    private static final byte[] EMPTY_BYTES = new byte[0];
+    private static final String EMPTY_STRING = "";
+    private static StringSerialize singleton = new StringSerialize();
+
+    private StringSerialize() {
+    }
+
+    /**
+     * @return the shared instance
+     */
+    public static StringSerialize instance() {
+        return singleton;
+    }
+
+    /**
+     * @param s the string to encode, may be null
+     * @return the bytes of {@code s}, or an empty array when {@code s} is null
+     */
+    @Override public byte[] serialize(String s) throws JMSException {
+        return s == null ? EMPTY_BYTES : JMSUtils.string2Bytes(s);
+    }
+
+    /**
+     * @param bytes the bytes to decode, may be null
+     * @return the decoded string, or an empty string when {@code bytes} is null
+     */
+    @Override public String deserialize(byte[] bytes) throws JMSException {
+        return bytes == null ? EMPTY_STRING : JMSUtils.bytes2String(bytes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java
new file mode 100644
index 0000000..67c54e9
--- /dev/null
+++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.support;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
+import java.util.UUID;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.rocketmq.jms.RocketMQConsumer;
+
+/**
+ * Static helpers shared by the JMS adapter: destination names, consumer
+ * group names, UUIDs and string/byte conversion.
+ */
+public class JMSUtils {
+
+    /** Charset name used by {@link #bytes2String(byte[])} and {@link #string2Bytes(String)}. */
+    public static final String DEFAULT_CHARSET = "UTF-8";
+
+    /**
+     * Returns the topic name of a {@link Topic} or the queue name of a
+     * {@link Queue}.
+     *
+     * @param destination the destination to inspect
+     * @return the topic or queue name
+     * @throws JMSRuntimeException if the name cannot be read, or if
+     * {@code destination} is null or neither a Topic nor a Queue
+     */
+    public static String getDestinationName(Destination destination) {
+        try {
+            if (destination instanceof Topic) {
+                return ((Topic) destination).getTopicName();
+            }
+            if (destination instanceof Queue) {
+                return ((Queue) destination).getQueueName();
+            }
+        }
+        catch (JMSException e) {
+            throw new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+
+        // Reached for null and for unknown Destination implementations; the
+        // null check keeps the report from turning into a NullPointerException.
+        throw new JMSRuntimeException(String.format("Unsupported Destination type:%s",
+            destination == null ? null : destination.getClass()));
+    }
+
+    /**
+     * Derives the consumer group of {@code consumer} from its subscription
+     * name, the client id of its connection and its shared flag.
+     *
+     * @param consumer the consumer to derive the group for
+     * @return the consumer group name
+     * @throws JMSRuntimeException if the client id cannot be obtained
+     */
+    public static String getConsumerGroup(RocketMQConsumer consumer) {
+        try {
+            return getConsumerGroup(consumer.getSubscriptionName(),
+                consumer.getSession().getConnection().getClientID(),
+                consumer.isShared()
+            );
+        }
+        catch (JMSException e) {
+            throw new JMSRuntimeException(ExceptionUtils.getStackTrace(e), e.getErrorCode(), e);
+        }
+    }
+
+    /**
+     * Joins the non-blank parts with "-": the subscription name, the client
+     * id, and a random UUID when {@code shared} is true. If nothing was
+     * appended, the result is a random UUID.
+     *
+     * @param subscriptionName subscription name, may be blank
+     * @param clientID client id, may be blank
+     * @param shared whether a random UUID is appended
+     * @return the consumer group name, never empty
+     */
+    public static String getConsumerGroup(String subscriptionName, String clientID, boolean shared) {
+        StringBuilder consumerGroup = new StringBuilder();
+
+        if (StringUtils.isNotBlank(subscriptionName)) {
+            consumerGroup.append(subscriptionName);
+        }
+
+        if (StringUtils.isNotBlank(clientID)) {
+            if (consumerGroup.length() != 0) {
+                consumerGroup.append("-");
+            }
+            consumerGroup.append(clientID);
+        }
+
+        if (shared) {
+            if (consumerGroup.length() != 0) {
+                consumerGroup.append("-");
+            }
+            consumerGroup.append(uuid());
+        }
+
+        if (consumerGroup.length() == 0) {
+            consumerGroup.append(uuid());
+        }
+
+        return consumerGroup.toString();
+    }
+
+    /**
+     * @return a new random UUID in its string form
+     */
+    public static String uuid() {
+        return UUID.randomUUID().toString();
+    }
+
+    /**
+     * Decodes {@code bytes} using {@link #DEFAULT_CHARSET}.
+     *
+     * @param bytes the bytes to decode
+     * @return the decoded string
+     * @throws IllegalArgumentException if {@code bytes} is null
+     */
+    public static String bytes2String(byte[] bytes) {
+        Prediction.checkNotNull(bytes, "bytes could not be null");
+        return new String(bytes, Charset.forName(DEFAULT_CHARSET));
+    }
+
+    /**
+     * Encodes {@code source} using {@link #DEFAULT_CHARSET}.
+     *
+     * @param source the string to encode
+     * @return the encoded bytes
+     * @throws IllegalArgumentException if {@code source} is null
+     * @throws JMSRuntimeException if the charset is not supported
+     */
+    public static byte[] string2Bytes(String source) {
+        Prediction.checkNotNull(source, "source could not be null");
+        try {
+            return source.getBytes(DEFAULT_CHARSET);
+        }
+        catch (UnsupportedEncodingException e) {
+            throw new JMSRuntimeException(ExceptionUtils.getStackTrace(e), null, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java
 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java
new file mode 100644
index 0000000..3ff1d69
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.support;
+
+/**
+ * Strict casts: a value is returned only when it already is an instance of
+ * the requested type. No conversion between types is attempted, so an Integer
+ * is not accepted where a Long is requested. A null input yields null.
+ */
+public class ObjectTypeCast {
+
+    /**
+     * @return {@code obj} as a String, or null if {@code obj} is null
+     * @throws ClassCastException if {@code obj} is not a String
+     */
+    public static String cast2String(Object obj) {
+        return cast2Object(obj, String.class);
+    }
+
+    /**
+     * @return {@code obj} as a Long, or null if {@code obj} is null
+     * @throws ClassCastException if {@code obj} is not a Long
+     */
+    public static Long cast2Long(Object obj) {
+        return cast2Object(obj, Long.class);
+    }
+
+    /**
+     * @return {@code obj} as an Integer, or null if {@code obj} is null
+     * @throws ClassCastException if {@code obj} is not an Integer
+     */
+    public static Integer cast2Integer(Object obj) {
+        return cast2Object(obj, Integer.class);
+    }
+
+    /**
+     * @return {@code obj} as a Boolean, or null if {@code obj} is null
+     * @throws ClassCastException if {@code obj} is not a Boolean
+     */
+    public static Boolean cast2Boolean(Object obj) {
+        return cast2Object(obj, Boolean.class);
+    }
+
+    /**
+     * @param obj the value to cast, may be null
+     * @param target the required type
+     * @return {@code obj} as a {@code T}, or null if {@code obj} is null
+     * @throws ClassCastException if {@code obj} is not an instance of
+     * {@code target}
+     */
+    public static <T> T cast2Object(Object obj, Class<T> target) {
+        if (obj == null) {
+            return null;
+        }
+        if (!target.isInstance(obj)) {
+            throw new ClassCastException("To casted object is " + obj.getClass() + ", not " + target.getSimpleName() + ".class");
+        }
+        return target.cast(obj);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/Prediction.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/Prediction.java 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/Prediction.java
new file mode 100644
index 0000000..868c5d8
--- /dev/null
+++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/Prediction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.support;
+
+/**
+ * Argument checks that throw {@link IllegalArgumentException} with a
+ * caller-supplied message.
+ */
+public class Prediction {
+
+    /**
+     * @param obj the value that must not be null
+     * @param errorMsg message of the exception thrown on failure
+     * @throws IllegalArgumentException if {@code obj} is null
+     */
+    public static void checkNotNull(Object obj, String errorMsg) {
+        if (null != obj) {
+            return;
+        }
+        throw new IllegalArgumentException(errorMsg);
+    }
+
+    /**
+     * @param source the string that must contain a non-whitespace character
+     * @param errorMsg message of the exception thrown on failure
+     * @throws IllegalArgumentException if {@code source} is null or trims to
+     * the empty string
+     */
+    public static void checkNotBlank(String source, String errorMsg) {
+        boolean blank = source == null || source.trim().isEmpty();
+        if (blank) {
+            throw new IllegalArgumentException(errorMsg);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/PrimitiveTypeCast.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/PrimitiveTypeCast.java
 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/PrimitiveTypeCast.java
new file mode 100644
index 0000000..6e24ab0
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/PrimitiveTypeCast.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.support;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+
+/**
+ * Primitive type converter, according to the conversion table in {@link MapMessage}.
+ *
+ * <p>A null input is handled per target type: {@code boolean} yields {@code false}; {@code byte},
+ * {@code short}, {@code int} and {@code long} fail with {@link NumberFormatException};
+ * {@code float} and {@code double} fail with {@link NullPointerException} (in each case this is
+ * what the wrapper's {@code valueOf(String)} does with null); {@code char} fails with
+ * {@link NullPointerException}; {@code String} and {@code byte[]} yield null.
+ */
+public class PrimitiveTypeCast {
+
+    /**
+     * Indicate if the parameter obj is primitive type.
+     *
+     * <p>"Primitive" here means one of the wrapper types Boolean, Byte, Short, Character, Integer,
+     * Long, Float, Double, or a String, or a byte array.
+     *
+     * @param obj that to be check
+     * @return true if the obj is primitive type, otherwise return false
+     */
+    public static boolean isPrimitiveType(Object obj) {
+        // instanceof is false for null, so no separate null check is needed
+        return obj instanceof Boolean
+            || obj instanceof Byte
+            || obj instanceof Short
+            || obj instanceof Character
+            || obj instanceof Integer
+            || obj instanceof Long
+            || obj instanceof Float
+            || obj instanceof Double
+            || obj instanceof String
+            || obj instanceof byte[];
+    }
+
+    /**
+     * Converts a Boolean or String to boolean.
+     *
+     * @param obj value to convert; null yields {@code false}
+     * @return the converted value
+     * @throws JMSException if obj is of any other type
+     */
+    public static boolean cast2Boolean(Object obj) throws JMSException {
+        if (obj == null) {
+            // explicit cast documents which valueOf overload handles null
+            return Boolean.valueOf((String) null);
+        }
+        if (obj instanceof Boolean) {
+            return (Boolean) obj;
+        }
+        if (obj instanceof String) {
+            return Boolean.valueOf((String) obj);
+        }
+
+        throw incorrectType(obj);
+    }
+
+    /**
+     * Converts a Byte or String to byte.
+     *
+     * @param obj value to convert
+     * @return the converted value
+     * @throws JMSException if obj is of any other type
+     * @throws NumberFormatException if obj is null or a String that is not a parsable byte
+     */
+    public static byte cast2Byte(Object obj) throws JMSException {
+        if (obj == null) {
+            return Byte.valueOf((String) null);
+        }
+        if (obj instanceof Byte) {
+            return (Byte) obj;
+        }
+        if (obj instanceof String) {
+            return Byte.valueOf((String) obj);
+        }
+
+        throw incorrectType(obj);
+    }
+
+    /**
+     * Converts a Byte, Short or String to short.
+     *
+     * @param obj value to convert
+     * @return the converted value
+     * @throws JMSException if obj is of any other type
+     * @throws NumberFormatException if obj is null or a String that is not a parsable short
+     */
+    public static short cast2Short(Object obj) throws JMSException {
+        if (obj == null) {
+            return Short.valueOf((String) null);
+        }
+        if (obj instanceof Byte) {
+            return ((Byte) obj).shortValue();
+        }
+        if (obj instanceof Short) {
+            return (Short) obj;
+        }
+        if (obj instanceof String) {
+            return Short.valueOf((String) obj);
+        }
+
+        throw incorrectType(obj);
+    }
+
+    /**
+     * Converts a Character to char. No String conversion is offered for char.
+     *
+     * @param obj value to convert
+     * @return the converted value
+     * @throws JMSException if obj is not a Character
+     * @throws NullPointerException if obj is null
+     */
+    public static char cast2Char(Object obj) throws JMSException {
+        if (obj == null) {
+            throw new NullPointerException("Obj is required");
+        }
+        if (obj instanceof Character) {
+            return (Character) obj;
+        }
+
+        throw incorrectType(obj);
+    }
+
+    /**
+     * Converts a Byte, Short, Integer or String to int.
+     *
+     * @param obj value to convert
+     * @return the converted value
+     * @throws JMSException if obj is of any other type
+     * @throws NumberFormatException if obj is null or a String that is not a parsable int
+     */
+    public static int cast2Int(Object obj) throws JMSException {
+        if (obj == null) {
+            return Integer.valueOf((String) null);
+        }
+        if (obj instanceof Byte) {
+            return ((Byte) obj).intValue();
+        }
+        if (obj instanceof Short) {
+            return ((Short) obj).intValue();
+        }
+        if (obj instanceof Integer) {
+            return (Integer) obj;
+        }
+        if (obj instanceof String) {
+            return Integer.parseInt((String) obj);
+        }
+
+        throw incorrectType(obj);
+    }
+
+    /**
+     * Converts a Byte, Short, Integer, Long or String to long.
+     *
+     * @param obj value to convert
+     * @return the converted value
+     * @throws JMSException if obj is of any other type
+     * @throws NumberFormatException if obj is null or a String that is not a parsable long
+     */
+    public static long cast2Long(Object obj) throws JMSException {
+        if (obj == null) {
+            return Long.valueOf((String) null);
+        }
+        if (obj instanceof Byte) {
+            return ((Byte) obj).longValue();
+        }
+        if (obj instanceof Short) {
+            return ((Short) obj).longValue();
+        }
+        if (obj instanceof Integer) {
+            return ((Integer) obj).longValue();
+        }
+        if (obj instanceof Long) {
+            return (Long) obj;
+        }
+        if (obj instanceof String) {
+            return Long.parseLong((String) obj);
+        }
+
+        throw incorrectType(obj);
+    }
+
+    /**
+     * Converts a Float or String to float.
+     *
+     * @param obj value to convert
+     * @return the converted value
+     * @throws JMSException if obj is of any other type
+     * @throws NullPointerException if obj is null
+     * @throws NumberFormatException if obj is a String that is not a parsable float
+     */
+    public static float cast2Float(Object obj) throws JMSException {
+        if (obj == null) {
+            return Float.valueOf((String) null);
+        }
+        if (obj instanceof Float) {
+            return (Float) obj;
+        }
+        if (obj instanceof String) {
+            return Float.parseFloat((String) obj);
+        }
+
+        throw incorrectType(obj);
+    }
+
+    /**
+     * Converts a Float, Double or String to double.
+     *
+     * @param obj value to convert
+     * @return the converted value
+     * @throws JMSException if obj is of any other type
+     * @throws NullPointerException if obj is null
+     * @throws NumberFormatException if obj is a String that is not a parsable double
+     */
+    public static double cast2Double(Object obj) throws JMSException {
+        if (obj == null) {
+            return Double.valueOf((String) null);
+        }
+        if (obj instanceof Float) {
+            return ((Float) obj).doubleValue();
+        }
+        if (obj instanceof Double) {
+            return (Double) obj;
+        }
+        if (obj instanceof String) {
+            return Double.parseDouble((String) obj);
+        }
+
+        throw incorrectType(obj);
+    }
+
+    /**
+     * Converts any supported non-array value to its {@code toString()} form.
+     *
+     * @param obj value to convert
+     * @return the string form, or null if obj is null
+     * @throws JMSException if obj is a byte array or any other unsupported type
+     */
+    public static String cast2String(Object obj) throws JMSException {
+        if (obj == null) {
+            // String.valueOf(null) binds to the char[] overload and throws
+            // NullPointerException, so the null case is answered directly.
+            return null;
+        }
+
+        if (obj instanceof Boolean
+            || obj instanceof Byte
+            || obj instanceof Short
+            || obj instanceof Character
+            || obj instanceof Integer
+            || obj instanceof Long
+            || obj instanceof Float
+            || obj instanceof Double
+            || obj instanceof String) {
+            return obj.toString();
+        }
+
+        throw incorrectType(obj);
+    }
+
+    /**
+     * Returns the value as a byte array; no other type converts to byte[].
+     *
+     * @param obj value to convert
+     * @return the same array instance, or null if obj is null
+     * @throws JMSException if obj is not a byte array
+     */
+    public static byte[] cast2ByteArray(Object obj) throws JMSException {
+        if (obj == null) {
+            // avoids a NullPointerException from obj.getClass() in the error path
+            return null;
+        }
+        if (obj instanceof byte[]) {
+            return (byte[]) obj;
+        }
+
+        throw incorrectType(obj);
+    }
+
+    /**
+     * Builds the exception reported when a non-null value has an unsupported type.
+     */
+    private static JMSException incorrectType(Object obj) {
+        return new JMSException("Incorrect type[" + obj.getClass() + "] to cast");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java
 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java
new file mode 100644
index 0000000..c67e71c
--- /dev/null
+++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.support;
+
+/**
+ * Declares the {@link Version} constants and the one currently in use.
+ */
+public class ProviderVersion {
+
+    /** The version constant marked as current. */
+    public static final Version CURRENT_VERSION = Version.V1_1_0;
+
+    /**
+     * Version constants, each paired with an integer value.
+     */
+    public enum Version {
+
+        V1_1_0(1);
+
+        private final int value;
+
+        Version(final int value) {
+            this.value = value;
+        }
+
+        /**
+         * @return the integer value this constant was declared with
+         */
+        public int getValue() {
+            return this.value;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/rocketmq-jms/src/main/resources/logback.xml 
b/rocketmq-jms/src/main/resources/logback.xml
new file mode 100644
index 0000000..39da112
--- /dev/null
+++ b/rocketmq-jms/src/main/resources/logback.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <!--
+      Rolling file appender: writes jms.log under ${user.home}/logs/rocketmq and rolls
+      it into otherdays/jms.1.log .. jms.10.log once the file reaches 100MB.
+      NOTE: no appender-ref in this file points at it, so it is defined but not attached.
+    -->
+    <appender name="DefaultAppender"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${user.home}/logs/rocketmq/jms.log</file>
+        <append>true</append>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>${user.home}/logs/rocketmq/otherdays/jms.%i.log
+            </fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>10</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy
+            class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>100MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <!-- Console appender; "append" is a file appender property and is not set here. -->
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <!-- The JMS packages log at DEBUG; output goes to the root logger's appender. -->
+    <logger name="org.apache.rocketmq.jms">
+        <level value="DEBUG"/>
+    </logger>
+
+    <!-- Everything else logs at ERROR, to the console only. -->
+    <root>
+        <level value="ERROR"/>
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/RocketMQConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/RocketMQConnectionFactoryTest.java
 
b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/RocketMQConnectionFactoryTest.java
new file mode 100644
index 0000000..61f1e54
--- /dev/null
+++ 
b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/RocketMQConnectionFactoryTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.notNullValue;
+
+/**
+ * Checks the state of a newly constructed {@link RocketMQConnectionFactory}.
+ */
+public class RocketMQConnectionFactoryTest {
+
+    /**
+     * The factory returns the name server address it was built with and a non-null client id.
+     */
+    @Test
+    public void testClientId() throws Exception {
+        final String address = "localhost:6789";
+        final RocketMQConnectionFactory factory = new RocketMQConnectionFactory(address);
+
+        assertThat(factory.getNameServerAddress(), is(address));
+        assertThat(factory.getClientId(), notNullValue());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java
 
b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java
new file mode 100644
index 0000000..0a3b36b
--- /dev/null
+++ 
b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.destination;
+
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Checks that {@link RocketMQQueue} exposes the name it was created with.
+ */
+public class RocketMQQueueTest {
+
+    /**
+     * Both {@code getQueueName()} and {@code toString()} return the constructor argument.
+     */
+    @Test
+    public void test() throws Exception {
+        final RocketMQQueue myQueue = new RocketMQQueue("MyQueue");
+
+        final String reportedName = myQueue.getQueueName();
+        assertThat(reportedName, is("MyQueue"));
+
+        final String printed = myQueue.toString();
+        assertThat(printed, is("MyQueue"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java
 
b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java
new file mode 100644
index 0000000..c482e1c
--- /dev/null
+++ 
b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.destination;
+
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Checks that {@link RocketMQTopic} exposes the name it was created with.
+ */
+public class RocketMQTopicTest {
+
+    /**
+     * Both {@code getTopicName()} and {@code toString()} return the constructor argument.
+     */
+    @Test
+    public void test() throws Exception {
+        final RocketMQTopic myTopic = new RocketMQTopic("MyTopic");
+
+        final String reportedName = myTopic.getTopicName();
+        assertThat(reportedName, is("MyTopic"));
+
+        final String printed = myTopic.toString();
+        assertThat(printed, is("MyTopic"));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java
 
b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java
new file mode 100644
index 0000000..62b5056
--- /dev/null
+++ 
b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.hook;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import javax.jms.Message;
+import org.apache.rocketmq.jms.exception.MessageExpiredException;
+import org.apache.rocketmq.jms.msg.JMSTextMessage;
+import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+
+/**
+ * Exercises {@link ReceiveMessageHook#before} with expired, unexpired and plain messages.
+ */
+public class ReceiveMessageHookTest {
+
+    /**
+     * A message whose expiration is the creation instant, handed to the hook 100 ms later,
+     * is expected to be rejected with {@link MessageExpiredException}.
+     */
+    @Test(expected = MessageExpiredException.class)
+    public void testValidateFail() throws Exception {
+        final ReceiveMessageHook receiveHook = new ReceiveMessageHook();
+        final Message expiring = new JMSTextMessage("text");
+
+        final long now = new Date().getTime();
+        expiring.setJMSExpiration(now);
+        Thread.sleep(100);
+
+        receiveHook.before(expiring);
+    }
+
+    /**
+     * Neither an expiration of 0 nor one far in the future makes the hook throw.
+     */
+    @Test
+    public void testValidateSuccess() throws Exception {
+        final ReceiveMessageHook receiveHook = new ReceiveMessageHook();
+        final Message msg = new JMSTextMessage("text");
+
+        // never expired
+        msg.setJMSExpiration(0);
+        receiveHook.before(msg);
+
+        // expired in the future
+        final long farFuture = new SimpleDateFormat("yyyy-MM-dd").parse("2999-01-01").getTime();
+        msg.setJMSExpiration(farFuture);
+        receiveHook.before(msg);
+    }
+
+    /**
+     * After the hook ran, the JMSXRcvTimestamp long property is greater than zero.
+     */
+    @Test
+    public void setProviderProperties() throws Exception {
+        final ReceiveMessageHook receiveHook = new ReceiveMessageHook();
+        final Message msg = new JMSTextMessage("text");
+
+        receiveHook.before(msg);
+
+        final long receivedAt = msg.getLongProperty(JMSPropertiesEnum.JMSXRcvTimestamp.name());
+        assertThat(receivedAt, greaterThan(0L));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java
 
b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java
new file mode 100644
index 0000000..29e91ec
--- /dev/null
+++ 
b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.hook;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import org.apache.rocketmq.jms.RocketMQProducer;
+import org.apache.rocketmq.jms.destination.RocketMQTopic;
+import org.apache.rocketmq.jms.exception.UnsupportDeliveryModelException;
+import org.apache.rocketmq.jms.msg.JMSTextMessage;
+import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum;
+import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Exercises {@link SendMessageHook#before} for validation and for the headers and
+ * properties it sets on the message.
+ */
+public class SendMessageHookTest {
+
+    /**
+     * Sending with {@link DeliveryMode#NON_PERSISTENT} is expected to be rejected with
+     * {@link UnsupportDeliveryModelException}.
+     */
+    @Test(expected = UnsupportDeliveryModelException.class)
+    public void testValidate() throws Exception {
+        final JMSTextMessage msg = new JMSTextMessage("text");
+        final RocketMQTopic topic = new RocketMQTopic("destination");
+        final int mode = DeliveryMode.NON_PERSISTENT;
+        final int prio = 4;
+        final long ttl = 1000 * 100L;
+
+        new SendMessageHook().before(msg, topic, mode, prio, ttl);
+    }
+
+    /**
+     * With a producer that reports no delivery delay and the default time to live, the hook
+     * fills in destination, delivery mode, priority, message id and timestamp, and leaves
+     * the expiration at 0.
+     */
+    @Test
+    public void testSetHeader() throws Exception {
+        final RocketMQProducer mockProducer = mock(RocketMQProducer.class);
+        when(mockProducer.getDeliveryDelay()).thenReturn(0L);
+
+        final JMSTextMessage msg = new JMSTextMessage("text");
+        final Destination topic = new RocketMQTopic("destination");
+        final int mode = DeliveryMode.PERSISTENT;
+        final int prio = 5;
+        long ttl = JMSHeaderEnum.JMS_TIME_TO_LIVE_DEFAULT_VALUE;
+
+        final SendMessageHook sendHook = new SendMessageHook(mockProducer);
+        sendHook.before(msg, topic, mode, prio, ttl);
+
+        assertThat(msg.getJMSDestination(), is(topic));
+        assertThat(msg.getJMSDeliveryMode(), is(JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE));
+        assertThat(msg.getJMSExpiration(), is(0L));
+        assertThat(msg.getJMSDeliveryTime(), notNullValue());
+        assertThat(msg.getJMSPriority(), is(5));
+        assertThat(msg.getJMSMessageID(), notNullValue());
+        assertThat(msg.getJMSTimestamp(), notNullValue());
+    }
+
+    /**
+     * Disable ID,timestamp, and set expired time
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testSetHeader2() throws Exception {
+        final RocketMQProducer mockProducer = mock(RocketMQProducer.class);
+        when(mockProducer.getUserName()).thenReturn("user");
+        when(mockProducer.getDisableMessageID()).thenReturn(true);
+        when(mockProducer.getDisableMessageTimestamp()).thenReturn(true);
+
+        final JMSTextMessage msg = new JMSTextMessage("text");
+        final Destination topic = new RocketMQTopic("destination");
+        final int mode = DeliveryMode.PERSISTENT;
+        final int prio = 5;
+        final long ttl = 1000 * 100L;
+
+        final SendMessageHook sendHook = new SendMessageHook(mockProducer);
+        sendHook.before(msg, topic, mode, prio, ttl);
+
+        // assert header
+        assertThat(msg.getJMSMessageID(), nullValue());
+        assertThat(msg.getJMSTimestamp(), is(0L));
+        assertThat(msg.getJMSExpiration(), not(is(0L)));
+
+        // assert properties
+        final String userId = msg.getStringProperty(JMSPropertiesEnum.JMSXUserID.name());
+        assertThat(userId, is("user"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java
 
b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java
new file mode 100644
index 0000000..ae08bec
--- /dev/null
+++ 
b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.source;
+
+import javax.jms.ConnectionFactory;
+import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jms.annotation.EnableJms;
+import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.converter.SimpleMessageConverter;
+
+/**
+ * Spring configuration for the integration tests: enables JMS listener support, scans the
+ * package of {@link RocketMQServer} and wires the JMS beans onto a
+ * {@link RocketMQConnectionFactory}.
+ */
+@Configuration
+@ComponentScan(basePackageClasses = {RocketMQServer.class})
+@EnableJms
+public class AppConfig {
+
+    /**
+     * @return listener container factory using {@link #connectionFactory()} with concurrency "1"
+     */
+    @Bean
+    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
+        final DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
+        containerFactory.setConnectionFactory(connectionFactory());
+        containerFactory.setConcurrency("1");
+        return containerFactory;
+    }
+
+    /**
+     * @return connection factory pointing at {@code Constant.NAME_SERVER_ADDRESS}
+     */
+    @Bean
+    public ConnectionFactory connectionFactory() {
+        // TODO: a CachingConnectionFactory wrapping the RocketMQ factory was tried here
+        // and is currently disabled; the plain factory is returned instead.
+        return new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS);
+    }
+
+    /**
+     * @return template using {@link #connectionFactory()} and a {@link SimpleMessageConverter}
+     */
+    @Bean
+    public JmsTemplate jmsTemplate() {
+        final JmsTemplate template = new JmsTemplate();
+        template.setConnectionFactory(connectionFactory());
+        template.setMessageConverter(new SimpleMessageConverter());
+        return template;
+    }
+}


Reply via email to