http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQMessage.java deleted file mode 100644 index 0db4f5e..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQMessage.java +++ /dev/null @@ -1,431 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg; - -import com.google.common.collect.Maps; -import com.google.common.io.BaseEncoding; -import java.io.Serializable; -import java.util.Enumeration; -import java.util.Map; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageNotWriteableException; -import org.apache.commons.lang.builder.ToStringBuilder; -import org.apache.rocketmq.jms.Constant; -import org.apache.rocketmq.jms.support.JmsHelper; -import org.apache.rocketmq.jms.support.DirectTypeConverter; - -//todo: add unit test after finishing JMS Properties -public class RocketMQMessage implements javax.jms.Message { - - protected Map<String, Object> properties = Maps.newHashMap(); - protected Map<String, Object> headers = Maps.newHashMap(); - protected Serializable body; - - protected boolean writeOnly; - - @Override - public String getJMSMessageID() { - return DirectTypeConverter.convert2String(headers.get(Constant.JMS_MESSAGE_ID)); - } - - /** - * Sets the message ID. - * <p/> - * <P>JMS providers set this field when a message is sent. Do not allow User to set the message ID by yourself. - * - * @param id the ID of the message - * @see javax.jms.Message#getJMSMessageID() - */ - - @Override - public void setJMSMessageID(String id) { - JmsHelper.handleUnSupportedException(); - } - - @Override - public long getJMSTimestamp() { - if (headers.containsKey(Constant.JMS_TIMESTAMP)) { - return DirectTypeConverter.convert2Long(headers.get(Constant.JMS_TIMESTAMP)); - } - return 0; - } - - @Override - public void setJMSTimestamp(long timestamp) { - JmsHelper.handleUnSupportedException(); - } - - @Override - public byte[] getJMSCorrelationIDAsBytes() { - String jmsCorrelationID = getJMSCorrelationID(); - if (jmsCorrelationID != null) { - try { - return BaseEncoding.base64().decode(jmsCorrelationID); - } - catch (Exception e) { - return jmsCorrelationID.getBytes(); - } - } - return null; - } - - @Override - public void setJMSCorrelationIDAsBytes(byte[] correlationID) { - String encodedText = BaseEncoding.base64().encode(correlationID); - setJMSCorrelationID(encodedText); - } - - @Override - public String getJMSCorrelationID() { - if (headers.containsKey(Constant.JMS_CORRELATION_ID)) { - return DirectTypeConverter.convert2String(headers.get(Constant.JMS_CORRELATION_ID)); - } - return null; - } - - @Override - public void setJMSCorrelationID(String correlationID) { - JmsHelper.handleUnSupportedException(); - } - - @Override - public Destination getJMSReplyTo() { - if (headers.containsKey(Constant.JMS_REPLY_TO)) { - return DirectTypeConverter.convert2Object(headers.get(Constant.JMS_REPLY_TO), Destination.class); - } - return null; - } - - @Override - public void setJMSReplyTo(Destination replyTo) { - JmsHelper.handleUnSupportedException(); - } - - @Override - public String toString() { - return ToStringBuilder.reflectionToString(this); - } - - @Override - public Destination getJMSDestination() { - if (headers.containsKey(Constant.JMS_DESTINATION)) { - return DirectTypeConverter.convert2Object(headers.get(Constant.JMS_DESTINATION), Destination.class); - } - return null; - } - - @Override - public void setJMSDestination(Destination destination) { - JmsHelper.handleUnSupportedException(); - } - - @SuppressWarnings("unchecked") - public <T> T getBody(Class<T> clazz) throws JMSException { - if (clazz.isInstance(body)) { - return DirectTypeConverter.convert2Object(body, clazz); - } - else { - throw new IllegalArgumentException("The class " + clazz - + " is unknown to this implementation"); - } - } - - @Override - public int getJMSDeliveryMode() { - if (headers.containsKey(Constant.JMS_DELIVERY_MODE)) { - return DirectTypeConverter.convert2Integer(headers.get(Constant.JMS_DELIVERY_MODE)); - } - return 0; - } - - /** - * Sets the <CODE>DeliveryMode</CODE> value for this message. - * <p/> - * <P>JMS providers set this field when a message is sent. ONS only support DeliveryMode.PERSISTENT mode. So do not - * allow User to set this by yourself, but you can get the default mode by <CODE>getJMSDeliveryMode</CODE> method. - * - * @param deliveryMode the delivery mode for this message - * @see javax.jms.Message#getJMSDeliveryMode() - * @see javax.jms.DeliveryMode - */ - - @Override - public void setJMSDeliveryMode(int deliveryMode) { - JmsHelper.handleUnSupportedException(); - } - - @Override - public boolean getJMSRedelivered() { - return headers.containsKey(Constant.JMS_REDELIVERED) - && DirectTypeConverter.convert2Boolean(headers.get(Constant.JMS_REDELIVERED)); - } - - @Override - public void setJMSRedelivered(boolean redelivered) { - JmsHelper.handleUnSupportedException(); - } - - @Override - public String getJMSType() { - return DirectTypeConverter.convert2String(headers.get(Constant.JMS_TYPE)); - } - - @Override - public void setJMSType(String type) { - JmsHelper.handleUnSupportedException(); - } - - public Map<String, Object> getHeaders() { - return this.headers; - } - - @Override - public long getJMSExpiration() { - if (headers.containsKey(Constant.JMS_EXPIRATION)) { - return DirectTypeConverter.convert2Long(headers.get(Constant.JMS_EXPIRATION)); - } - return 0; - } - - @Override - public void setJMSExpiration(long expiration) { - JmsHelper.handleUnSupportedException(); - } - - public boolean headerExits(String name) { - return this.headers.containsKey(name); - } - - @Override - public int getJMSPriority() { - if (headers.containsKey(Constant.JMS_PRIORITY)) { - return DirectTypeConverter.convert2Integer(headers.get(Constant.JMS_PRIORITY)); - } - return 5; - } - - @Override - public void setJMSPriority(int priority) { - JmsHelper.handleUnSupportedException(); - } - - public void setHeader(String name, Object value) { - this.headers.put(name, value); - } - - public Map<String, Object> getProperties() { - return this.properties; - } - - public void setProperties(Map<String, Object> properties) { - this.properties = properties; - } - - @Override - public void acknowledge() throws JMSException { - throw new UnsupportedOperationException("Unsupported!"); - } - - @Override - public void clearProperties() { - this.properties.clear(); - } - - @Override - public void clearBody() { - this.body = null; - this.writeOnly = true; - } - - @Override - public boolean propertyExists(String name) { - return properties.containsKey(name); - } - - @Override - public boolean getBooleanProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - return Boolean.valueOf(value.toString()); - } - return false; - } - - @Override - public byte getByteProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - return Byte.valueOf(value.toString()); - } - return 0; - } - - @Override - public short getShortProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - return Short.valueOf(value.toString()); - } - return 0; - } - - @Override - public int getIntProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - return Integer.valueOf(value.toString()); - } - return 0; - } - - @Override - public long getLongProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - return Long.valueOf(value.toString()); - } - return 0L; - } - - @Override - public float getFloatProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - return Float.valueOf(value.toString()); - } - return 0f; - } - - @Override - public double getDoubleProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - return Double.valueOf(value.toString()); - } - return 0d; - } - - @Override - public String getStringProperty(String name) throws JMSException { - if (propertyExists(name)) { - return getObjectProperty(name).toString(); - } - return null; - } - - @Override - public Object getObjectProperty(String name) throws JMSException { - return this.properties.get(name); - } - - @Override - public Enumeration<?> getPropertyNames() throws JMSException { - final Object[] keys = this.properties.keySet().toArray(); - return new Enumeration<Object>() { - int i; - - @Override - public boolean hasMoreElements() { - return i < keys.length; - } - - @Override - public Object nextElement() { - return keys[i++]; - } - }; - } - - @Override - public void setBooleanProperty(String name, boolean value) { - setObjectProperty(name, value); - } - - @Override - public void setByteProperty(String name, byte value) { - setObjectProperty(name, value); - } - - @Override - public void setShortProperty(String name, short value) { - setObjectProperty(name, value); - } - - @Override - public void setIntProperty(String name, int value) { - setObjectProperty(name, value); - } - - @Override - public void setLongProperty(String name, long value) { - setObjectProperty(name, value); - } - - public void setFloatProperty(String name, float value) { - setObjectProperty(name, value); - } - - @Override - public void setDoubleProperty(String name, double value) { - setObjectProperty(name, value); - } - - @Override - public void setStringProperty(String name, String value) { - setObjectProperty(name, value); - } - - @Override - public long getJMSDeliveryTime() throws JMSException { - // todo - return 0; - } - - @Override - public void setJMSDeliveryTime(long deliveryTime) throws JMSException { - // todo - } - - @Override - public boolean isBodyAssignableTo(Class c) throws JMSException { - return c.isInstance(body); - } - - @Override - public void setObjectProperty(String name, Object value) { - if (value instanceof Number || value instanceof String || value instanceof Boolean) { - this.properties.put(name, value); - } - else { - throw new IllegalArgumentException( - "Value should be boolean, byte, short, int, long, float, double, and String."); - } - } - - protected boolean isWriteOnly() { - return writeOnly; - } - - protected void checkIsWriteOnly() throws MessageNotWriteableException { - if (!writeOnly) { - throw new MessageNotWriteableException("Not writable"); - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQObjectMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQObjectMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQObjectMessage.java deleted file mode 100644 index b5859ea..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQObjectMessage.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg; - -import javax.jms.JMSException; -import java.io.Serializable; - -public class RocketMQObjectMessage extends RocketMQMessage implements javax.jms.ObjectMessage { - - public RocketMQObjectMessage(Serializable object) { - this.body = object; - } - - public RocketMQObjectMessage() { - - } - - public Serializable getObject() throws JMSException { - return this.body; - } - - public void setObject(Serializable object) throws JMSException { - this.body = object; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQTextMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQTextMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQTextMessage.java deleted file mode 100644 index 3fb9518..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQTextMessage.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg; - -import javax.jms.JMSException; - -public class RocketMQTextMessage extends RocketMQMessage implements javax.jms.TextMessage { - private String text; - - public RocketMQTextMessage() { - - } - - public RocketMQTextMessage(String text) { - setText(text); - } - - public void clearBody() { - this.text = null; - super.clearBody(); - } - - public String getText() throws JMSException { - return this.text; - } - - public void setText(String text) { - this.body = text; - this.text = text; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java new file mode 100644 index 0000000..b979d74 --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.serialize; + +import com.alibaba.fastjson.JSON; +import java.util.Map; +import javax.jms.JMSException; + +public class MapSerialize implements Serialize<Map> { + + @Override public byte[] serialize(Map map) throws JMSException { + return JSON.toJSONBytes(map); + } + + @Override public Map deserialize(byte[] bytes) throws JMSException { + return JSON.parseObject(bytes, Map.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java new file mode 100644 index 0000000..38ffa2d --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.serialize; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import javax.jms.JMSException; + +public class ObjectSerialize implements Serialize<Object> { + + public byte[] serialize(Object object) throws JMSException { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(object); + oos.close(); + baos.close(); + return baos.toByteArray(); + } + catch (IOException e) { + throw new JMSException(e.getMessage()); + } + } + + public Serializable deserialize(byte[] bytes) throws JMSException { + try { + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bais); + ois.close(); + bais.close(); + return (Serializable)ois.readObject(); + } + catch (IOException e) { + throw new JMSException(e.getMessage()); + } + catch (ClassNotFoundException e) { + throw new JMSException(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java new file mode 100644 index 0000000..8e4224f --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.serialize; + +import javax.jms.JMSException; + +public interface Serialize<T> { + + static final byte[] EMPTY_BYTES = new byte[0]; + + byte[] serialize(T t) throws JMSException; + + T deserialize(byte[] bytes) throws JMSException; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java new file mode 100644 index 0000000..a6dca20 --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.serialize; + +import com.google.common.base.Charsets; +import java.nio.charset.Charset; +import javax.jms.JMSException; + +public class StringSerialize implements Serialize<String> { + + public static final String EMPTY_STRING = ""; + public static final Charset DEFAULT_CHARSET = Charsets.UTF_8; + + @Override public byte[] serialize(String s) throws JMSException { + if (null == s) { + return EMPTY_BYTES; + } + try { + return s.getBytes(DEFAULT_CHARSET); + } + catch (Exception e) { + throw new JMSException(e.getMessage()); + } + } + + @Override public String deserialize(byte[] bytes) throws JMSException { + if (null == bytes) { + return EMPTY_STRING; + } + try { + return new String(bytes, DEFAULT_CHARSET); + } + catch (Exception e) { + throw new JMSException(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/support/MessageConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/support/MessageConverter.java b/core/src/main/java/org/apache/rocketmq/jms/support/MessageConverter.java index 0770170..5c86237 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/support/MessageConverter.java +++ b/core/src/main/java/org/apache/rocketmq/jms/support/MessageConverter.java @@ -17,9 +17,7 @@ package org.apache.rocketmq.jms.support; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageExt; +import com.alibaba.fastjson.JSON; import com.google.common.base.Charsets; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -27,214 +25,86 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import java.util.Arrays; +import java.nio.charset.Charset; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; -import javax.jms.BytesMessage; -import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.ObjectMessage; -import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.commons.lang.StringUtils; -import org.apache.rocketmq.jms.Constant; -import org.apache.rocketmq.jms.JmsContent; -import org.apache.rocketmq.jms.RocketMQQueue; +import javax.jms.Message; +import javax.jms.StreamMessage; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.jms.JMSHeaderEnum; +import org.apache.rocketmq.jms.JMSMessageModelEnum; import org.apache.rocketmq.jms.RocketMQTopic; -import org.apache.rocketmq.jms.msg.RocketMQBytesMessage; -import org.apache.rocketmq.jms.msg.RocketMQMessage; -import org.apache.rocketmq.jms.msg.RocketMQObjectMessage; -import org.apache.rocketmq.jms.msg.RocketMQTextMessage; +import org.apache.rocketmq.jms.msg.AbstractJMSMessage; +import org.apache.rocketmq.jms.msg.JMSBytesMessage; +import org.apache.rocketmq.jms.msg.JMSMapMessage; +import org.apache.rocketmq.jms.msg.JMSObjectMessage; +import org.apache.rocketmq.jms.msg.JMSTextMessage; -import static com.google.common.base.Preconditions.checkState; -import static org.apache.rocketmq.jms.Constant.NO_MESSAGE_SELECTOR; +import static java.lang.String.format; +import static org.apache.rocketmq.jms.JMSMessageModelEnum.MSG_MODEL_NAME; public class MessageConverter { public static final String EMPTY_STRING = ""; - public static final String JMS_MSGMODEL = "jmsMsgModel"; - /** - * To adapt this scene: "Notify client try to receive ObjectMessage sent by JMS client" Set notify out message - * model, value can be textMessage OR objectMessage - */ - public static final String COMPATIBLE_FIELD_MSGMODEL = "notifyOutMsgModel"; - public static final String MSGMODEL_TEXT = "textMessage"; - public static final String MSGMODEL_BYTES = "bytesMessage"; - public static final String MSGMODEL_OBJ = "objectMessage"; - - public static final String MSG_TOPIC = "msgTopic"; - public static final String MSG_TYPE = "msgType"; - - public static final byte[] EMPTY_BYTES = new byte[0]; - private static AtomicLong counter = new AtomicLong(1L); - - public static JmsContent getContentFromJms(javax.jms.Message jmsMessage) throws JMSException { + public static Object getBodyFromJMSMessage(javax.jms.Message jmsMessage) throws JMSException { if (jmsMessage == null) { return null; } - JmsContent jmsContent = new JmsContent(); - if (jmsMessage instanceof TextMessage) { - if (StringUtils.isEmpty(((TextMessage) jmsMessage).getText())) { - throw new IllegalArgumentException("Message body length is zero"); - } - jmsContent.setMessageModel(MSGMODEL_TEXT); - jmsContent.setContent(string2Bytes(((TextMessage) jmsMessage).getText(), - Charsets.UTF_8.toString())); - } - else if (jmsMessage instanceof ObjectMessage) { - if (((ObjectMessage) jmsMessage).getObject() == null) { - throw new IllegalArgumentException("Message body length is zero"); - } - try { - jmsContent.setMessageModel(MSGMODEL_OBJ); - jmsContent.setContent(objectSerialize(((ObjectMessage) jmsMessage).getObject())); - } - catch (IOException e) { - throw new JMSException(e.getMessage()); - } - } - else if (jmsMessage instanceof BytesMessage) { - RocketMQBytesMessage bytesMessage = (RocketMQBytesMessage) jmsMessage; - if (bytesMessage.getBodyLength() == 0) { - throw new IllegalArgumentException("Message body length is zero"); - } - jmsContent.setMessageModel(MSGMODEL_BYTES); - jmsContent.setContent(bytesMessage.getData()); - } - else { - throw new IllegalArgumentException("Unknown message type " + jmsMessage.getJMSType()); + if (StreamMessage.class.isInstance(jmsMessage)) { + throw new UnsupportedOperationException(StreamMessage.class.getSimpleName() + " is not supported"); } - - return jmsContent; + return jmsMessage.getBody(Object.class); } - public static RocketMQMessage convert2JMSMessage(MessageExt msg) throws JMSException { + public static Message convert2JMSMessage(MessageExt msg) throws Exception { if (msg == null) { return null; } - RocketMQMessage message; - if (MSGMODEL_BYTES.equals( - msg.getUserProperty(JMS_MSGMODEL))) { - message = new RocketMQBytesMessage(msg.getBody()); - } - else if (MSGMODEL_OBJ.equals( - msg.getUserProperty(JMS_MSGMODEL))) { - try { - message = new RocketMQObjectMessage(objectDeserialize(msg.getBody())); - } - catch (Exception e) { - throw new JMSException(e.getMessage()); - } - } - else if (MSGMODEL_TEXT.equals( - msg.getUserProperty(JMS_MSGMODEL))) { - message = new RocketMQTextMessage(bytes2String(msg.getBody(), - Charsets.UTF_8.toString())); - } - else { - // rocketmq producer sends bytesMessage without setting JMS_MSGMODEL. - message = new RocketMQBytesMessage(msg.getBody()); + AbstractJMSMessage message; + final String msgModel = msg.getUserProperty(MSG_MODEL_NAME); + switch (JMSMessageModelEnum.valueOf(msgModel)) { + case BYTE: + message = new JMSBytesMessage(msg.getBody()); + break; + case MAP: + message = new JMSMapMessage(JSON.parseObject(new String(msg.getBody()), HashMap.class)); + break; + case OBJECT: + message = new JMSObjectMessage(objectDeserialize(msg.getBody())); + break; + case STRING: + message = new JMSTextMessage(bytes2String(msg.getBody(), Charsets.UTF_8)); + break; + default: + throw new JMSException(format("The type[%s] is not supported", msgModel)); } //-------------------------set headers------------------------- - Map<String, Object> properties = new HashMap<String, Object>(); - - message.setHeader(Constant.JMS_MESSAGE_ID, "ID:" + msg.getMsgId()); - - if (msg.getReconsumeTimes() > 0) { - message.setHeader(Constant.JMS_REDELIVERED, Boolean.TRUE); - } - else { - message.setHeader(Constant.JMS_REDELIVERED, Boolean.FALSE); - } + message.setJMSMessageID(msg.getUserProperty(JMSHeaderEnum.JMSMessageID.name())); + message.setJMSTimestamp(msg.getBornTimestamp()); + message.setJMSExpiration(Long.valueOf(msg.getUserProperty(JMSHeaderEnum.JMSExpiration.name()))); + message.setJMSRedelivered(msg.getReconsumeTimes() > 0 ? true : false); + //todo: what about Queue? + message.setJMSDestination(new RocketMQTopic(msg.getTopic())); Map<String, String> propertiesMap = msg.getProperties(); if (propertiesMap != null) { for (String properName : propertiesMap.keySet()) { String properValue = propertiesMap.get(properName); - if (Constant.JMS_DESTINATION.equals(properName)) { - String destinationStr = properValue; - if (null != destinationStr) { - List<String> msgTuple = Arrays.asList(destinationStr.split(":")); - //todo: what about Queue? - RocketMQTopic topic = null; - if (msgTuple.size() == 1) { - topic = new RocketMQTopic(msgTuple.get(0)); - } - else { - topic = new RocketMQTopic(msgTuple.get(0), msgTuple.get(1)); - } - message.setHeader(Constant.JMS_DESTINATION, - topic); - } - } - else if (Constant.JMS_DELIVERY_MODE.equals(properName) || - Constant.JMS_PRIORITY.equals(properName)) { - message.setHeader(properName, properValue); - } - else if (Constant.JMS_TIMESTAMP.equals(properName) || - Constant.JMS_EXPIRATION.equals(properName)) { - message.setHeader(properName, properValue); - } - else if (Constant.JMS_CORRELATION_ID.equals(properName) || - Constant.JMS_TYPE.equals(properName)) { - message.setHeader(properName, properValue); - } - else if (Constant.JMS_MESSAGE_ID.equals(properName) || - Constant.JMS_REDELIVERED.equals(properName)) { - //JMS_MESSAGE_ID should set by msg.getMsgID() - continue; - } - else { - properties.put(properName, properValue); - } + message.setStringProperty(properName, properValue); } } - //Handle System properties, put into header. - message.setProperties(properties); - return message; } - public static byte[] objectSerialize(Object object) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(object); - oos.close(); - baos.close(); - return baos.toByteArray(); - } - - public static Serializable objectDeserialize(byte[] bytes) throws IOException, ClassNotFoundException { - ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - ObjectInputStream ois = new ObjectInputStream(bais); - ois.close(); - bais.close(); - return (Serializable) ois.readObject(); - } - - public static final byte[] string2Bytes(String s, String charset) { - if (null == s) { - return EMPTY_BYTES; - } - byte[] bs = null; - try { - bs = s.getBytes(charset); - } - catch (Exception e) { - // ignore - } - return bs; - } - - public static final String bytes2String(byte[] bs, String charset) { + public static final String bytes2String(byte[] bs, Charset charset) { if (null == bs) { return EMPTY_STRING; } @@ -248,89 +118,39 @@ public class MessageConverter { return s; } - public static Message convert2RMQMessage(RocketMQMessage jmsMsg) throws Exception { - Message rmqMsg = new MessageExt(); + public static MessageExt convert2RMQMessage(AbstractJMSMessage jmsMsg) throws Exception { + MessageExt rmqMsg = new MessageExt(); - rmqMsg.setKeys(System.currentTimeMillis() + "" + counter.incrementAndGet()); + rmqMsg.putUserProperty(JMSHeaderEnum.JMSMessageID.name(), jmsMsg.getJMSMessageID()); + rmqMsg.setBornTimestamp(jmsMsg.getJMSTimestamp()); + rmqMsg.putUserProperty(JMSHeaderEnum.JMSExpiration.name(), String.valueOf(jmsMsg.getJMSExpiration())); + rmqMsg.setKeys(jmsMsg.getJMSMessageID()); // 1. Transform message body - rmqMsg.setBody(MessageConverter.getContentFromJms(jmsMsg).getContent()); - - // 2. Transform topic and messageType - String topic, tag; - Destination destination = (Destination) jmsMsg.getHeaders().get(Constant.JMS_DESTINATION); - if (destination instanceof Topic) { - topic = ((RocketMQTopic) destination).getTopicName(); - tag = ((RocketMQTopic) destination).getTypeName(); - } - else { - topic = ((RocketMQQueue) destination).getQueueName(); - tag = NO_MESSAGE_SELECTOR; - } - checkState(!tag.contains("||"), "'||' can not be in the destination when sending a message"); - rmqMsg.setTopic(topic); - rmqMsg.setTags(tag); + rmqMsg.setBody(MessageConverter.getBodyFromJMSMessage(jmsMsg)); - // 3. Transform message properties - Properties properties = getAllProperties(jmsMsg, topic, tag); + // 2. Transform message properties + Properties properties = getAllProperties(jmsMsg); for (String name : properties.stringPropertyNames()) { String value = properties.getProperty(name); - if (MessageConst.PROPERTY_KEYS.equals(name)) { - rmqMsg.setKeys(value); - } - else if (MessageConst.PROPERTY_TAGS.equals(name)) { - rmqMsg.setTags(value); - } - else if (MessageConst.PROPERTY_DELAY_TIME_LEVEL.equals(name)) { - rmqMsg.setDelayTimeLevel(Integer.parseInt(value)); - } - else if (MessageConst.PROPERTY_WAIT_STORE_MSG_OK.equals(name)) { - rmqMsg.setWaitStoreMsgOK(Boolean.parseBoolean(value)); - } - else if (MessageConst.PROPERTY_BUYER_ID.equals(name)) { - rmqMsg.setBuyerId(value); - } - else { - rmqMsg.putUserProperty(name, value); - } + rmqMsg.putUserProperty(name, value); } return rmqMsg; } - private static Properties getAllProperties(RocketMQMessage rmqJmsMsg, - String topic, String tag) throws JMSException { + private static Properties getAllProperties(AbstractJMSMessage jmsMsg) throws JMSException { Properties userProperties = new Properties(); - //Jms userProperties to ONS properties - Map<String, Object> userProps = rmqJmsMsg.getProperties(); + Map<String, Object> userProps = jmsMsg.getProperties(); Iterator<Map.Entry<String, Object>> userPropsIter = userProps.entrySet().iterator(); while (userPropsIter.hasNext()) { Map.Entry<String, Object> entry = userPropsIter.next(); userProperties.setProperty(entry.getKey(), entry.getValue().toString()); } - //Jms systemProperties to ONS properties - Map<String, Object> sysProps = rmqJmsMsg.getHeaders(); - Iterator<Map.Entry<String, Object>> sysPropsIter = sysProps.entrySet().iterator(); - while (sysPropsIter.hasNext()) { - Map.Entry<String, Object> entry = sysPropsIter.next(); - userProperties.setProperty(entry.getKey(), entry.getValue().toString()); - } //Jms message Model - if (rmqJmsMsg instanceof RocketMQBytesMessage) { - userProperties.setProperty(JMS_MSGMODEL, MSGMODEL_BYTES); - } - else if (rmqJmsMsg instanceof RocketMQObjectMessage) { - userProperties.setProperty(JMS_MSGMODEL, MSGMODEL_OBJ); - } - else if (rmqJmsMsg instanceof RocketMQTextMessage) { - userProperties.setProperty(JMS_MSGMODEL, MSGMODEL_TEXT); - } - - //message topic and tag - userProperties.setProperty(MSG_TOPIC, topic); - userProperties.setProperty(MSG_TYPE, tag); + userProperties.setProperty(MSG_MODEL_NAME, JMSMessageModelEnum.toMsgModelEnum(jmsMsg.getClass()).name()) return userProperties; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/test/java/org/apache/rocketmq/jms/msg/JmsMapMessageTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/msg/JmsMapMessageTest.java b/core/src/test/java/org/apache/rocketmq/jms/msg/JmsMapMessageTest.java new file mode 100644 index 0000000..7befeb7 --- /dev/null +++ b/core/src/test/java/org/apache/rocketmq/jms/msg/JmsMapMessageTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg; + +import javax.jms.JMSException; +import javax.jms.MessageNotWriteableException; +import org.junit.Test; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public class JmsMapMessageTest { + + @Test + public void testGetBoolean() throws Exception { + JMSMapMessage msg = new JMSMapMessage(); + + // get an empty value will return false + assertThat(msg.getBoolean("man"), is(false)); + + // get an not empty value + msg.setBoolean("man", true); + assertThat(msg.getBoolean("man"), is(true)); + + // key is null + try { + msg.getBoolean(null); + assertTrue(false); + } + catch (JMSException e) { + assertTrue(true); + } + + // in read-only model + msg.setReadOnly(true); + try { + msg.setBoolean("man", true); + assertTrue(false); + } + catch (MessageNotWriteableException e) { + assertTrue(true); + } + + // both read and write are allowed after clearBody() + msg.clearBody(); + msg.setBoolean("man", false); + msg.getBoolean("man"); + + // map is empty after clearBody() + msg.clearBody(); + assertThat(msg.getBoolean("man"), is(false)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java b/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java index d641454..7664c08 100644 --- a/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java +++ b/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java @@ -31,23 +31,23 @@ public class RocketMQBytesMessageTest { @Test public void testGetData() throws Exception { - RocketMQBytesMessage readMessage = new RocketMQBytesMessage(receiveData); + JMSBytesMessage readMessage = new JMSBytesMessage(receiveData); assertThat(new String(receiveData), is(new String(readMessage.getData()))); - RocketMQBytesMessage sendMessage = new RocketMQBytesMessage(); + JMSBytesMessage sendMessage = new JMSBytesMessage(); sendMessage.writeBytes(sendData, 0, sendData.length); assertThat(new String(sendData), is(new String(sendMessage.getData()))); } @Test public void testGetBodyLength() throws Exception { - RocketMQBytesMessage msg = new RocketMQBytesMessage(receiveData); + JMSBytesMessage msg = new JMSBytesMessage(receiveData); assertThat(msg.getBodyLength(), is(new Long(receiveData.length))); } @Test public void testReadBytes1() throws Exception { - RocketMQBytesMessage msg = new RocketMQBytesMessage(receiveData); + JMSBytesMessage msg = new JMSBytesMessage(receiveData); byte[] receiveValue = new byte[receiveData.length]; msg.readBytes(receiveValue); assertThat(new String(receiveValue), is(new String(receiveData))); @@ -56,7 +56,7 @@ public class RocketMQBytesMessageTest { @Test public void testReadBytes2() throws Exception { - RocketMQBytesMessage msg = new RocketMQBytesMessage(receiveData); + JMSBytesMessage msg = new JMSBytesMessage(receiveData); byte[] receiveValue1 = new byte[2]; msg.readBytes(receiveValue1); @@ -70,34 +70,34 @@ public class RocketMQBytesMessageTest { @Test public void testWriteBytes() throws Exception { - RocketMQBytesMessage msg = new RocketMQBytesMessage(); + JMSBytesMessage msg = new JMSBytesMessage(); msg.writeBytes(sendData); assertThat(new String(msg.getData()), is(new String(sendData))); } @Test(expected = MessageNotReadableException.class) public void testNotReadableException() throws Exception { - RocketMQBytesMessage msg = new RocketMQBytesMessage(); + JMSBytesMessage msg = new JMSBytesMessage(); msg.writeBoolean(true); msg.readBoolean(); } @Test(expected = MessageNotWriteableException.class) public void testNotWritableException() throws Exception { - RocketMQBytesMessage msg = new RocketMQBytesMessage(receiveData); + JMSBytesMessage msg = new JMSBytesMessage(receiveData); msg.writeBoolean(true); } @Test public void testClearBody() throws Exception { - RocketMQBytesMessage msg = new RocketMQBytesMessage(receiveData); + JMSBytesMessage msg = new JMSBytesMessage(receiveData); msg.clearBody(); msg.writeBoolean(true); } @Test public void testReset() throws Exception { - RocketMQBytesMessage msg = new RocketMQBytesMessage(receiveData); + JMSBytesMessage msg = new JMSBytesMessage(receiveData); byte[] b = new byte[2]; msg.readBytes(b); msg.reset(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQMapMessageTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQMapMessageTest.java b/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQMapMessageTest.java deleted file mode 100644 index 2a7597f..0000000 --- a/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQMapMessageTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg; - -import javax.jms.JMSException; -import javax.jms.MessageNotWriteableException; -import org.junit.Test; - -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -public class RocketMQMapMessageTest { - - @Test - public void testGetBoolean() throws Exception { - RocketMQMapMessage msg = new RocketMQMapMessage(); - - // get an empty value will return false - assertThat(msg.getBoolean("man"), is(false)); - - // get an not empty value - msg.setBoolean("man", true); - assertThat(msg.getBoolean("man"), is(true)); - - // key is null - try { - msg.getBoolean(null); - assertTrue(false); - } - catch (JMSException e) { - assertTrue(true); - } - - // in read-only model - msg.setReadOnly(true); - try { - msg.setBoolean("man", true); - assertTrue(false); - } - catch (MessageNotWriteableException e) { - assertTrue(true); - } - - // both read and write are allowed after clearBody() - msg.clearBody(); - msg.setBoolean("man", false); - msg.getBoolean("man"); - - // map is empty after clearBody() - msg.clearBody(); - assertThat(msg.getBoolean("man"), is(false)); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQObjectMessageTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQObjectMessageTest.java b/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQObjectMessageTest.java index 5291acf..b68bd8d 100644 --- a/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQObjectMessageTest.java +++ b/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQObjectMessageTest.java @@ -29,14 +29,14 @@ public class RocketMQObjectMessageTest { @Test public void testGetObject() throws Exception { final User user = new User("jack", 20); - RocketMQObjectMessage msg = new RocketMQObjectMessage(user); + JMSObjectMessage msg = new JMSObjectMessage(user); assertThat((User)msg.getObject(), is(user)); } @Test public void testGetBody() throws Exception { final User user = new User("jack", 20); - RocketMQObjectMessage msg = new RocketMQObjectMessage(user); + JMSObjectMessage msg = new JMSObjectMessage(user); assertThat((User)msg.getBody(Object.class), is((User)msg.getObject())); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQTextMessageTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQTextMessageTest.java b/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQTextMessageTest.java index f5a23e2..3fd861b 100644 --- a/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQTextMessageTest.java +++ b/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQTextMessageTest.java @@ -27,13 +27,13 @@ public class RocketMQTextMessageTest { @Test public void testGetBody() throws Exception { - RocketMQTextMessage msg = new RocketMQTextMessage(text); + JMSTextMessage msg = new JMSTextMessage(text); assertThat(msg.getBody(String.class), is(text)); } @Test public void testSetText() throws Exception { - RocketMQTextMessage msg = new RocketMQTextMessage(); + JMSTextMessage msg = new JMSTextMessage(); msg.setText(text); assertThat(msg.getText(), is(text)); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/test/java/org/apache/rocketmq/jms/support/MessageConvertTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/support/MessageConvertTest.java b/core/src/test/java/org/apache/rocketmq/jms/support/MessageConvertTest.java index 410b955..6ac7acd 100644 --- a/core/src/test/java/org/apache/rocketmq/jms/support/MessageConvertTest.java +++ b/core/src/test/java/org/apache/rocketmq/jms/support/MessageConvertTest.java @@ -20,9 +20,8 @@ package org.apache.rocketmq.jms.support; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.jms.RocketMQTopic; -import org.apache.rocketmq.jms.msg.RocketMQMessage; -import org.apache.rocketmq.jms.msg.RocketMQTextMessage; -import org.apache.rocketmq.jms.support.MessageConverter; +import org.apache.rocketmq.jms.msg.AbstractJMSMessage; +import org.apache.rocketmq.jms.msg.JMSTextMessage; import org.junit.Assert; import org.junit.Test; @@ -41,7 +40,7 @@ public class MessageConvertTest { String topic = "TestTopic"; String messageType = "TagA"; - RocketMQMessage rmqJmsMessage = new RocketMQTextMessage("testText"); + AbstractJMSMessage rmqJmsMessage = new JMSTextMessage("testText"); rmqJmsMessage.setHeader(JMS_DESTINATION, new RocketMQTopic(topic, messageType)); rmqJmsMessage.setHeader(JMS_MESSAGE_ID, "ID:null"); rmqJmsMessage.setHeader(JMS_REDELIVERED, Boolean.FALSE); @@ -56,10 +55,10 @@ public class MessageConvertTest { MessageExt message = (MessageExt)MessageConverter.convert2RMQMessage(rmqJmsMessage); //then convert back to RmqJmsMessage - RocketMQMessage RmqJmsMessageBack = MessageConverter.convert2JMSMessage(message); + AbstractJMSMessage RmqJmsMessageBack = MessageConverter.convert2JMSMessage(message); - RocketMQTextMessage jmsTextMessage = (RocketMQTextMessage) rmqJmsMessage; - RocketMQTextMessage jmsTextMessageBack = (RocketMQTextMessage) RmqJmsMessageBack; + JMSTextMessage jmsTextMessage = (JMSTextMessage) rmqJmsMessage; + JMSTextMessage jmsTextMessageBack = (JMSTextMessage) RmqJmsMessageBack; Assert.assertEquals(jmsTextMessage.getText(), jmsTextMessageBack.getText()); Assert.assertEquals(jmsTextMessage.getJMSDestination().toString(), jmsTextMessageBack.getJMSDestination().toString());