http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java b/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java
deleted file mode 100644
index ea4b49e..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java
+++ /dev/null
@@ -1,434 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain.message;
-
-import com.google.common.collect.Maps;
-import com.google.common.io.BaseEncoding;
-import java.io.Serializable;
-import java.util.Enumeration;
-import java.util.Map;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.rocketmq.jms.domain.JmsBaseConstant;
-import org.apache.rocketmq.jms.util.ExceptionUtil;
-
-public class JmsBaseMessage implements Message {
-    /**
-     * Message properties
-     */
-    protected Map<String, Object> properties = Maps.newHashMap();
-    /**
-     * Message headers
-     */
-    protected Map<String, Object> headers = Maps.newHashMap();
-    /**
-     * Message body
-     */
-    protected Serializable body;
-
-    @Override
-    public String getJMSMessageID() {
-        return (String) headers.get(JmsBaseConstant.JMS_MESSAGE_ID);
-    }
-
-    /**
-     * Sets the message ID.
-     * <p/>
-     * <P>JMS providers set this field when a message is sent. Do not allow User to set the message ID by yourself.
-     *
-     * @param id the ID of the message
-     * @see javax.jms.Message#getJMSMessageID()
-     */
-
-    @Override
-    public void setJMSMessageID(String id) {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    @Override
-    public long getJMSTimestamp() {
-        if (headers.containsKey(JmsBaseConstant.JMS_TIMESTAMP)) {
-            return (Long) headers.get(JmsBaseConstant.JMS_TIMESTAMP);
-        }
-        return 0;
-    }
-
-    @Override
-    public void setJMSTimestamp(long timestamp) {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    @Override
-    public byte[] getJMSCorrelationIDAsBytes() {
-        String jmsCorrelationID = getJMSCorrelationID();
-        if (jmsCorrelationID != null) {
-            try {
-                return BaseEncoding.base64().decode(jmsCorrelationID);
-            }
-            catch (Exception e) {
-                return jmsCorrelationID.getBytes();
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public void setJMSCorrelationIDAsBytes(byte[] correlationID) {
-        String encodedText = BaseEncoding.base64().encode(correlationID);
-        setJMSCorrelationID(encodedText);
-    }
-
-    @Override
-    public String getJMSCorrelationID() {
-        if (headers.containsKey(JmsBaseConstant.JMS_CORRELATION_ID)) {
-            return (String) headers.get(JmsBaseConstant.JMS_CORRELATION_ID);
-        }
-        return null;
-    }
-
-    @Override
-    public void setJMSCorrelationID(String correlationID) {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    @Override
-    public Destination getJMSReplyTo() {
-        if (headers.containsKey(JmsBaseConstant.JMS_REPLY_TO)) {
-            return (Destination) headers.get(JmsBaseConstant.JMS_REPLY_TO);
-        }
-        return null;
-    }
-
-    @Override
-    public void setJMSReplyTo(Destination replyTo) {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    @Override
-    public String toString() {
-        return ToStringBuilder.reflectionToString(this);
-    }
-
-    @Override
-    public Destination getJMSDestination() {
-        if (headers.containsKey(JmsBaseConstant.JMS_DESTINATION)) {
-            return (Destination) headers.get(JmsBaseConstant.JMS_DESTINATION);
-        }
-        return null;
-    }
-
-    @Override
-    public void setJMSDestination(Destination destination) {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    @SuppressWarnings("unchecked")
-    public <T> T getBody(Class<T> clazz) throws JMSException {
-        if (clazz.isInstance(body)) {
-            return (T) body;
-        }
-        else {
-            throw new IllegalArgumentException("The class " + clazz
-                + " is unknown to this implementation");
-        }
-    }
-
-    @Override
-    public int getJMSDeliveryMode() {
-        if (headers.containsKey(JmsBaseConstant.JMS_DELIVERY_MODE)) {
-            return (Integer) headers.get(JmsBaseConstant.JMS_DELIVERY_MODE);
-        }
-        return 0;
-    }
-
-    /**
-     * Sets the <CODE>DeliveryMode</CODE> value for this message.
-     * <p/>
-     * <P>JMS providers set this field when a message is sent. ROCKETMQ only support DeliveryMode.PERSISTENT mode. So do not
-     * allow User to set this by yourself, but you can get the default mode by <CODE>getJMSDeliveryMode</CODE> method.
-     *
-     * @param deliveryMode the delivery mode for this message
-     * @see javax.jms.Message#getJMSDeliveryMode()
-     * @see javax.jms.DeliveryMode
-     */
-
-    @Override
-    public void setJMSDeliveryMode(int deliveryMode) {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public boolean isBodyAssignableTo(Class<?> clazz) throws JMSException {
-        return clazz.isInstance(body);
-    }
-
-    @Override
-    public boolean getJMSRedelivered() {
-        return headers.containsKey(JmsBaseConstant.JMS_REDELIVERED)
-            && (Boolean) headers.get(JmsBaseConstant.JMS_REDELIVERED);
-    }
-
-    @Override
-    public void setJMSRedelivered(boolean redelivered) {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    /**
-     * copy meta data from source message
-     *
-     * @param sourceMessage source message
-     */
-    public void copyMetaData(JmsBaseMessage sourceMessage) {
-        if (!sourceMessage.getHeaders().isEmpty()) {
-            for (Map.Entry<String, Object> entry : sourceMessage.getHeaders().entrySet()) {
-                if (!headerExits(entry.getKey())) {
-                    setHeader(entry.getKey(), entry.getValue());
-                }
-            }
-        }
-        if (!sourceMessage.getProperties().isEmpty()) {
-            for (Map.Entry<String, Object> entry : sourceMessage.getProperties().entrySet()) {
-                if (!propertyExists(entry.getKey())) {
-                    setObjectProperty(entry.getKey(), entry.getValue());
-                }
-            }
-        }
-    }
-
-    @Override
-    public String getJMSType() {
-        return (String) headers.get(JmsBaseConstant.JMS_TYPE);
-    }
-
-    @Override
-    public void setJMSType(String type) {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public Map<String, Object> getHeaders() {
-        return this.headers;
-    }
-
-    @Override
-    public long getJMSExpiration() {
-        if (headers.containsKey(JmsBaseConstant.JMS_EXPIRATION)) {
-            return (Long) headers.get(JmsBaseConstant.JMS_EXPIRATION);
-        }
-        return 0;
-    }
-
-    @Override
-    public void setJMSExpiration(long expiration) {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public boolean headerExits(String name) {
-        return this.headers.containsKey(name);
-    }
-
-    @Override
-    public int getJMSPriority() {
-        if (headers.containsKey(JmsBaseConstant.JMS_PRIORITY)) {
-            return (Integer) headers.get(JmsBaseConstant.JMS_PRIORITY);
-        }
-        return 5;
-    }
-
-    @Override
-    public void setJMSPriority(int priority) {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public void setHeader(String name, Object value) {
-        this.headers.put(name, value);
-    }
-
-    public Map<String, Object> getProperties() {
-        return this.properties;
-    }
-
-    public void setProperties(Map<String, Object> properties) {
-        this.properties = properties;
-    }
-
-    @Override
-    public void acknowledge() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    @Override
-    public void clearProperties() {
-        this.properties.clear();
-    }
-
-    @Override
-    public void clearBody() {
-        this.body = null;
-    }
-
-    @Override
-    public boolean propertyExists(String name) {
-        return properties.containsKey(name);
-    }
-
-    @Override
-    public boolean getBooleanProperty(String name) throws JMSException {
-        if (propertyExists(name)) {
-            Object value = getObjectProperty(name);
-            return value instanceof Boolean ? (Boolean) value : Boolean.valueOf(value.toString());
-        }
-        return false;
-    }
-
-    @Override
-    public byte getByteProperty(String name) throws JMSException {
-        if (propertyExists(name)) {
-            Object value = getObjectProperty(name);
-            return value instanceof Byte ? (Byte) value : Byte.valueOf(value.toString());
-        }
-        return 0;
-    }
-
-    @Override
-    public short getShortProperty(String name) throws JMSException {
-        if (propertyExists(name)) {
-            Object value = getObjectProperty(name);
-            return value instanceof Short ? (Short) value : Short.valueOf(value.toString());
-        }
-        return 0;
-    }
-
-    @Override
-    public int getIntProperty(String name) throws JMSException {
-        if (propertyExists(name)) {
-            Object value = getObjectProperty(name);
-            return value instanceof Integer ? (Integer) value : Integer.valueOf(value.toString());
-        }
-        return 0;
-    }
-
-    @Override
-    public long getLongProperty(String name) throws JMSException {
-        if (propertyExists(name)) {
-            Object value = getObjectProperty(name);
-            return value instanceof Long ? (Long) value : Long.valueOf(value.toString());
-        }
-        return 0L;
-    }
-
-    @Override
-    public float getFloatProperty(String name) throws JMSException {
-        if (propertyExists(name)) {
-            Object value = getObjectProperty(name);
-            return value instanceof Float ? (Float) value : Float.valueOf(value.toString());
-        }
-        return 0f;
-    }
-
-    @Override
-    public double getDoubleProperty(String name) throws JMSException {
-        if (propertyExists(name)) {
-            Object value = getObjectProperty(name);
-            return value instanceof Double ? (Double) value : Double.valueOf(value.toString());
-        }
-        return 0d;
-    }
-
-    @Override
-    public String getStringProperty(String name) throws JMSException {
-        if (propertyExists(name)) {
-            return getObjectProperty(name).toString();
-        }
-        return null;
-    }
-
-    @Override
-    public Object getObjectProperty(String name) throws JMSException {
-        return this.properties.get(name);
-    }
-
-    @Override
-    public Enumeration<?> getPropertyNames() throws JMSException {
-        final Object[] keys = this.properties.keySet().toArray();
-        return new Enumeration<Object>() {
-            int i;
-
-            @Override
-            public boolean hasMoreElements() {
-                return i < keys.length;
-            }
-
-            @Override
-            public Object nextElement() {
-                return keys[i++];
-            }
-        };
-    }
-
-    @Override
-    public void setBooleanProperty(String name, boolean value) {
-        setObjectProperty(name, value);
-    }
-
-    @Override
-    public void setByteProperty(String name, byte value) {
-        setObjectProperty(name, value);
-    }
-
-    @Override
-    public void setShortProperty(String name, short value) {
-        setObjectProperty(name, value);
-    }
-
-    @Override
-    public void setIntProperty(String name, int value) {
-        setObjectProperty(name, value);
-    }
-
-    @Override
-    public void setLongProperty(String name, long value) {
-        setObjectProperty(name, value);
-    }
-
-    public void setFloatProperty(String name, float value) {
-        setObjectProperty(name, value);
-    }
-
-    @Override
-    public void setDoubleProperty(String name, double value) {
-        setObjectProperty(name, value);
-    }
-
-    @Override
-    public void setStringProperty(String name, String value) {
-        setObjectProperty(name, value);
-    }
-
-    @Override
-    public void setObjectProperty(String name, Object value) {
-        if (value instanceof Number || value instanceof String || value instanceof Boolean) {
-            this.properties.put(name, value);
-        }
-        else {
-            throw new IllegalArgumentException(
-                "Value should be boolean, byte, short, int, long, float, double, and String.");
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java b/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java
deleted file mode 100644
index b1e85b0..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain.message;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-import org.apache.rocketmq.jms.util.ExceptionUtil;
-
-/**
- * The <CODE>BytesMessage</CODE> methods are based largely on those found in <CODE>java.io.DataInputStream</CODE> and
- * <CODE>java.io.DataOutputStream</CODE>. <P> Notice:Although the JMS API allows the use of message properties with byte
- * messages, they are typically not used, since the inclusion of properties may affect the format. <P>
- */
-public class JmsBytesMessage extends JmsBaseMessage implements BytesMessage {
-    private DataInputStream dataAsInput;
-    private DataOutputStream dataAsOutput;
-    private ByteArrayOutputStream bytesOut;
-    private byte[] bytesIn;
-
-    /**
-     * Message created for reading
-     *
-     * @param data
-     */
-    public JmsBytesMessage(byte[] data) {
-        this.bytesIn = data;
-        dataAsInput = new DataInputStream(new ByteArrayInputStream(data, 0, data.length));
-    }
-
-    /**
-     * Message created to be sent
-     */
-    public JmsBytesMessage() {
-        bytesOut = new ByteArrayOutputStream();
-        dataAsOutput = new DataOutputStream(bytesOut);
-    }
-
-    public long getBodyLength() throws JMSException {
-        return getData().length;
-    }
-
-    /**
-     * @return the data
-     */
-    public byte[] getData() {
-        if (bytesOut != null) {
-            return bytesOut.toByteArray();
-        }
-        else {
-            return bytesIn;
-        }
-
-    }
-
-    public boolean readBoolean() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    public byte readByte() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    public int readUnsignedByte() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    public short readShort() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    public int readUnsignedShort() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    public char readChar() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    public int readInt() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    public long readLong() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    public float readFloat() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    public double readDouble() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    public String readUTF() throws JMSException {
-        throw new UnsupportedOperationException("Unsupported!");
-    }
-
-    public int readBytes(byte[] value) throws JMSException {
-        return readBytes(value, value.length);
-    }
-
-    public int readBytes(byte[] value, int length) throws JMSException {
-        if (length > value.length) {
-            throw new IndexOutOfBoundsException("length must be smaller than the length of value");
-        }
-        if (dataAsInput == null) {
-            throw new MessageNotReadableException("Message is not readable! ");
-        }
-        try {
-            int offset = 0;
-            while (offset < length) {
-                int read = dataAsInput.read(value, offset, length - offset);
-                if (read < 0) {
-                    break;
-                }
-                offset += read;
-            }
-
-            if (offset == 0 && length != 0) {
-                return -1;
-            }
-            else {
-                return offset;
-            }
-        }
-        catch (IOException e) {
-            throw handleInputException(e);
-        }
-
-    }
-
-    public void writeBoolean(boolean value) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public void writeByte(byte value) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public void writeShort(short value) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public void writeChar(char value) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public void writeInt(int value) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public void writeLong(long value) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public void writeFloat(float value) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public void writeDouble(double value) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public void writeUTF(String value) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public void writeBytes(byte[] value) throws JMSException {
-        if (dataAsOutput == null) {
-            throw new MessageNotWriteableException("Message is not writable! ");
-        }
-        try {
-            dataAsOutput.write(value);
-        }
-        catch (IOException e) {
-            throw handleOutputException(e);
-        }
-    }
-
-    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
-        if (dataAsOutput == null) {
-            throw new MessageNotWriteableException("Message is not writable! ");
-        }
-        try {
-            dataAsOutput.write(value, offset, length);
-        }
-        catch (IOException e) {
-            throw handleOutputException(e);
-        }
-    }
-
-    public void writeObject(Object value) throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    public void reset() throws JMSException {
-        ExceptionUtil.handleUnSupportedException();
-    }
-
-    private JMSException handleOutputException(final IOException e) {
-        JMSException ex = new JMSException(e.getMessage());
-        ex.initCause(e);
-        ex.setLinkedException(e);
-        return ex;
-    }
-
-    private JMSException handleInputException(final IOException e) {
-        JMSException ex;
-        if (e instanceof EOFException) {
-            ex = new MessageEOFException(e.getMessage());
-        }
-        else {
-            ex = new MessageFormatException(e.getMessage());
-        }
-        ex.initCause(e);
-        ex.setLinkedException(e);
-        return ex;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java b/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java
deleted file mode 100644
index f67da14..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain.message;
-
-import java.io.Serializable;
-import javax.jms.JMSException;
-import javax.jms.ObjectMessage;
-
-public class JmsObjectMessage extends JmsBaseMessage implements ObjectMessage {
-
-    public JmsObjectMessage(Serializable object) {
-        this.body = object;
-    }
-
-    public JmsObjectMessage() {
-
-    }
-
-    public Serializable getObject() throws JMSException {
-        return this.body;
-    }
-
-    public void setObject(Serializable object) throws JMSException {
-        this.body = object;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java b/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java
deleted file mode 100644
index ce19b51..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain.message;
-
-import javax.jms.JMSException;
-import javax.jms.TextMessage;
-
-public class JmsTextMessage extends JmsBaseMessage implements TextMessage {
-    private String text;
-
-    public JmsTextMessage() {
-
-    }
-
-    public JmsTextMessage(String text) {
-        setText(text);
-    }
-
-    public void clearBody() {
-        this.text = null;
-        super.clearBody();
-    }
-
-    public String getText() throws JMSException {
-        return this.text;
-    }
-
-    public void setText(String text) {
-        this.body = text;
-        this.text = text;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java b/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java
deleted file mode 100644
index bd926e5..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.util;
-
-import com.google.common.base.Preconditions;
-import javax.jms.JMSException;
-
-public class ExceptionUtil {
-    public static final boolean SKIP_SET_EXCEPTION
-        = Boolean.parseBoolean(System.getProperty("skip.set.exception", "false"));
-
-    public static void handleUnSupportedException() {
-        if (!ExceptionUtil.SKIP_SET_EXCEPTION) {
-            throw new UnsupportedOperationException("Operation unsupported! If you want to skip this Exception," +
-                " use '-Dskip.set.exception=true' in JVM options.");
-        }
-    }
-
-    public static JMSException convertToJmsException(Exception e, String extra) {
-        Preconditions.checkNotNull(extra);
-        Preconditions.checkNotNull(e);
-        JMSException jmsException = new JMSException(extra);
-        jmsException.initCause(e);
-        return jmsException;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java b/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java
deleted file mode 100644
index 3cf03f9..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.util;
-
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageExt;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import javax.jms.BytesMessage;
-import javax.jms.ObjectMessage;
-import javax.jms.TextMessage;
-import org.apache.commons.lang.StringUtils;
-import org.apache.rocketmq.jms.domain.JmsBaseConstant;
-import org.apache.rocketmq.jms.domain.JmsBaseTopic;
-import org.apache.rocketmq.jms.domain.message.JmsBaseMessage;
-import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
-import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
-import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
-
-import static 
org.apache.rocketmq.jms.domain.JmsBaseMessageProducer.initRocketMQHeaders;
-
-/**
- * Converts between JMS messages and RocketMQ messages.
- */
-public class MessageConverter {
-    /**
-     * Extracts the body of a JMS message as a byte array.
-     *
-     * @param jmsMessage a text, object or bytes message
-     * @return the encoded body
-     * @throws IllegalArgumentException if the body is empty or the message is of another kind
-     */
-    public static byte[] getContentFromJms(javax.jms.Message jmsMessage) throws Exception {
-        if (jmsMessage instanceof TextMessage) {
-            String text = ((TextMessage) jmsMessage).getText();
-            if (StringUtils.isEmpty(text)) {
-                throw new IllegalArgumentException("Message body length is zero");
-            }
-            return MsgConvertUtil.string2Bytes(text, Charsets.UTF_8.toString());
-        }
-
-        if (jmsMessage instanceof ObjectMessage) {
-            Object payload = ((ObjectMessage) jmsMessage).getObject();
-            if (payload == null) {
-                throw new IllegalArgumentException("Message body length is zero");
-            }
-            return MsgConvertUtil.objectSerialize(payload);
-        }
-
-        if (jmsMessage instanceof BytesMessage) {
-            // Only this project's own bytes message exposes getData().
-            JmsBytesMessage bytesMessage = (JmsBytesMessage) jmsMessage;
-            if (bytesMessage.getBodyLength() == 0) {
-                throw new IllegalArgumentException("Message body length is zero");
-            }
-            return bytesMessage.getData();
-        }
-
-        throw new IllegalArgumentException("Unknown message type " + jmsMessage.getJMSType());
-    }
-
-    /**
-     * Builds a JMS message from a consumed RocketMQ message.
-     *
-     * <p>The JMS message id and redelivered flag are derived from the RocketMQ
-     * message itself; the remaining RocketMQ properties are copied either into
-     * JMS headers or into the JMS property map.
-     *
-     * @param msg the RocketMQ message
-     * @return the equivalent JMS message
-     */
-    public static JmsBaseMessage convert2JMSMessage(MessageExt msg) throws Exception {
-        JmsBaseMessage message = newJmsMessageFor(msg);
-
-        //-------------------------set headers-------------------------
-        message.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + msg.getMsgId());
-        message.setHeader(JmsBaseConstant.JMS_REDELIVERED,
-            msg.getReconsumeTimes() > 0 ? Boolean.TRUE : Boolean.FALSE);
-
-        Map<String, Object> userProperties = new HashMap<String, Object>();
-        Map<String, String> rmqProperties = msg.getProperties();
-        if (rmqProperties != null) {
-            for (Map.Entry<String, String> entry : rmqProperties.entrySet()) {
-                String name = entry.getKey();
-                String value = entry.getValue();
-
-                if (JmsBaseConstant.JMS_DESTINATION.equals(name)) {
-                    if (value != null) {
-                        // Stored as "<topic>:<type>".
-                        String[] topicAndType = value.split(":");
-                        message.setHeader(JmsBaseConstant.JMS_DESTINATION,
-                            new JmsBaseTopic(topicAndType[0], topicAndType[1]));
-                    }
-                }
-                else if (isCopiedHeader(name)) {
-                    message.setHeader(name, value);
-                }
-                else if (!isDerivedHeader(name)) {
-                    userProperties.put(name, value);
-                }
-                // Derived headers were already set above from the RocketMQ message itself.
-            }
-        }
-
-        message.setProperties(userProperties);
-        return message;
-    }
-
-    /**
-     * Builds a RocketMQ message from a JMS message.
-     *
-     * @param jmsMsg the JMS message; its JMS destination header must be a {@link JmsBaseTopic}
-     * @return the RocketMQ message carrying the same body, topic, tags and properties
-     * @throws IllegalStateException if the destination's message type contains {@code "||"}
-     */
-    public static Message convert2RMQMessage(JmsBaseMessage jmsMsg) throws Exception {
-        Message rocketmqMsg = new MessageExt();
-
-        // 1. Transform message body
-        rocketmqMsg.setBody(MessageConverter.getContentFromJms(jmsMsg));
-
-        // 2. Transform topic and messageType
-        JmsBaseTopic destination = (JmsBaseTopic) jmsMsg.getHeaders().get(JmsBaseConstant.JMS_DESTINATION);
-        String topic = destination.getMessageTopic();
-        String messageType = destination.getMessageType();
-        rocketmqMsg.setTopic(topic);
-        Preconditions.checkState(!messageType.contains("||"),
-            "'||' can not be in the destination when sending a message");
-        rocketmqMsg.setTags(messageType);
-
-        // 3. Transform message properties
-        Properties headers = initRocketMQHeaders(jmsMsg, topic, messageType);
-        for (String name : headers.stringPropertyNames()) {
-            applyProperty(rocketmqMsg, name, headers.getProperty(name));
-        }
-
-        return rocketmqMsg;
-    }
-
-    /** Picks the JMS message class from the message-model user property; defaults to bytes. */
-    private static JmsBaseMessage newJmsMessageFor(MessageExt msg) throws Exception {
-        String model = msg.getUserProperty(MsgConvertUtil.JMS_MSGMODEL);
-        if (MsgConvertUtil.MSGMODEL_OBJ.equals(model)) {
-            return new JmsObjectMessage(MsgConvertUtil.objectDeserialize(msg.getBody()));
-        }
-        if (MsgConvertUtil.MSGMODEL_TEXT.equals(model)) {
-            return new JmsTextMessage(MsgConvertUtil.bytes2String(msg.getBody(),
-                Charsets.UTF_8.toString()));
-        }
-        // Explicit bytes model, or a plain rocketmq producer that never set JMS_MSGMODEL.
-        return new JmsBytesMessage(msg.getBody());
-    }
-
-    /** Headers whose stored string value is copied verbatim into the JMS headers. */
-    private static boolean isCopiedHeader(String name) {
-        return JmsBaseConstant.JMS_DELIVERY_MODE.equals(name)
-            || JmsBaseConstant.JMS_PRIORITY.equals(name)
-            || JmsBaseConstant.JMS_TIMESTAMP.equals(name)
-            || JmsBaseConstant.JMS_EXPIRATION.equals(name)
-            || JmsBaseConstant.JMS_CORRELATION_ID.equals(name)
-            || JmsBaseConstant.JMS_TYPE.equals(name);
-    }
-
-    /** Headers that are computed from the RocketMQ message and never copied from its properties. */
-    private static boolean isDerivedHeader(String name) {
-        return JmsBaseConstant.JMS_MESSAGE_ID.equals(name)
-            || JmsBaseConstant.JMS_REDELIVERED.equals(name);
-    }
-
-    /** Routes one property to the matching RocketMQ setter, or stores it as a user property. */
-    private static void applyProperty(Message rocketmqMsg, String name, String value) {
-        if (MessageConst.PROPERTY_KEYS.equals(name)) {
-            rocketmqMsg.setKeys(value);
-        }
-        else if (MessageConst.PROPERTY_TAGS.equals(name)) {
-            rocketmqMsg.setTags(value);
-        }
-        else if (MessageConst.PROPERTY_DELAY_TIME_LEVEL.equals(name)) {
-            rocketmqMsg.setDelayTimeLevel(Integer.parseInt(value));
-        }
-        else if (MessageConst.PROPERTY_WAIT_STORE_MSG_OK.equals(name)) {
-            rocketmqMsg.setWaitStoreMsgOK(Boolean.parseBoolean(value));
-        }
-        else if (MessageConst.PROPERTY_BUYER_ID.equals(name)) {
-            rocketmqMsg.setBuyerId(value);
-        }
-        else {
-            rocketmqMsg.putUserProperty(name, value);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java 
b/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java
deleted file mode 100644
index ec55bbc..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-/**
- * Constants and byte-level helpers used when converting between JMS and
- * RocketMQ messages.
- */
-public class MsgConvertUtil {
-
-    public static final byte[] EMPTY_BYTES = new byte[0];
-    public static final String EMPTY_STRING = "";
-
-    /** Name of the user property that records which JMS message model a body was encoded with. */
-    public static final String JMS_MSGMODEL = "jmsMsgModel";
-    /**
-     * Compatibility field for the scenario "a Notify client receives an
-     * ObjectMessage sent by a JMS client": names the outgoing Notify message
-     * model. The value can be textMessage or objectMessage.
-     */
-    public static final String COMPATIBLE_FIELD_MSGMODEL = "notifyOutMsgModel";
-
-    public static final String MSGMODEL_TEXT = "textMessage";
-    public static final String MSGMODEL_BYTES = "bytesMessage";
-    public static final String MSGMODEL_OBJ = "objectMessage";
-
-    public static final String MSG_TOPIC = "msgTopic";
-    public static final String MSG_TYPE = "msgType";
-
-    /**
-     * Serializes an object with Java native serialization.
-     *
-     * @param object the object to serialize
-     * @return the serialized bytes
-     * @throws IOException if the object cannot be written
-     */
-    public static byte[] objectSerialize(Object object) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream(baos);
-        try {
-            oos.writeObject(object);
-        }
-        finally {
-            // Closing flushes buffered data into baos and releases the stream even on failure.
-            oos.close();
-        }
-        return baos.toByteArray();
-    }
-
-    /**
-     * Deserializes an object written by {@link #objectSerialize(Object)}.
-     *
-     * <p>NOTE(review): this uses Java native deserialization; bodies coming from
-     * untrusted producers should be filtered before they reach this method.
-     *
-     * @param bytes the serialized form
-     * @return the deserialized object
-     * @throws IOException            if the bytes are not a valid serialization stream
-     * @throws ClassNotFoundException if the serialized class is not on the classpath
-     */
-    public static Serializable objectDeserialize(byte[] bytes) throws IOException, ClassNotFoundException {
-        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
-        try {
-            // Read first, close afterwards.
-            return (Serializable) ois.readObject();
-        }
-        finally {
-            ois.close();
-        }
-    }
-
-    /**
-     * Encodes a string with the named charset.
-     *
-     * @param s       the string to encode; {@code null} yields {@link #EMPTY_BYTES}
-     * @param charset the charset name
-     * @return the encoded bytes, or {@code null} if encoding failed (e.g. unknown charset)
-     */
-    public static final byte[] string2Bytes(String s, String charset) {
-        if (null == s) {
-            return EMPTY_BYTES;
-        }
-        try {
-            return s.getBytes(charset);
-        }
-        catch (Exception ignored) {
-            // Best effort by contract: callers receive null when the charset cannot be used.
-            return null;
-        }
-    }
-
-    /**
-     * Decodes bytes with the named charset.
-     *
-     * @param bs      the bytes to decode; {@code null} yields {@link #EMPTY_STRING}
-     * @param charset the charset name
-     * @return the decoded string, or {@code null} if decoding failed (e.g. unknown charset)
-     */
-    public static final String bytes2String(byte[] bs, String charset) {
-        if (null == bs) {
-            return EMPTY_STRING;
-        }
-        try {
-            return new String(bs, charset);
-        }
-        catch (Exception ignored) {
-            // Best effort by contract: callers receive null when the charset cannot be used.
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java 
b/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java
deleted file mode 100644
index 9b29928..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.util;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import java.util.Map;
-import org.apache.commons.lang.StringUtils;
-import org.apache.rocketmq.jms.domain.CommonConstant;
-
-/**
- * Parses JMS connection URIs of the form
- * {@code rocketmq://ip:port?key1=value1&key2=value2}.
- */
-public abstract class URISpecParser {
-
-    private static final String DEFAULT_BROKER = "rocketmq";
-
-    /**
-     * ConnectionUrl spec is broker://ip:port?key1=value1&key2=value2
-     *
-     * <p>The returned map holds the broker scheme under
-     * {@link CommonConstant#PROVIDER} plus every {@code key=value} pair of the
-     * query string. Query parameters without {@code '='} are ignored.
-     *
-     * @param uri Just like broker://ip:port?key1=value1&key2=value2
-     * @return The parameters' map
-     * @throws IllegalArgumentException if the uri is empty, has no {@code ':'}
-     *                                  after the broker scheme, or the broker is not rocketmq
-     */
-    public static Map<String, String> parseURI(String uri) {
-        Preconditions.checkArgument(null != uri && !uri.trim().isEmpty(), "Uri can not be empty!");
-
-        int schemeEnd = uri.indexOf(":");
-        // Without this check substring() would fail with an opaque StringIndexOutOfBoundsException.
-        Preconditions.checkArgument(schemeEnd >= 0, "Uri must contain a broker scheme followed by ':'");
-
-        String broker = uri.substring(0, schemeEnd);
-        if (!DEFAULT_BROKER.equals(broker)) {
-            throw new IllegalArgumentException("Broker must be rocketmq");
-        }
-
-        Map<String, String> results = Maps.newHashMap();
-        results.put(CommonConstant.PROVIDER, broker);
-
-        int queryStart = uri.indexOf("?");
-        if (queryStart < 0) {
-            // No query part: do not misread the scheme/authority as parameters.
-            return results;
-        }
-
-        String queryStr = uri.substring(queryStart + 1);
-        if (StringUtils.isNotEmpty(queryStr)) {
-            for (String param : queryStr.split("&")) {
-                if (param.contains("=")) {
-                    // Limit 2 keeps any further '=' inside the value.
-                    String[] values = param.split("=", 2);
-                    results.put(values[0], values[1]);
-                }
-            }
-        }
-        return results;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/core/src/main/resources/application.conf 
b/core/src/main/resources/application.conf
deleted file mode 100644
index 713c915..0000000
--- a/core/src/main/resources/application.conf
+++ /dev/null
@@ -1 +0,0 @@
-version = ${project.version}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java 
b/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java
deleted file mode 100644
index d77b13e..0000000
--- a/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import org.junit.Assert;
-
-/**
- * Test listener that counts received messages and releases an optional latch
- * once the expected number has been consumed.
- */
-public class JmsTestListener implements MessageListener {
-
-    private final AtomicInteger consumedNum = new AtomicInteger(0);
-    private int expectd;
-    private CountDownLatch latch;
-
-    /** Expects 10 messages and uses no latch. */
-    public JmsTestListener() {
-        this(10);
-    }
-
-    /** Expects {@code expectd} messages and uses no latch. */
-    public JmsTestListener(int expectd) {
-        this(expectd, null);
-    }
-
-    /** Expects {@code expected} messages and counts {@code latch} down when that count is reached. */
-    public JmsTestListener(int expected, CountDownLatch latch) {
-        this.expectd = expected;
-        this.latch = latch;
-    }
-
-    /**
-     * Asserts that the message and its JMS message id are present, then bumps
-     * the counter; the latch, if any, is counted down exactly when the counter
-     * equals the expected number.
-     */
-    @Override
-    public void onMessage(Message message) {
-        try {
-            Assert.assertNotNull(message);
-            Assert.assertNotNull(message.getJMSMessageID());
-            int consumedSoFar = consumedNum.incrementAndGet();
-            if (latch != null && consumedSoFar == expectd) {
-                latch.countDown();
-            }
-        }
-        catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public int getConsumedNum() {
-        return consumedNum.get();
-    }
-
-    public void setLatch(CountDownLatch latch) {
-        this.latch = latch;
-    }
-
-    public void setExpectd(int expectd) {
-        this.expectd = expectd;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java 
b/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java
deleted file mode 100644
index 855cb19..0000000
--- a/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import java.lang.reflect.Field;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.client.producer.MQProducer;
-import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer;
-import org.apache.rocketmq.jms.domain.JmsBaseMessageProducer;
-import org.apache.rocketmq.jms.domain.RMQPushConsumerExt;
-import org.junit.Assert;
-
-/**
- * Test helpers that look into the static producer/consumer registries of the
- * JMS implementation through reflection.
- */
-public class JmsTestUtil {
-    /** Returns the producer stored under {@code producerId} in the static {@code producerMap}, or null. */
-    public static MQProducer getMQProducer(String producerId) throws Exception {
-        Assert.assertNotNull(producerId);
-        ConcurrentMap<String, MQProducer> producers =
-            (ConcurrentMap<String, MQProducer>) readStaticField(JmsBaseMessageProducer.class, "producerMap");
-        return producers.get(producerId);
-    }
-
-    /** Returns the consumer stored under {@code consumerId} in the static {@code consumerMap}, or null. */
-    public static RMQPushConsumerExt getRMQPushConsumerExt(String consumerId) throws Exception {
-        Assert.assertNotNull(consumerId);
-        ConcurrentMap<String, RMQPushConsumerExt> consumers =
-            (ConcurrentMap<String, RMQPushConsumerExt>) readStaticField(JmsBaseMessageConsumer.class, "consumerMap");
-        return consumers.get(consumerId);
-    }
-
-    /**
-     * Asserts the registration state of a consumer.
-     *
-     * @param consumerId the consumer to look up
-     * @param isNull     whether the consumer is expected to be absent
-     * @param isStarted  the expected started flag; only checked when {@code isNull} is false
-     */
-    public static void checkConsumerState(String consumerId, boolean isNull, boolean isStarted) throws Exception {
-        RMQPushConsumerExt consumer = getRMQPushConsumerExt(consumerId);
-        if (!isNull) {
-            Assert.assertNotNull(consumer);
-            Assert.assertEquals(isStarted, consumer.isStarted());
-            return;
-        }
-        Assert.assertNull(consumer);
-    }
-
-    /** Reads a (possibly private) static field of {@code owner}. */
-    private static Object readStaticField(Class<?> owner, String fieldName) throws Exception {
-        Field field = owner.getDeclaredField(fieldName);
-        field.setAccessible(true);
-        return field.get(null);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
 
b/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
deleted file mode 100644
index 9fe9f5e..0000000
--- 
a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain.message;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Unit tests for {@link JmsBytesMessage}: a message built from received bytes
- * is read from, a message built with the no-arg constructor is written to.
- */
-public class JmsBytesMessageTest {
-
-    private byte[] receiveData = "receive data test".getBytes();
-    private byte[] sendData = "send data test".getBytes();
-
-    @Test
-    public void testGetData() throws Exception {
-        JmsBytesMessage readMessage = new JmsBytesMessage(receiveData);
-        Assert.assertArrayEquals(receiveData, readMessage.getData());
-
-        JmsBytesMessage sendMessage = new JmsBytesMessage();
-        sendMessage.writeBytes(sendData, 0, sendData.length);
-        Assert.assertArrayEquals(sendData, sendMessage.getData());
-    }
-
-    @Test
-    public void testGetBodyLength() throws Exception {
-        JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData);
-
-        Assert.assertEquals(receiveData.length, bytesMessage.getBodyLength());
-    }
-
-    @Test
-    public void testReadBytes() throws Exception {
-        JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData);
-        Assert.assertEquals(receiveData.length, bytesMessage.getBodyLength());
-
-        byte[] receiveValue = new byte[receiveData.length];
-        bytesMessage.readBytes(receiveValue);
-
-        Assert.assertArrayEquals(receiveData, receiveValue);
-    }
-
-    @Test
-    public void testReadBytes1() throws Exception {
-        JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData);
-
-        // Two consecutive 2-byte reads must return consecutive slices of the body.
-        byte[] firstChunk = new byte[2];
-        bytesMessage.readBytes(firstChunk, 2);
-        Assert.assertEquals(new String(receiveData).substring(0, 2), new String(firstChunk));
-
-        byte[] secondChunk = new byte[2];
-        bytesMessage.readBytes(secondChunk, 2);
-        Assert.assertEquals(new String(receiveData).substring(2, 4), new String(secondChunk));
-    }
-
-    @Test
-    public void testWriteBytes() throws Exception {
-        JmsBytesMessage jmsBytesMessage = new JmsBytesMessage();
-        jmsBytesMessage.writeBytes(sendData);
-
-        Assert.assertArrayEquals(sendData, jmsBytesMessage.getData());
-    }
-
-    @Test
-    public void testException() throws Exception {
-        // Only checks that both constructors succeed. Per the original author's notes,
-        // reading from a message created for writing, or writing to a message created
-        // from received data, throws NullPointerException; those calls stay disabled:
-        //   jmsBytesMessage.readBytes(new byte[receiveData.length]);
-        //   sendMessage.writeBytes("hello again".getBytes());
-        JmsBytesMessage jmsBytesMessage = new JmsBytesMessage();
-        Assert.assertNotNull(jmsBytesMessage);
-
-        JmsBytesMessage sendMessage = new JmsBytesMessage(sendData);
-        Assert.assertNotNull(sendMessage);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
 
b/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
deleted file mode 100644
index b570142..0000000
--- 
a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package org.apache.rocketmq.jms.domain.message;
-
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.jms.domain.JmsBaseConstant;
-import org.apache.rocketmq.jms.domain.JmsBaseTopic;
-import org.apache.rocketmq.jms.util.MessageConverter;
-import org.apache.rocketmq.jms.util.MsgConvertUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Round-trip test: JMS text message -> RocketMQ message -> JMS text message.
- */
-public class JmsMessageConvertTest {
-    @Test
-    public void testCovert2RMQ() throws Exception {
-        //init jmsBaseMessage
-        String topic = "TestTopic";
-        String messageType = "TagA";
-
-        JmsBaseMessage jmsBaseMessage = new JmsTextMessage("testText");
-        jmsBaseMessage.setHeader(JmsBaseConstant.JMS_DESTINATION, new JmsBaseTopic(topic, messageType));
-        jmsBaseMessage.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:null");
-        jmsBaseMessage.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.FALSE);
-
-        jmsBaseMessage.setObjectProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT);
-        jmsBaseMessage.setObjectProperty(MsgConvertUtil.MSG_TOPIC, topic);
-        jmsBaseMessage.setObjectProperty(MsgConvertUtil.MSG_TYPE, messageType);
-        jmsBaseMessage.setObjectProperty(MessageConst.PROPERTY_TAGS, messageType);
-        jmsBaseMessage.setObjectProperty(MessageConst.PROPERTY_KEYS, messageType);
-
-        //convert to RMQMessage
-        MessageExt message = (MessageExt) MessageConverter.convert2RMQMessage(jmsBaseMessage);
-
-        System.out.println(message);
-
-        //then convert back to jmsBaseMessage
-        JmsBaseMessage jmsBaseMessageBack = MessageConverter.convert2JMSMessage(message);
-
-        JmsTextMessage sent = (JmsTextMessage) jmsBaseMessage;
-        JmsTextMessage received = (JmsTextMessage) jmsBaseMessageBack;
-
-        Assert.assertEquals(sent.getText(), received.getText());
-        Assert.assertEquals(sent.getJMSDestination().toString(), received.getJMSDestination().toString());
-        Assert.assertEquals(sent.getJMSMessageID(), received.getJMSMessageID());
-        Assert.assertEquals(sent.getJMSRedelivered(), received.getJMSRedelivered());
-
-        // NOTE(review): these keys were stored with setObjectProperty above but are looked
-        // up in the header map here; confirm the comparison is not null-vs-null on both sides.
-        assertSameHeader(MsgConvertUtil.JMS_MSGMODEL, sent, received);
-        assertSameHeader(MsgConvertUtil.MSG_TOPIC, sent, received);
-        assertSameHeader(MsgConvertUtil.MSG_TYPE, sent, received);
-        assertSameHeader(MessageConst.PROPERTY_TAGS, sent, received);
-        assertSameHeader(MessageConst.PROPERTY_KEYS, sent, received);
-    }
-
-    /** Asserts that both messages hold an equal value under {@code key} in their header maps. */
-    private static void assertSameHeader(String key, JmsBaseMessage sent, JmsBaseMessage received) {
-        Assert.assertEquals(sent.getHeaders().get(key), received.getHeaders().get(key));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
 
b/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
deleted file mode 100644
index 6951976..0000000
--- 
a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain.message;
-
-import java.io.Serializable;
-import javax.jms.JMSException;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class JmsObjectMessageTest {
-
-    @Test
-    public void testGetObject() {
-        JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new 
User("jack", 20));
-        try {
-            Assert.assertEquals(jmsObjectMessage.getObject(), new User("jack", 
20));
-        }
-        catch (JMSException e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Test
-    public void testGetBody() {
-        JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new 
User("jack", 20));
-
-        try {
-            User user = (User)jmsObjectMessage.getBody(Object.class);
-            System.out.println(user.getName() + ": " + user.getAge());
-            Assert.assertEquals(jmsObjectMessage.getBody(Object.class), 
jmsObjectMessage.getObject());
-        }
-        catch (JMSException e) {
-            e.printStackTrace();
-        }
-    }
-
-    private class User implements Serializable {
-        private String name;
-        private int age;
-
-        private User(String name, int age) {
-            this.name = name;
-            this.age = age;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-            if (obj == null || getClass() != obj.getClass())
-                return false;
-
-            User user = (User)obj;
-            if (age != user.getAge())
-                return false;
-            if (name != null ? !name.equals(user.getName()) : user.getName() 
!= null)
-                return false;
-            return true;
-        }
-
-        public int getAge() {
-            return age;
-        }
-
-        public void setAge(int age) {
-            this.age = age;
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public void setName(String name) {
-            this.name = name;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
 
b/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
deleted file mode 100644
index d3c8287..0000000
--- 
a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.domain.message;
-
-import javax.jms.JMSException;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class JmsTextMessageTest {
-    private String text = "jmsTextMessage test";
-
-    @Test
-    public void testGetBody() {
-        JmsTextMessage jmsTextMessage = new JmsTextMessage(text);
-        try {
-            Assert.assertEquals(jmsTextMessage.getBody(String.class), text);
-        }
-        catch (JMSException e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Test
-    public void testSetGetText() {
-        JmsTextMessage jmsTextMessage = new JmsTextMessage();
-        jmsTextMessage.setText(text);
-        try {
-            Assert.assertEquals(jmsTextMessage.getText(), text);
-        }
-        catch (JMSException e) {
-            e.printStackTrace();
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
 
b/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
deleted file mode 100644
index 02fe111..0000000
--- 
a/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.integration;
-
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.namesrv.NamesrvConfig;
-import org.apache.rocketmq.jms.domain.CommonConstant;
-import org.apache.rocketmq.namesrv.NamesrvController;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class IntegrationTestBase {
-    public static Logger logger = 
LoggerFactory.getLogger(IntegrationTestBase.class);
-
-    protected static Random random = new Random();
-    protected static final String SEP = File.separator;
-
-
-    protected static String topic = "jms-test";
-    protected static String topic2 = "jms-test-2";
-    protected static String messageType = "TagA";
-    protected static String producerId = "PID-jms-test";
-    protected static String consumerId = "CID-jms-test";
-    protected static String consumerId2 = "CID-jms-test-2";
-    protected static String nameServer;
-    protected static String text = "English test";
-    protected static int consumeThreadNums = 16;
-
-
-
-
-    protected static final String BROKER_NAME_PREFIX = "TestBrokerName_";
-    protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0);
-    protected static final List<File> TMPE_FILES = new ArrayList<File>();
-    protected static final List<BrokerController> BROKER_CONTROLLERS =  new 
ArrayList<BrokerController>();
-    protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new 
ArrayList<NamesrvController>();
-
-
-    private static String createBaseDir() {
-        String baseDir = System.getProperty("user.home") + SEP + 
"unitteststore-" + UUID.randomUUID();
-        final File file = new File(baseDir);
-        if (file.exists()) {
-            System.out.println(String.format("[%s] has already existed, please 
bake up and remove it for integration tests", baseDir));
-            System.exit(1);
-        }
-        TMPE_FILES.add(file);
-        return baseDir;
-    }
-
-    public static NamesrvController createAndStartNamesrv() {
-        String baseDir = createBaseDir();
-        NamesrvConfig namesrvConfig = new NamesrvConfig();
-        NettyServerConfig nameServerNettyServerConfig = new 
NettyServerConfig();
-        namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + 
"kvConfig.json");
-
-        nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000));
-        NamesrvController namesrvController = new 
NamesrvController(namesrvConfig, nameServerNettyServerConfig);
-        try {
-            Assert.assertTrue(namesrvController.initialize());
-            logger.info("Name Server Start:{}", 
nameServerNettyServerConfig.getListenPort());
-            namesrvController.start();
-        } catch (Exception e) {
-            System.out.println("Name Server start failed");
-            System.exit(1);
-        }
-        NAMESRV_CONTROLLERS.add(namesrvController);
-        return namesrvController;
-
-    }
-
-
-    public static BrokerController createAndStartBroker(String nsAddr) {
-        String baseDir = createBaseDir();
-        BrokerConfig brokerConfig = new BrokerConfig();
-        NettyServerConfig nettyServerConfig = new NettyServerConfig();
-        NettyClientConfig nettyClientConfig = new NettyClientConfig();
-        MessageStoreConfig storeConfig = new MessageStoreConfig();
-        brokerConfig.setBrokerName(BROKER_NAME_PREFIX + 
BROKER_INDEX.getAndIncrement());
-        brokerConfig.setBrokerIP1("127.0.0.1");
-        brokerConfig.setNamesrvAddr(nsAddr);
-        storeConfig.setStorePathRootDir(baseDir);
-        storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
-        storeConfig.setHaListenPort(8000 + random.nextInt(1000));
-        nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
-        BrokerController brokerController = new BrokerController(brokerConfig, 
nettyServerConfig, nettyClientConfig, storeConfig);
-        try {
-            Assert.assertTrue(brokerController.initialize());
-            logger.info("Broker Start name:{} addr:{}", 
brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
-            brokerController.start();
-        } catch (Exception e) {
-            System.out.println("Broker start failed");
-            System.exit(1);
-        }
-        BROKER_CONTROLLERS.add(brokerController);
-        return brokerController;
-    }
-
-
-
-    protected static DefaultMQAdminExt defaultMQAdminExt;
-
-    static {
-        //clear the environment
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override public void run() {
-                if (defaultMQAdminExt != null) {
-                    defaultMQAdminExt.shutdown();
-                }
-                for (NamesrvController namesrvController: NAMESRV_CONTROLLERS) 
{
-                    if (namesrvController != null) {
-                        namesrvController.shutdown();
-                    }
-                }
-                for (BrokerController brokerController: BROKER_CONTROLLERS) {
-                    if (brokerController != null) {
-                        brokerController.shutdown();
-                    }
-                }
-                for (File file : TMPE_FILES) {
-                    deleteFile(file);
-                }
-            }
-        });
-
-
-        NamesrvController namesrvController = 
IntegrationTestBase.createAndStartNamesrv();
-        nameServer = "127.0.0.1:" + 
namesrvController.getNettyServerConfig().getListenPort();
-        BrokerController brokerController = createAndStartBroker(nameServer);
-
-        defaultMQAdminExt = new DefaultMQAdminExt();
-        defaultMQAdminExt.setNamesrvAddr(nameServer);
-        try {
-            defaultMQAdminExt.start();
-        } catch (MQClientException e) {
-            System.out.println("DefaultMQAdminExt start failed");
-            System.exit(1);
-        }
-
-        createTopic(topic, brokerController.getBrokerAddr());
-
-
-    }
-
-    public static void deleteFile(File file) {
-        if (!file.exists()) {
-            return;
-        }
-        if (file.isFile()) {
-            file.delete();
-        } else if (file.isDirectory()) {
-            File[] files = file.listFiles();
-            for (int i = 0;i < files.length;i ++) {
-                deleteFile(files[i]);
-            }
-            file.delete();
-        }
-    }
-    public static void createTopic(String topic, String addr) {
-        TopicConfig topicConfig = new TopicConfig();
-        topicConfig.setTopicName(topic);
-        topicConfig.setReadQueueNums(4);
-        topicConfig.setWriteQueueNums(4);
-        try {
-            defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
-        } catch (Exception e) {
-            logger.error("Create topic:{} addr:{} failed", addr, topic);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java 
b/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java
deleted file mode 100644
index 367700a..0000000
--- a/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.integration;
-
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.jms.JmsTestListener;
-import org.apache.rocketmq.jms.JmsTestUtil;
-import org.apache.rocketmq.jms.domain.CommonConstant;
-import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt;
-
-public class JmsClientIT extends IntegrationTestBase {
-
-    @Test
-    public void testConfigInURI() throws Exception {
-        JmsBaseConnectionFactory connectionFactory = new 
JmsBaseConnectionFactory(new
-            
URI(String.format("rocketmq://xxx?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s&%s=%s",
-            CommonConstant.PRODUCERID, producerId,
-            CommonConstant.CONSUMERID, consumerId,
-            CommonConstant.NAMESERVER, nameServer,
-            CommonConstant.CONSUME_THREAD_NUMS, consumeThreadNums,
-            CommonConstant.SEND_TIMEOUT_MILLIS, 10*1000,
-            CommonConstant.INSTANCE_NAME, "JMS_TEST")));
-
-        Connection connection = connectionFactory.createConnection();
-        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        connection.start();
-        try {
-            Destination destination = session.createTopic(topic + ":" + 
messageType);
-            session.createConsumer(destination);
-            session.createProducer(destination);
-
-            DefaultMQPushConsumer rmqPushConsumer = (DefaultMQPushConsumer) 
getRMQPushConsumerExt(consumerId).getConsumer();
-            Assert.assertNotNull(rmqPushConsumer);
-            Assert.assertEquals(consumerId, 
rmqPushConsumer.getConsumerGroup());
-            Assert.assertEquals("JMS_TEST", rmqPushConsumer.getInstanceName());
-            Assert.assertEquals(consumeThreadNums, 
rmqPushConsumer.getConsumeThreadMax());
-            Assert.assertEquals(consumeThreadNums, 
rmqPushConsumer.getConsumeThreadMin());
-            Assert.assertEquals(nameServer, rmqPushConsumer.getNamesrvAddr());
-
-            DefaultMQProducer mqProducer = (DefaultMQProducer) 
JmsTestUtil.getMQProducer(producerId);
-            Assert.assertNotNull(mqProducer);
-            Assert.assertEquals(producerId, mqProducer.getProducerGroup());
-            Assert.assertEquals("JMS_TEST", mqProducer.getInstanceName());
-            Assert.assertEquals(10 * 1000, mqProducer.getSendMsgTimeout());
-            Assert.assertEquals(nameServer, mqProducer.getNamesrvAddr());
-
-            Thread.sleep(2000);
-        }
-        finally {
-            connection.close();
-        }
-
-    }
-
-
-    private Connection createConnection(String producerGroup, String 
consumerGroup) throws Exception {
-        JmsBaseConnectionFactory connectionFactory = new 
JmsBaseConnectionFactory(new
-            
URI(String.format("rocketmq://xxx?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s&%s=%s",
-            CommonConstant.PRODUCERID, producerGroup,
-            CommonConstant.CONSUMERID, consumerGroup,
-            CommonConstant.NAMESERVER, nameServer,
-            CommonConstant.CONSUME_THREAD_NUMS, consumeThreadNums,
-            CommonConstant.SEND_TIMEOUT_MILLIS, 10*1000,
-            CommonConstant.INSTANCE_NAME, "JMS_TEST")));
-        return  connectionFactory.createConnection();
-    }
-
-    @Test
-    public void testProducerAndConsume_TwoConsumer() throws Exception {
-
-        Connection connection = createConnection(producerId, consumerId);
-        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        Destination destinationA = session.createTopic("TopicA");
-        Destination destinationB = session.createTopic("TopicB");
-        final CountDownLatch countDownLatch = new CountDownLatch(2);
-        JmsTestListener listenerA = new JmsTestListener(10,countDownLatch);
-        JmsTestListener listenerB = new JmsTestListener(10, countDownLatch);
-
-        try {
-            //two consumers
-            MessageConsumer messageConsumerA = 
session.createConsumer(destinationA);
-            messageConsumerA.setMessageListener(listenerA);
-            MessageConsumer messageConsumerB = 
session.createConsumer(destinationB);
-            messageConsumerB.setMessageListener(listenerB);
-            //producer
-            MessageProducer messageProducer = 
session.createProducer(destinationA);
-            connection.start();
-
-            for (int i = 0; i < 10; i++) {
-                TextMessage message = session.createTextMessage(text + i);
-                Assert.assertNull(message.getJMSMessageID());
-                messageProducer.send(message);
-                Assert.assertNotNull(message.getJMSMessageID());
-            }
-            for (int i = 0; i < 10; i++) {
-                TextMessage message = session.createTextMessage(text + i);
-                Assert.assertNull(message.getJMSMessageID());
-                messageProducer.send(destinationB, message);
-                Assert.assertNotNull(message.getJMSMessageID());
-            }
-
-            if (countDownLatch.await(30, TimeUnit.SECONDS)) {
-                Thread.sleep(2000);
-            }
-            Assert.assertEquals(10, listenerA.getConsumedNum());
-            Assert.assertEquals(10, listenerB.getConsumedNum());
-        }
-        finally {
-            //Close the connection
-            connection.close();
-        }
-
-    }
-
-    @Test
-    public void testProducerAndConsume_TagFilter() throws Exception {
-        Connection connection = createConnection(producerId, consumerId);
-        Connection anotherConnection = createConnection(producerId, consumerId 
+"other");
-        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        Session anotherSession = anotherConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-        Destination destinationA = session.createTopic("topic:tagA");
-        Destination destinationB = session.createTopic("topic:tagB");
-        final CountDownLatch countDownLatch = new CountDownLatch(2);
-        JmsTestListener listenerForTagA =  new JmsTestListener(10, 
countDownLatch);
-        JmsTestListener listenerForAll = new JmsTestListener(40, 
countDownLatch);
-        try {
-            
session.createConsumer(destinationA).setMessageListener(listenerForTagA);
-            
anotherSession.createConsumer(session.createTopic("topic")).setMessageListener(listenerForAll);
-            //producer
-            MessageProducer messageProducer = 
session.createProducer(destinationA);
-            connection.start();
-            anotherConnection.start();
-
-            for (int i = 0; i < 20; i++) {
-                TextMessage message = session.createTextMessage(text + i);
-                Assert.assertNull(message.getJMSMessageID());
-                messageProducer.send(message);
-                Assert.assertNotNull(message.getJMSMessageID());
-            }
-            for (int i = 0; i < 20; i++) {
-                TextMessage message = session.createTextMessage(text + i);
-                Assert.assertNull(message.getJMSMessageID());
-                messageProducer.send(destinationB, message);
-                Assert.assertNotNull(message.getJMSMessageID());
-            }
-
-            if (countDownLatch.await(30, TimeUnit.SECONDS)) {
-                Thread.sleep(2000);
-            }
-            Assert.assertEquals(20, listenerForTagA.getConsumedNum());
-            Assert.assertEquals(40, listenerForAll.getConsumedNum());
-        }
-        finally {
-            //Close the connection
-            connection.close();
-            anotherConnection.close();
-        }
-
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java 
b/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java
deleted file mode 100644
index 6cbb7b1..0000000
--- a/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.integration;
-
-import java.net.URI;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory;
-import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer;
-import org.apache.rocketmq.jms.domain.RMQPushConsumerExt;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.rocketmq.jms.JmsTestUtil.checkConsumerState;
-import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt;
-
-public class JmsConsumerIT extends IntegrationTestBase {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-
-    private  MessageListener listener = new MessageListener() {
-        @Override
-        public void onMessage(Message message) {
-            try {
-                Assert.assertNotNull(message);
-                Assert.assertNotNull(message.getJMSMessageID());
-            }
-            catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    };
-
-
-    @Test
-    public void testStartIdempotency() throws Exception {
-        JmsBaseConnectionFactory connectionFactory = new 
JmsBaseConnectionFactory(new
-            URI("rocketmq://xxx?consumerId=" + consumerId + "&nameServer=" + 
nameServer));
-        Connection connection = connectionFactory.createConnection();
-        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-        checkConsumerState(consumerId, true, false);
-        try {
-            Destination destination = session.createTopic(topic + ":" + 
messageType);
-            MessageConsumer consumer = session.createConsumer(destination);
-            consumer.setMessageListener(listener);
-
-            checkConsumerState(consumerId, false, false);
-
-            ((JmsBaseMessageConsumer) consumer).startConsumer();
-            checkConsumerState(consumerId, false, true);
-
-            Destination destination1 = session.createTopic(topic2 + ":" + 
messageType);
-            MessageConsumer consumer1 = session.createConsumer(destination1);
-            consumer1.setMessageListener(listener);
-
-            ((JmsBaseMessageConsumer) consumer1).startConsumer();
-            checkConsumerState(consumerId, false, true);
-
-            //the start is idempotent
-            connection.start();
-            connection.start();
-
-            Thread.sleep(5000);
-        }
-        finally {
-            connection.close();
-        }
-    }
-
-    @Test
-    public void testReferenceCount() throws Exception {
-        JmsBaseConnectionFactory connectionFactory = new 
JmsBaseConnectionFactory(new
-            URI("rocketmq://xxx?consumerId=" + consumerId + "&nameServer=" + 
nameServer));
-        Connection connection = connectionFactory.createConnection();
-        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-        connection.start();
-        try {
-            Destination destination = session.createTopic(topic + ":" + 
messageType);
-            MessageConsumer consumer = session.createConsumer(destination);
-            consumer.setMessageListener(listener);
-
-            RMQPushConsumerExt rmqPushConsumerExt = 
getRMQPushConsumerExt(consumerId);
-            Assert.assertNotNull(rmqPushConsumerExt);
-            Assert.assertEquals(1, rmqPushConsumerExt.getReferenceCount());
-
-
-            MessageConsumer consumer2 = session.createConsumer(destination);
-            Assert.assertEquals(2, rmqPushConsumerExt.getReferenceCount());
-
-            MessageConsumer consumer3 = 
session.createConsumer(session.createTopic(topic + ":" + messageType));
-
-            Assert.assertEquals(3, rmqPushConsumerExt.getReferenceCount());
-
-            session.close();
-
-            Assert.assertEquals(0, rmqPushConsumerExt.getReferenceCount());
-            Assert.assertEquals(false, rmqPushConsumerExt.isStarted());
-            Assert.assertNull(getRMQPushConsumerExt(consumerId));
-
-            Thread.sleep(5000);
-        }
-        finally {
-            connection.close();
-        }
-    }
-
-}


Reply via email to