Fix error,refactor serialize and MessageConverter further. Unit test will be added in next commit
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/464cbc1d Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/464cbc1d Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/464cbc1d Branch: refs/heads/jms-dev-1.1.0 Commit: 464cbc1d89ba67d61371e398e74fe78025a71c24 Parents: 37576dd Author: zhangke <zhangke_beij...@qq.com> Authored: Thu Feb 23 22:21:25 2017 +0800 Committer: zhangke <zhangke_beij...@qq.com> Committed: Thu Feb 23 22:21:25 2017 +0800 ---------------------------------------------------------------------- .../rocketmq/jms/DeliverMessageService.java | 4 +- .../org/apache/rocketmq/jms/JMSHeaderEnum.java | 11 +- .../rocketmq/jms/JMSMessageModelEnum.java | 7 +- .../apache/rocketmq/jms/RocketMQProducer.java | 121 ++++---------- .../apache/rocketmq/jms/RocketMQSession.java | 11 +- .../apache/rocketmq/jms/SendMessageHook.java | 27 ---- .../rocketmq/jms/hook/SendMessageHook.java | 68 ++++++++ .../apache/rocketmq/jms/msg/JMSMapMessage.java | 2 +- .../rocketmq/jms/msg/JMSObjectMessage.java | 8 +- .../apache/rocketmq/jms/msg/JMSTextMessage.java | 3 +- .../jms/msg/convert/JMS2RMQMessageConvert.java | 62 ++++++++ .../jms/msg/convert/RMQ2JMSMessageConvert.java | 94 +++++++++++ .../jms/msg/serialize/MapSerialize.java | 12 +- .../jms/msg/serialize/ObjectSerialize.java | 11 +- .../rocketmq/jms/msg/serialize/Serialize.java | 2 - .../jms/msg/serialize/StringSerialize.java | 13 +- .../rocketmq/jms/support/MessageConverter.java | 157 ------------------- .../jms/msg/RocketMQBytesMessageTest.java | 6 +- .../jms/support/MessageConvertTest.java | 74 --------- 19 files changed, 310 insertions(+), 383 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java index da8196f..bcfc680 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java +++ b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java @@ -38,8 +38,8 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.jms.msg.convert.RMQ2JMSMessageConvert; import org.apache.rocketmq.jms.support.JmsHelper; -import org.apache.rocketmq.jms.support.MessageConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -183,7 +183,7 @@ public class DeliverMessageService extends ServiceThread { * @throws JMSException */ private void handleMessage(MessageExt msg, MessageQueue mq) throws InterruptedException, JMSException { - Message jmsMessage = MessageConverter.convert2JMSMessage(msg); + Message jmsMessage = RMQ2JMSMessageConvert.convert(msg); if (jmsMessage.getJMSExpiration() != 0 && System.currentTimeMillis() > jmsMessage.getJMSExpiration()) { log.debug("The message[id={}] has been expired", msg.getMsgId()); return; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java b/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java index a9c758e..4979f88 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java +++ b/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.jms; +import javax.jms.Message; + public enum JMSHeaderEnum { JMSDestination, @@ -31,11 +33,12 @@ public enum JMSHeaderEnum { JMSPriority, JMSDeliveryTime; - public static final int JMS_DELIVERY_MODE_DEFAULT_VALUE = 0; - public static final int JMS_TIMESTAMP_DEFAULT_VALUE = 0; + public static final int JMS_DELIVERY_MODE_DEFAULT_VALUE = Message.DEFAULT_DELIVERY_MODE; + public static final long JMS_TIME_TO_LIVE_DEFAULT_VALUE = Message.DEFAULT_TIME_TO_LIVE; + public static final int JMS_PRIORITY_DEFAULT_VALUE = Message.DEFAULT_PRIORITY; + public static final long JMS_DELIVERY_TIME_DEFAULT_VALUE = Message.DEFAULT_DELIVERY_DELAY; public static final boolean JMS_REDELIVERED_DEFAULT_VALUE = false; + public static final int JMS_TIMESTAMP_DEFAULT_VALUE = 0; public static final int JMS_EXPIRATION_DEFAULT_VALUE = 0; - public static final int JMS_PRIORITY_DEFAULT_VALUE = 5; - public static final int JMS_DELIVERY_TIME_DEFAULT_VALUE = 0; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java b/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java index 0659f92..feee4e3 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java +++ b/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.jms; +import org.apache.rocketmq.jms.msg.AbstractJMSMessage; import org.apache.rocketmq.jms.msg.JMSBytesMessage; import org.apache.rocketmq.jms.msg.JMSMapMessage; import org.apache.rocketmq.jms.msg.JMSObjectMessage; @@ -36,14 +37,14 @@ public enum JMSMessageModelEnum { this.jmsClass = jmsClass; } - public static JMSMessageModelEnum toMsgModelEnum(Class clazz) { + public static JMSMessageModelEnum toMsgModelEnum(AbstractJMSMessage jmsMsg) { for (JMSMessageModelEnum e : values()) { - if (e.getJmsClass() == clazz) { + if (e.getJmsClass().isInstance(jmsMsg)) { return e; } } - throw new IllegalArgumentException(String.format("Not supported class[%s]", clazz)); + throw new IllegalArgumentException(String.format("Not supported class[%s]", jmsMsg.getClass().getSimpleName())); } public Class getJmsClass() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java index 8cc5903..109f3bb 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java +++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java @@ -30,24 +30,18 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.jms.hook.SendMessageHook; import org.apache.rocketmq.jms.msg.AbstractJMSMessage; -import org.apache.rocketmq.jms.support.MessageConverter; +import org.apache.rocketmq.jms.msg.convert.JMS2RMQMessageConvert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static java.lang.String.format; -import static javax.jms.Message.DEFAULT_DELIVERY_MODE; -import static javax.jms.Message.DEFAULT_PRIORITY; -import static javax.jms.Message.DEFAULT_TIME_TO_LIVE; import static org.apache.commons.lang.exception.ExceptionUtils.getStackTrace; -import static org.apache.rocketmq.jms.Constant.DEFAULT_JMS_TYPE; -import static org.apache.rocketmq.jms.Constant.JMS_DELIVERY_MODE; -import static org.apache.rocketmq.jms.Constant.JMS_DESTINATION; -import static org.apache.rocketmq.jms.Constant.JMS_EXPIRATION; -import static org.apache.rocketmq.jms.Constant.JMS_PRIORITY; -import static org.apache.rocketmq.jms.Constant.JMS_TIMESTAMP; -import static org.apache.rocketmq.jms.Constant.JMS_TYPE; -import static org.apache.rocketmq.jms.Constant.MESSAGE_ID_PREFIX; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMS_DELIVERY_TIME_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMS_PRIORITY_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMS_TIME_TO_LIVE_DEFAULT_VALUE; import static org.apache.rocketmq.jms.support.DirectTypeConverter.convert2Object; public class RocketMQProducer implements MessageProducer { @@ -59,7 +53,12 @@ public class RocketMQProducer implements MessageProducer { private boolean disableMessageID; private boolean disableMessageTimestamp; - private long timeToLive; + private long timeToLive = JMS_TIME_TO_LIVE_DEFAULT_VALUE; + private int deliveryMode = JMS_DELIVERY_MODE_DEFAULT_VALUE; + private int priority = JMS_PRIORITY_DEFAULT_VALUE; + private long deliveryDelay = JMS_DELIVERY_TIME_DEFAULT_VALUE; + + private SendMessageHook sendMessageHook; public RocketMQProducer(RocketMQSession session, Destination destination) { this.session = session; @@ -75,6 +74,8 @@ public class RocketMQProducer implements MessageProducer { catch (MQClientException e) { throw new JMSRuntimeException(format("Fail to start producer, error msg:%s", getStackTrace(e))); } + + this.sendMessageHook = new SendMessageHook(this); } @Override @@ -99,24 +100,22 @@ public class RocketMQProducer implements MessageProducer { @Override public void setDeliveryMode(int deliveryMode) throws JMSException { - //todo + this.deliveryMode = deliveryMode; } @Override public int getDeliveryMode() throws JMSException { - //todo - return 0; + return this.deliveryMode; } @Override - public void setPriority(int defaultPriority) throws JMSException { - //todo + public void setPriority(int priority) throws JMSException { + this.priority = priority; } @Override public int getPriority() throws JMSException { - //todo - return 0; + return this.priority; } @Override @@ -126,24 +125,22 @@ public class RocketMQProducer implements MessageProducer { @Override public long getTimeToLive() throws JMSException { - return this.getTimeToLive(); + return this.timeToLive; } @Override public void setDeliveryDelay(long deliveryDelay) throws JMSException { - //todo + this.deliveryDelay = deliveryDelay; } @Override public long getDeliveryDelay() throws JMSException { - //todo - return 0; + return this.deliveryDelay; } @Override public Destination getDestination() throws JMSException { - //todo - return null; + return this.destination; } @Override @@ -163,15 +160,14 @@ public class RocketMQProducer implements MessageProducer { @Override public void send(Destination destination, Message message) throws JMSException { - //todo: DEFAULT_TIME_TO_LIVE is zero which means message never expires. This feature is not supported by RMQ now. - this.send(destination, message, DEFAULT_DELIVERY_MODE, DEFAULT_PRIORITY, DEFAULT_TIME_TO_LIVE); + this.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive()); } @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - before(message); + sendMessageHook.before(message, destination, deliveryMode, priority, timeToLive); MessageExt rmqMsg = createRocketMQMessage(message); @@ -205,93 +201,42 @@ public class RocketMQProducer implements MessageProducer { } } - private MessageExt createRocketMQMessage(Message message) throws JMSException { - AbstractJMSMessage jmsMsg = convert2Object(message, AbstractJMSMessage.class); - initJMSHeaders(jmsMsg, destination); + private MessageExt createRocketMQMessage(Message jmsMsg) throws JMSException { + AbstractJMSMessage abstractJMSMessage = convert2Object(jmsMsg, AbstractJMSMessage.class); try { - return MessageConverter.convert2RMQMessage(jmsMsg); + return JMS2RMQMessageConvert.convert(abstractJMSMessage); } catch (Exception e) { - throw new JMSException(format("Fail to convert to RocketMQ message. Error: %s", getStackTrace(e))); + throw new JMSException(format("Fail to convert to RocketMQ jmsMsg. Error: %s", getStackTrace(e))); } } - /** - * Init the JmsMessage Headers. - * <p/> - * <P>JMS providers init message's headers. Do not allow user to set these by yourself. - * - * @param jmsMsg message - * @param destination - * @throws javax.jms.JMSException - * @see <CODE>Destination</CODE> - */ - private void initJMSHeaders(AbstractJMSMessage jmsMsg, Destination destination) throws JMSException { - - //JMS_DESTINATION default:"topic:message" - jmsMsg.setHeader(JMS_DESTINATION, destination); - //JMS_DELIVERY_MODE default : PERSISTENT - jmsMsg.setHeader(JMS_DELIVERY_MODE, javax.jms.Message.DEFAULT_DELIVERY_MODE); - //JMS_TIMESTAMP default : current time - jmsMsg.setHeader(JMS_TIMESTAMP, System.currentTimeMillis()); - //JMS_EXPIRATION default : 3 days - //JMS_EXPIRATION = currentTime + time_to_live - jmsMsg.setHeader(JMS_EXPIRATION, System.currentTimeMillis() + DEFAULT_TIME_TO_LIVE); - //JMS_PRIORITY default : 4 - jmsMsg.setHeader(JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY); - //JMS_TYPE default : ons(open notification service) - jmsMsg.setHeader(JMS_TYPE, DEFAULT_JMS_TYPE); - //JMS_REPLY_TO,JMS_CORRELATION_ID default : null - //JMS_MESSAGE_ID is set by sendResult. - //JMS_REDELIVERED is set by broker. - } - @Override public void send(Message message, CompletionListener completionListener) throws JMSException { - this.send(this.destination, message, DEFAULT_DELIVERY_MODE, DEFAULT_PRIORITY, DEFAULT_TIME_TO_LIVE, completionListener); + this.send(this.destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), completionListener); } @Override public void send(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { - this.send(this.destination, message, DEFAULT_DELIVERY_MODE, DEFAULT_PRIORITY, DEFAULT_TIME_TO_LIVE, completionListener); + this.send(this.destination, message, deliveryMode, priority, timeToLive, completionListener); } @Override public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException { - this.send(destination, message, DEFAULT_DELIVERY_MODE, DEFAULT_PRIORITY, DEFAULT_TIME_TO_LIVE, completionListener); + this.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), completionListener); } @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { - before(message); + sendMessageHook.before(message, destination, deliveryMode, priority, timeToLive); MessageExt rmqMsg = createRocketMQMessage(message); sendAsync(rmqMsg, completionListener); } - private void before(Message message) throws JMSException { - // timestamp - if (!getDisableMessageTimestamp()) { - message.setJMSTimestamp(System.currentTimeMillis()); - } - - // messageID is also required in async model, so {@link MessageExt#getMsgId()} can't be used. - if (!getDisableMessageID()) { - message.setJMSMessageID(new StringBuffer(MESSAGE_ID_PREFIX).append(UUID.randomUUID().getLeastSignificantBits()).toString()); - } - - // expiration - if (getTimeToLive() != 0) { - message.setJMSExpiration(System.currentTimeMillis() + getTimeToLive()); - } - else { - message.setJMSExpiration(0l); - } - } - } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java index c14c85d..d5b64d1 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java +++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java @@ -20,7 +20,6 @@ package org.apache.rocketmq.jms; import com.google.common.base.Preconditions; import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -229,16 +228,8 @@ public class RocketMQSession implements Session { @Override public Topic createTopic(String topicName) throws JMSException { Preconditions.checkNotNull(topicName); - List<String> msgTuple = Arrays.asList(topicName.split(":")); - Preconditions.checkState(msgTuple.size() >= 1 && msgTuple.size() <= 2, - "Destination must match messageTopic:messageType !"); - - //If messageType is null, use * instead. - if (1 == msgTuple.size()) { - return new RocketMQTopic(topicName); - } - return new RocketMQTopic(msgTuple.get(0), msgTuple.get(1)); + return new RocketMQTopic(topicName); } @Override http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java b/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java deleted file mode 100644 index 0dee423..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import javax.jms.Message; - -public class SendMessageHook { - - public void before(Message message) { - - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java b/core/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java new file mode 100644 index 0000000..2e33cd8 --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.hook; + +import java.util.UUID; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import org.apache.rocketmq.jms.RocketMQProducer; + +import static org.apache.rocketmq.jms.Constant.MESSAGE_ID_PREFIX; + +public class SendMessageHook { + + private RocketMQProducer producer; + + public SendMessageHook(RocketMQProducer producer) { + this.producer = producer; + } + + public void before(Message message, Destination destination, int deliveryMode, int priority, + long timeToLive) throws JMSException { + // destination + message.setJMSDestination(destination); + + // delivery mode + message.setJMSDeliveryMode(deliveryMode); + + // expiration + if (timeToLive != 0) { + message.setJMSExpiration(System.currentTimeMillis() + timeToLive); + } + else { + message.setJMSExpiration(0L); + } + + // delivery time + message.setJMSDeliveryTime(message.getJMSTimestamp() + this.producer.getDeliveryDelay()); + + // priority + message.setJMSPriority(priority); + + // messageID is also required in async model, so {@link MessageExt#getMsgId()} can't be used. + if (!this.producer.getDisableMessageID()) { + message.setJMSMessageID(new StringBuffer(MESSAGE_ID_PREFIX).append(UUID.randomUUID().getLeastSignificantBits()).toString()); + } + + // timestamp + if (!this.producer.getDisableMessageTimestamp()) { + message.setJMSTimestamp(System.currentTimeMillis()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java index d1dd15d..d10247f 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java @@ -66,7 +66,7 @@ public class JMSMapMessage extends AbstractJMSMessage implements MapMessage { } @Override public byte[] getBody() throws JMSException { - return new MapSerialize().serialize(this.map); + return MapSerialize.instance().serialize(this.map); } @Override public boolean isBodyAssignableTo(Class c) throws JMSException { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java index 239ecc7..4f29d33 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.jms.msg; -import java.io.IOException; import java.io.Serializable; import javax.jms.JMSException; import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize; @@ -39,12 +38,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements javax.jms.Ob } @Override public byte[] getBody() throws JMSException { - try { - return ObjectSerialize.serialize(body); - } - catch (IOException e) { - throw new JMSException(e.getMessage()); - } + return ObjectSerialize.instance().serialize(body); } @Override public boolean isBodyAssignableTo(Class c) throws JMSException { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java index 13e344d..5fd67a3 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.jms.msg; import javax.jms.JMSException; import javax.jms.MessageFormatException; +import org.apache.rocketmq.jms.msg.serialize.StringSerialize; import static java.lang.String.format; @@ -43,7 +44,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text } @Override public byte[] getBody() throws JMSException { - return new byte[0]; + return StringSerialize.instance().serialize(this.text); } @Override public boolean isBodyAssignableTo(Class c) throws JMSException { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java new file mode 100644 index 0000000..cf7d975 --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.convert; + +import java.util.Map; +import javax.jms.JMSException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.jms.msg.AbstractJMSMessage; + +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSExpiration; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSMessageID; +import static org.apache.rocketmq.jms.JMSMessageModelEnum.MSG_MODEL_NAME; +import static org.apache.rocketmq.jms.JMSMessageModelEnum.toMsgModelEnum; + +public class JMS2RMQMessageConvert { + + public static MessageExt convert(AbstractJMSMessage jmsMsg) throws Exception { + MessageExt rmqMsg = new MessageExt(); + + handleHeader(jmsMsg, rmqMsg); + + handleBody(jmsMsg, rmqMsg); + + handleProperties(jmsMsg, rmqMsg); + + return rmqMsg; + } + + private static void handleHeader(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) { + rmqMsg.putUserProperty(JMSMessageID.name(), jmsMsg.getJMSMessageID()); + rmqMsg.setBornTimestamp(jmsMsg.getJMSTimestamp()); + rmqMsg.putUserProperty(JMSExpiration.name(), String.valueOf(jmsMsg.getJMSExpiration())); + rmqMsg.setKeys(jmsMsg.getJMSMessageID()); + } + + private static void handleProperties(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) { + Map<String, Object> userProps = jmsMsg.getProperties(); + for (Map.Entry<String, Object> entry : userProps.entrySet()) { + rmqMsg.putUserProperty(entry.getKey(), entry.getValue().toString()); + } + } + + private static void handleBody(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) throws JMSException { + rmqMsg.putUserProperty(MSG_MODEL_NAME, toMsgModelEnum(jmsMsg).name()); + rmqMsg.setBody(jmsMsg.getBody()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java new file mode 100644 index 0000000..4fa197b --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.convert; + +import java.util.Map; +import javax.jms.JMSException; +import javax.jms.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.jms.JMSHeaderEnum; +import org.apache.rocketmq.jms.JMSMessageModelEnum; +import org.apache.rocketmq.jms.RocketMQTopic; +import org.apache.rocketmq.jms.msg.AbstractJMSMessage; +import org.apache.rocketmq.jms.msg.JMSBytesMessage; +import org.apache.rocketmq.jms.msg.JMSMapMessage; +import org.apache.rocketmq.jms.msg.JMSObjectMessage; +import org.apache.rocketmq.jms.msg.JMSTextMessage; +import org.apache.rocketmq.jms.msg.serialize.MapSerialize; +import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize; +import org.apache.rocketmq.jms.msg.serialize.StringSerialize; + +import static java.lang.String.format; +import static org.apache.rocketmq.jms.JMSMessageModelEnum.MSG_MODEL_NAME; + +public class RMQ2JMSMessageConvert { + + public static Message convert(MessageExt rmqMsg) throws JMSException { + if (rmqMsg == null) { + return null; + } + + AbstractJMSMessage jmsMsg = newAbstractJMSMessage(rmqMsg.getUserProperty(MSG_MODEL_NAME), rmqMsg.getBody()); + + setHeader(rmqMsg, jmsMsg); + + setProperties(rmqMsg, jmsMsg); + + return jmsMsg; + } + + private static AbstractJMSMessage newAbstractJMSMessage(String msgModel, byte[] body) throws JMSException { + AbstractJMSMessage message; + switch (JMSMessageModelEnum.valueOf(msgModel)) { + case BYTE: + return new JMSBytesMessage(body); + case MAP: + message = new JMSMapMessage(MapSerialize.instance().deserialize(body)); + break; + case OBJECT: + message = new JMSObjectMessage(ObjectSerialize.instance().deserialize(body)); + break; + case STRING: + message = new JMSTextMessage(StringSerialize.instance().deserialize(body)); + break; + default: + throw new JMSException(format("The type[%s] is not supported", msgModel)); + } + + return message; + } + + private static void setHeader(MessageExt rmqMsg, AbstractJMSMessage jmsMsg) { + jmsMsg.setJMSMessageID(rmqMsg.getUserProperty(JMSHeaderEnum.JMSMessageID.name())); + jmsMsg.setJMSTimestamp(rmqMsg.getBornTimestamp()); + jmsMsg.setJMSExpiration(Long.valueOf(rmqMsg.getUserProperty(JMSHeaderEnum.JMSExpiration.name()))); + jmsMsg.setJMSRedelivered(rmqMsg.getReconsumeTimes() > 0 ? true : false); + //todo: what about Queue? + jmsMsg.setJMSDestination(new RocketMQTopic(rmqMsg.getTopic())); + } + + private static void setProperties(MessageExt rmqMsg, AbstractJMSMessage jmsMsg) { + Map<String, String> propertiesMap = rmqMsg.getProperties(); + if (propertiesMap != null) { + for (String properName : propertiesMap.keySet()) { + String properValue = propertiesMap.get(properName); + jmsMsg.setStringProperty(properName, properValue); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java index b979d74..7c7f1ea 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java @@ -18,16 +18,26 @@ package org.apache.rocketmq.jms.msg.serialize; import com.alibaba.fastjson.JSON; +import java.util.HashMap; import java.util.Map; import javax.jms.JMSException; public class MapSerialize implements Serialize<Map> { + private static MapSerialize ins = new MapSerialize(); + + public static MapSerialize instance() { + return ins; + } + @Override public byte[] serialize(Map map) throws JMSException { return JSON.toJSONBytes(map); } + private MapSerialize() { + } + @Override public Map deserialize(byte[] bytes) throws JMSException { - return JSON.parseObject(bytes, Map.class); + return JSON.parseObject(bytes, HashMap.class); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java index 38ffa2d..34e9c22 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java @@ -27,6 +27,15 @@ import javax.jms.JMSException; public class ObjectSerialize implements Serialize<Object> { + private static ObjectSerialize ins = new ObjectSerialize(); + + public static ObjectSerialize instance() { + return ins; + } + + private ObjectSerialize() { + } + public byte[] serialize(Object object) throws JMSException { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -47,7 +56,7 @@ public class ObjectSerialize implements Serialize<Object> { ObjectInputStream ois = new ObjectInputStream(bais); ois.close(); bais.close(); - return (Serializable)ois.readObject(); + return (Serializable) ois.readObject(); } catch (IOException e) { throw new JMSException(e.getMessage()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java index 8e4224f..78a499c 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java @@ -21,8 +21,6 @@ import javax.jms.JMSException; public interface Serialize<T> { - static final byte[] EMPTY_BYTES = new byte[0]; - byte[] serialize(T t) throws JMSException; T deserialize(byte[] bytes) throws JMSException; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java index a6dca20..9ee0d3b 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java @@ -23,8 +23,17 @@ import javax.jms.JMSException; public class StringSerialize implements Serialize<String> { - public static final String EMPTY_STRING = ""; - public static final Charset DEFAULT_CHARSET = Charsets.UTF_8; + private static final String EMPTY_STRING = ""; + private static final byte[] EMPTY_BYTES = new byte[0]; + private static final Charset DEFAULT_CHARSET = Charsets.UTF_8; + private static StringSerialize ins = new StringSerialize(); + + public static StringSerialize instance() { + return ins; + } + + private StringSerialize() { + } @Override public byte[] serialize(String s) throws JMSException { if (null == s) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/main/java/org/apache/rocketmq/jms/support/MessageConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/support/MessageConverter.java b/core/src/main/java/org/apache/rocketmq/jms/support/MessageConverter.java deleted file mode 100644 index 5c86237..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/support/MessageConverter.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.support; - -import com.alibaba.fastjson.JSON; -import com.google.common.base.Charsets; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.nio.charset.Charset; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicLong; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.StreamMessage; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.jms.JMSHeaderEnum; -import org.apache.rocketmq.jms.JMSMessageModelEnum; -import org.apache.rocketmq.jms.RocketMQTopic; -import org.apache.rocketmq.jms.msg.AbstractJMSMessage; -import org.apache.rocketmq.jms.msg.JMSBytesMessage; -import org.apache.rocketmq.jms.msg.JMSMapMessage; -import org.apache.rocketmq.jms.msg.JMSObjectMessage; -import org.apache.rocketmq.jms.msg.JMSTextMessage; - -import static java.lang.String.format; -import static org.apache.rocketmq.jms.JMSMessageModelEnum.MSG_MODEL_NAME; - -public class MessageConverter { - public static final String EMPTY_STRING = ""; - - public static Object getBodyFromJMSMessage(javax.jms.Message jmsMessage) throws JMSException { - if (jmsMessage == null) { - return null; - } - - if (StreamMessage.class.isInstance(jmsMessage)) { - throw new UnsupportedOperationException(StreamMessage.class.getSimpleName() + " is not supported"); - } - return jmsMessage.getBody(Object.class); - } - - public static Message convert2JMSMessage(MessageExt msg) throws Exception { - if (msg == null) { - return null; - } - - AbstractJMSMessage message; - final String msgModel = msg.getUserProperty(MSG_MODEL_NAME); - switch (JMSMessageModelEnum.valueOf(msgModel)) { - case BYTE: - message = new JMSBytesMessage(msg.getBody()); - break; - case MAP: - message = new JMSMapMessage(JSON.parseObject(new String(msg.getBody()), HashMap.class)); - break; - case OBJECT: - message = new JMSObjectMessage(objectDeserialize(msg.getBody())); - break; - case STRING: - message = new JMSTextMessage(bytes2String(msg.getBody(), Charsets.UTF_8)); - break; - default: - throw new JMSException(format("The type[%s] is not supported", msgModel)); - } - - //-------------------------set headers------------------------- - message.setJMSMessageID(msg.getUserProperty(JMSHeaderEnum.JMSMessageID.name())); - message.setJMSTimestamp(msg.getBornTimestamp()); - message.setJMSExpiration(Long.valueOf(msg.getUserProperty(JMSHeaderEnum.JMSExpiration.name()))); - message.setJMSRedelivered(msg.getReconsumeTimes() > 0 ? true : false); - //todo: what about Queue? - message.setJMSDestination(new RocketMQTopic(msg.getTopic())); - - Map<String, String> propertiesMap = msg.getProperties(); - if (propertiesMap != null) { - for (String properName : propertiesMap.keySet()) { - String properValue = propertiesMap.get(properName); - message.setStringProperty(properName, properValue); - } - } - - return message; - } - - public static final String bytes2String(byte[] bs, Charset charset) { - if (null == bs) { - return EMPTY_STRING; - } - String s = null; - try { - s = new String(bs, charset); - } - catch (Exception e) { - // ignore - } - return s; - } - - public static MessageExt convert2RMQMessage(AbstractJMSMessage jmsMsg) throws Exception { - MessageExt rmqMsg = new MessageExt(); - - rmqMsg.putUserProperty(JMSHeaderEnum.JMSMessageID.name(), jmsMsg.getJMSMessageID()); - rmqMsg.setBornTimestamp(jmsMsg.getJMSTimestamp()); - rmqMsg.putUserProperty(JMSHeaderEnum.JMSExpiration.name(), String.valueOf(jmsMsg.getJMSExpiration())); - rmqMsg.setKeys(jmsMsg.getJMSMessageID()); - - // 1. Transform message body - rmqMsg.setBody(MessageConverter.getBodyFromJMSMessage(jmsMsg)); - - // 2. Transform message properties - Properties properties = getAllProperties(jmsMsg); - for (String name : properties.stringPropertyNames()) { - String value = properties.getProperty(name); - rmqMsg.putUserProperty(name, value); - } - - return rmqMsg; - } - - private static Properties getAllProperties(AbstractJMSMessage jmsMsg) throws JMSException { - Properties userProperties = new Properties(); - - Map<String, Object> userProps = jmsMsg.getProperties(); - Iterator<Map.Entry<String, Object>> userPropsIter = userProps.entrySet().iterator(); - while (userPropsIter.hasNext()) { - Map.Entry<String, Object> entry = userPropsIter.next(); - userProperties.setProperty(entry.getKey(), entry.getValue().toString()); - } - - //Jms message Model - userProperties.setProperty(MSG_MODEL_NAME, JMSMessageModelEnum.toMsgModelEnum(jmsMsg.getClass()).name()) - - return userProperties; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java b/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java index 7664c08..723ca1a 100644 --- a/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java +++ b/core/src/test/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessageTest.java @@ -32,11 +32,11 @@ public class RocketMQBytesMessageTest { @Test public void testGetData() throws Exception { JMSBytesMessage readMessage = new JMSBytesMessage(receiveData); - assertThat(new String(receiveData), is(new String(readMessage.getData()))); + assertThat(new String(receiveData), is(new String(readMessage.getBody()))); JMSBytesMessage sendMessage = new JMSBytesMessage(); sendMessage.writeBytes(sendData, 0, sendData.length); - assertThat(new String(sendData), is(new String(sendMessage.getData()))); + assertThat(new String(sendData), is(new String(sendMessage.getBody()))); } @Test @@ -72,7 +72,7 @@ public class RocketMQBytesMessageTest { public void testWriteBytes() throws Exception { JMSBytesMessage msg = new JMSBytesMessage(); msg.writeBytes(sendData); - assertThat(new String(msg.getData()), is(new String(sendData))); + assertThat(new String(msg.getBody()), is(new String(sendData))); } @Test(expected = MessageNotReadableException.class) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/464cbc1d/core/src/test/java/org/apache/rocketmq/jms/support/MessageConvertTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/support/MessageConvertTest.java b/core/src/test/java/org/apache/rocketmq/jms/support/MessageConvertTest.java deleted file mode 100644 index 6ac7acd..0000000 --- a/core/src/test/java/org/apache/rocketmq/jms/support/MessageConvertTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.support; - -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.jms.RocketMQTopic; -import org.apache.rocketmq.jms.msg.AbstractJMSMessage; -import org.apache.rocketmq.jms.msg.JMSTextMessage; -import org.junit.Assert; -import org.junit.Test; - -import static org.apache.rocketmq.jms.Constant.JMS_DESTINATION; -import static org.apache.rocketmq.jms.Constant.JMS_MESSAGE_ID; -import static org.apache.rocketmq.jms.Constant.JMS_REDELIVERED; -import static org.apache.rocketmq.jms.support.MessageConverter.JMS_MSGMODEL; -import static org.apache.rocketmq.jms.support.MessageConverter.MSGMODEL_TEXT; -import static org.apache.rocketmq.jms.support.MessageConverter.MSG_TOPIC; -import static org.apache.rocketmq.jms.support.MessageConverter.MSG_TYPE; - -public class MessageConvertTest { - @Test - public void testCovert2RMQ() throws Exception { - //build RmqJmsMessage - String topic = "TestTopic"; - String messageType = "TagA"; - - AbstractJMSMessage rmqJmsMessage = new JMSTextMessage("testText"); - rmqJmsMessage.setHeader(JMS_DESTINATION, new RocketMQTopic(topic, messageType)); - rmqJmsMessage.setHeader(JMS_MESSAGE_ID, "ID:null"); - rmqJmsMessage.setHeader(JMS_REDELIVERED, Boolean.FALSE); - - rmqJmsMessage.setObjectProperty(JMS_MSGMODEL, MSGMODEL_TEXT); - rmqJmsMessage.setObjectProperty(MSG_TOPIC, topic); - rmqJmsMessage.setObjectProperty(MSG_TYPE, messageType); - rmqJmsMessage.setObjectProperty(MessageConst.PROPERTY_TAGS, messageType); - rmqJmsMessage.setObjectProperty(MessageConst.PROPERTY_KEYS, messageType); - - //convert to RMQMessagemiz - MessageExt message = (MessageExt)MessageConverter.convert2RMQMessage(rmqJmsMessage); - - //then convert back to RmqJmsMessage - AbstractJMSMessage RmqJmsMessageBack = MessageConverter.convert2JMSMessage(message); - - JMSTextMessage jmsTextMessage = (JMSTextMessage) rmqJmsMessage; - JMSTextMessage jmsTextMessageBack = (JMSTextMessage) RmqJmsMessageBack; - - Assert.assertEquals(jmsTextMessage.getText(), jmsTextMessageBack.getText()); - Assert.assertEquals(jmsTextMessage.getJMSDestination().toString(), jmsTextMessageBack.getJMSDestination().toString()); - Assert.assertEquals(jmsTextMessage.getJMSMessageID(), jmsTextMessageBack.getJMSMessageID()); - Assert.assertEquals(jmsTextMessage.getJMSRedelivered(), jmsTextMessageBack.getJMSRedelivered()); - Assert.assertEquals(jmsTextMessage.getHeaders().get(JMS_MSGMODEL), jmsTextMessageBack.getHeaders().get(JMS_MSGMODEL)); - Assert.assertEquals(jmsTextMessage.getHeaders().get(MSG_TOPIC), jmsTextMessageBack.getHeaders().get(MSG_TOPIC)); - Assert.assertEquals(jmsTextMessage.getHeaders().get(MSG_TYPE), jmsTextMessageBack.getHeaders().get(MSG_TYPE)); - Assert.assertEquals(jmsTextMessage.getHeaders().get(MessageConst.PROPERTY_TAGS), jmsTextMessageBack.getHeaders().get(MessageConst.PROPERTY_TAGS)); - Assert.assertEquals(jmsTextMessage.getHeaders().get(MessageConst.PROPERTY_KEYS), jmsTextMessageBack.getHeaders().get(MessageConst.PROPERTY_KEYS)); - - } -}