http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java new file mode 100644 index 0000000..4bdf58b --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.rocketmq.jms.msg;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import javax.jms.IllegalStateRuntimeException;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
import org.apache.rocketmq.jms.support.PrimitiveTypeCast;

import static java.lang.String.format;

/**
 * {@link javax.jms.BytesMessage} implementation backed by in-memory byte streams.
 *
 * <p>An instance is either being written (created with the no-arg constructor; values are
 * appended to an internal {@link ByteArrayOutputStream}) or being read (created from a byte
 * array, or switched over with {@link #reset()}).
 */
public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.BytesMessage {

    /** Body of a message in read mode; {@code null} while the message is being written. */
    private byte[] bytesIn;
    private DataInputStream dataAsInput;

    /** Accumulates the body of a message in write mode; {@code null} in read mode. */
    private ByteArrayOutputStream bytesOut;
    private DataOutputStream dataAsOutput;

    protected boolean readOnly;

    /**
     * Message created for reading
     *
     * @param data to construct this object
     */
    public JMSBytesMessage(byte[] data) {
        this.bytesIn = data;
        this.dataAsInput = new DataInputStream(new ByteArrayInputStream(data, 0, data.length));
        this.readOnly = true;
        this.writeOnly = false;
    }

    /**
     * Message created to be sent
     */
    public JMSBytesMessage() {
        this.bytesOut = new ByteArrayOutputStream();
        this.dataAsOutput = new DataOutputStream(this.bytesOut);
        this.readOnly = false;
        this.writeOnly = true;
    }

    /**
     * Returns a copy of the whole body.
     *
     * <p>In write mode the bytes written so far are returned and the message stays writable;
     * the buffer is no longer discarded, so the body survives repeated calls. In read mode
     * the read position is moved back to the beginning afterwards.
     *
     * @param clazz requested type; must be a type a {@code byte[]} can be assigned to
     * @return a fresh array holding the body (empty when there is no body)
     * @throws MessageFormatException if the body cannot be assigned to {@code clazz}
     * @throws IllegalStateRuntimeException if the message is neither writable nor readable
     */
    @Override public byte[] getBody(Class clazz) throws JMSException {
        if (!isBodyAssignableTo(clazz)) {
            throw new MessageFormatException(format("The type[%s] can't be casted to byte[]", clazz));
        }

        if (isWriteOnly()) {
            return bytesOut == null ? new byte[0] : bytesOut.toByteArray();
        }
        if (isReadOnly()) {
            byte[] result = bytesIn == null ? new byte[0] : Arrays.copyOf(bytesIn, bytesIn.length);
            this.reset();
            return result;
        }
        throw new IllegalStateRuntimeException("Message must be in write only or read only status");
    }

    @Override public byte[] getBody() throws JMSException {
        return getBody(byte[].class);
    }

    /**
     * Tells whether the {@code byte[]} body can be assigned to a variable of type {@code c}.
     *
     * @param c candidate target type
     * @return {@code true} for {@code byte[].class} and its supertypes such as
     * {@code Object.class}; {@code false} otherwise, including for {@code null}
     */
    @Override public boolean isBodyAssignableTo(Class c) throws JMSException {
        return c != null && c.isAssignableFrom(byte[].class);
    }

    /**
     * @return number of bytes written so far (write mode) or total body size (read mode)
     * @throws IllegalStateRuntimeException if the message is neither writable nor readable
     */
    @Override public long getBodyLength() throws JMSException {
        if (isWriteOnly()) {
            // bytesOut is null right after clearBody() until the first write.
            return bytesOut == null ? 0 : bytesOut.size();
        }
        if (isReadOnly()) {
            return bytesIn == null ? 0 : bytesIn.length;
        }
        throw new IllegalStateRuntimeException("Message must be in write only or read only status");
    }

    /** Fails unless the message is in read mode and has an input stream to read from. */
    private void checkIsReadOnly() throws MessageNotReadableException {
        if (!isReadOnly()) {
            throw new MessageNotReadableException("Not readable");
        }
        if (dataAsInput == null) {
            throw new MessageNotReadableException("No data to read");
        }
    }

    @Override public boolean readBoolean() throws JMSException {
        checkIsReadOnly();
        try {
            return dataAsInput.readBoolean();
        }
        catch (IOException e) {
            throw handleInputException(e);
        }
    }

    @Override public byte readByte() throws JMSException {
        checkIsReadOnly();
        try {
            return dataAsInput.readByte();
        }
        catch (IOException e) {
            throw handleInputException(e);
        }
    }

    @Override public int readUnsignedByte() throws JMSException {
        checkIsReadOnly();
        try {
            return dataAsInput.readUnsignedByte();
        }
        catch (IOException e) {
            throw handleInputException(e);
        }
    }

    @Override public short readShort() throws JMSException {
        checkIsReadOnly();
        try {
            return dataAsInput.readShort();
        }
        catch (IOException e) {
            throw handleInputException(e);
        }
    }

    @Override public int readUnsignedShort() throws JMSException {
        checkIsReadOnly();
        try {
            return dataAsInput.readUnsignedShort();
        }
        catch (IOException e) {
            throw handleInputException(e);
        }
    }

    @Override public char readChar() throws JMSException {
        checkIsReadOnly();
        try {
            return dataAsInput.readChar();
        }
        catch (IOException e) {
            throw handleInputException(e);
        }
    }

    @Override public int readInt() throws JMSException {
        checkIsReadOnly();
        try {
            return dataAsInput.readInt();
        }
        catch (IOException e) {
            throw handleInputException(e);
        }
    }

    @Override public long readLong() throws JMSException {
        checkIsReadOnly();
        try {
            return dataAsInput.readLong();
        }
        catch (IOException e) {
            throw handleInputException(e);
        }
    }

    @Override public float readFloat() throws JMSException {
        checkIsReadOnly();
        try {
            return dataAsInput.readFloat();
        }
        catch (IOException e) {
            throw handleInputException(e);
        }
    }

    @Override public double readDouble() throws JMSException {
        checkIsReadOnly();
        try {
            return dataAsInput.readDouble();
        }
        catch (IOException e) {
            throw handleInputException(e);
        }
    }

    @Override public String readUTF() throws JMSException {
        checkIsReadOnly();
        try {
            return dataAsInput.readUTF();
        }
        catch (IOException e) {
            throw handleInputException(e);
        }
    }

    /** Reads up to {@code value.length} bytes; see {@link #readBytes(byte[], int)}. */
    @Override public int readBytes(byte[] value) throws JMSException {
        checkIsReadOnly();

        return readBytes(value, value.length);
    }

    /**
     * Reads up to {@code length} bytes into the start of {@code value}.
     *
     * @param value destination buffer
     * @param length maximum number of bytes to read
     * @return number of bytes read, or -1 when the end of the body had already been reached
     * and {@code length} is not zero
     * @throws IndexOutOfBoundsException if {@code length} is negative or exceeds
     * {@code value.length}
     */
    @Override public int readBytes(byte[] value, int length) throws JMSException {
        checkIsReadOnly();

        if (length < 0 || length > value.length) {
            throw new IndexOutOfBoundsException(
                "length must be non-negative and not larger than the length of value");
        }

        try {
            int offset = 0;
            // A single read() may return fewer bytes than requested, so keep going.
            while (offset < length) {
                int read = dataAsInput.read(value, offset, length - offset);
                if (read < 0) {
                    break;
                }
                offset += read;
            }

            return (offset == 0 && length != 0) ? -1 : offset;
        }
        catch (IOException e) {
            throw handleInputException(e);
        }
    }

    /** Lazily (re)creates the output streams, e.g. for the first write after clearBody(). */
    private void initializeWriteIfNecessary() {
        if (bytesOut == null) {
            bytesOut = new ByteArrayOutputStream();
        }
        if (dataAsOutput == null) {
            dataAsOutput = new DataOutputStream(bytesOut);
        }
    }

    @Override public void writeBoolean(boolean value) throws JMSException {
        checkIsWriteOnly();
        initializeWriteIfNecessary();
        try {
            dataAsOutput.writeBoolean(value);
        }
        catch (IOException e) {
            throw handleOutputException(e);
        }
    }

    @Override public void writeByte(byte value) throws JMSException {
        checkIsWriteOnly();
        initializeWriteIfNecessary();
        try {
            dataAsOutput.writeByte(value);
        }
        catch (IOException e) {
            throw handleOutputException(e);
        }
    }

    @Override public void writeShort(short value) throws JMSException {
        checkIsWriteOnly();
        initializeWriteIfNecessary();
        try {
            dataAsOutput.writeShort(value);
        }
        catch (IOException e) {
            throw handleOutputException(e);
        }
    }

    @Override public void writeChar(char value) throws JMSException {
        checkIsWriteOnly();
        initializeWriteIfNecessary();
        try {
            dataAsOutput.writeChar(value);
        }
        catch (IOException e) {
            throw handleOutputException(e);
        }
    }

    @Override public void writeInt(int value) throws JMSException {
        checkIsWriteOnly();
        initializeWriteIfNecessary();
        try {
            dataAsOutput.writeInt(value);
        }
        catch (IOException e) {
            throw handleOutputException(e);
        }
    }

    @Override public void writeLong(long value) throws JMSException {
        checkIsWriteOnly();
        initializeWriteIfNecessary();
        try {
            dataAsOutput.writeLong(value);
        }
        catch (IOException e) {
            throw handleOutputException(e);
        }
    }

    @Override public void writeFloat(float value) throws JMSException {
        checkIsWriteOnly();
        initializeWriteIfNecessary();
        try {
            dataAsOutput.writeFloat(value);
        }
        catch (IOException e) {
            throw handleOutputException(e);
        }
    }

    @Override public void writeDouble(double value) throws JMSException {
        checkIsWriteOnly();
        initializeWriteIfNecessary();
        try {
            dataAsOutput.writeDouble(value);
        }
        catch (IOException e) {
            throw handleOutputException(e);
        }
    }

    @Override public void writeUTF(String value) throws JMSException {
        checkIsWriteOnly();
        initializeWriteIfNecessary();
        try {
            dataAsOutput.writeUTF(value);
        }
        catch (IOException e) {
            throw handleOutputException(e);
        }
    }

    @Override public void writeBytes(byte[] value) throws JMSException {
        checkIsWriteOnly();
        initializeWriteIfNecessary();
        try {
            dataAsOutput.write(value);
        }
        catch (IOException e) {
            throw handleOutputException(e);
        }
    }

    @Override public void writeBytes(byte[] value, int offset, int length) throws JMSException {
        checkIsWriteOnly();
        initializeWriteIfNecessary();
        try {
            dataAsOutput.write(value, offset, length);
        }
        catch (IOException e) {
            throw handleOutputException(e);
        }
    }

    /**
     * Writes a boxed primitive, a {@code String} or a {@code byte[]} using the matching
     * typed writer, so that the value can be read back with the corresponding
     * {@code readXxx} method. The previous implementation wrote the value's decimal text,
     * which none of the typed readers could decode.
     *
     * @param value object to write
     * @throws JMSException if {@code PrimitiveTypeCast.isPrimitiveType} rejects the value
     * @throws MessageFormatException if the value is of a type that cannot be written
     */
    @Override public void writeObject(Object value) throws JMSException {
        checkIsWriteOnly();
        initializeWriteIfNecessary();

        if (!PrimitiveTypeCast.isPrimitiveType(value)) {
            throw new JMSException("Object must be primitive type");
        }

        if (value instanceof Boolean) {
            writeBoolean((Boolean) value);
        }
        else if (value instanceof Character) {
            writeChar((Character) value);
        }
        else if (value instanceof Byte) {
            writeByte((Byte) value);
        }
        else if (value instanceof Short) {
            writeShort((Short) value);
        }
        else if (value instanceof Integer) {
            writeInt((Integer) value);
        }
        else if (value instanceof Long) {
            writeLong((Long) value);
        }
        else if (value instanceof Float) {
            writeFloat((Float) value);
        }
        else if (value instanceof Double) {
            writeDouble((Double) value);
        }
        else if (value instanceof String) {
            writeUTF((String) value);
        }
        else if (value instanceof byte[]) {
            writeBytes((byte[]) value);
        }
        else {
            String typeName = value == null ? null : value.getClass().getName();
            throw new MessageFormatException(format("Unsupported object type[%s]", typeName));
        }
    }

    /**
     * Puts the message into read mode positioned at the first byte of the body.
     *
     * <p>Bytes written so far become the readable body; previously they were discarded.
     * Calling this on a message that is already in read mode only rewinds it.
     */
    @Override public void reset() throws JMSException {
        if (bytesOut != null) {
            try {
                if (dataAsOutput != null) {
                    dataAsOutput.flush();
                }
            }
            catch (IOException e) {
                throw handleOutputException(e);
            }
            // Snapshot what was written so it can be read back.
            bytesIn = bytesOut.toByteArray();
            bytesOut = null;
            dataAsOutput = null;
        }
        if (bytesIn == null) {
            bytesIn = new byte[0];
        }

        this.dataAsInput = new DataInputStream(new ByteArrayInputStream(bytesIn, 0, bytesIn.length));
        this.readOnly = true;
        this.writeOnly = false;
    }

    /**
     * Drops the body and makes the message writable again; the output streams are
     * re-created on the next write.
     */
    @Override public void clearBody() {
        super.clearBody();
        this.bytesOut = null;
        this.dataAsOutput = null;
        this.dataAsInput = null;
        this.bytesIn = null;
        this.readOnly = false;
        this.writeOnly = true;
    }

    /** Wraps a write failure in a {@link JMSException} that keeps the original as cause. */
    private JMSException handleOutputException(final IOException e) {
        JMSException ex = new JMSException(e.getMessage());
        ex.initCause(e);
        ex.setLinkedException(e);
        return ex;
    }

    /**
     * Wraps a read failure: end-of-stream becomes {@link MessageEOFException}, anything
     * else {@link MessageFormatException}. The original exception is kept as cause.
     */
    private JMSException handleInputException(final IOException e) {
        JMSException ex;
        if (e instanceof EOFException) {
            ex = new MessageEOFException(e.getMessage());
        }
        else {
            ex = new MessageFormatException(e.getMessage());
        }
        ex.initCause(e);
        ex.setLinkedException(e);
        return ex;
    }

    protected boolean isReadOnly() {
        return readOnly;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java new file mode 100644 index 0000000..3524d50 --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.rocketmq.jms.msg;

import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.jms.msg.serialize.MapSerialize;

import static java.lang.String.format;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Boolean;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Byte;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2ByteArray;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Char;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Double;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Float;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Int;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Long;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Short;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2String;

/**
 * {@link MapMessage} implementation that keeps its name/value pairs in a {@link Map}.
 *
 * <p>A message can only be accessed by one thread at a time.
 */
public class JMSMapMessage extends AbstractJMSMessage implements MapMessage {

    private Map<String, Object> map;

    protected boolean readOnly;

    /**
     * Creates a message around an existing map. The map is used directly, not copied.
     *
     * @param map initial content; {@code null} is treated as an empty body
     */
    public JMSMapMessage(Map<String, Object> map) {
        this.map = map != null ? map : new HashMap<String, Object>();
    }

    public JMSMapMessage() {
        this.map = new HashMap<String, Object>();
    }

    /**
     * Returns the backing map itself (not a copy).
     *
     * @param clazz requested type; the body must be an instance of it
     * @throws MessageFormatException if the body cannot be assigned to {@code clazz}
     */
    @Override public Map<String, Object> getBody(Class clazz) throws JMSException {
        if (isBodyAssignableTo(clazz)) {
            return this.map;
        }

        throw new MessageFormatException(format("The type[%s] can't be casted to java.util.Map", clazz));
    }

    /** @return the map serialized with {@link MapSerialize} */
    @Override public byte[] getBody() throws JMSException {
        return MapSerialize.instance().serialize(this.map);
    }

    /**
     * Tells whether the body can be assigned to a variable of type {@code c}, i.e. whether
     * the backing map is an instance of {@code c} ({@code Map.class}, {@code Object.class},
     * or the map's own class and supertypes).
     */
    @Override public boolean isBodyAssignableTo(Class c) throws JMSException {
        return c != null && c.isInstance(this.map);
    }

    /** Rejects null, empty and whitespace-only entry names. */
    private void checkName(String name) throws JMSException {
        if (StringUtils.isBlank(name)) {
            throw new JMSException("Name is required");
        }
    }

    @Override public boolean getBoolean(String name) throws JMSException {
        checkName(name);
        return cast2Boolean(map.get(name));
    }

    @Override public byte getByte(String name) throws JMSException {
        checkName(name);
        return cast2Byte(map.get(name));
    }

    @Override public short getShort(String name) throws JMSException {
        checkName(name);
        return cast2Short(map.get(name));
    }

    @Override public char getChar(String name) throws JMSException {
        checkName(name);
        return cast2Char(map.get(name));
    }

    @Override public int getInt(String name) throws JMSException {
        checkName(name);
        return cast2Int(map.get(name));
    }

    @Override public long getLong(String name) throws JMSException {
        checkName(name);
        return cast2Long(map.get(name));
    }

    @Override public float getFloat(String name) throws JMSException {
        checkName(name);
        return cast2Float(map.get(name));
    }

    @Override public double getDouble(String name) throws JMSException {
        checkName(name);
        return cast2Double(map.get(name));
    }

    @Override public String getString(String name) throws JMSException {
        checkName(name);
        return cast2String(map.get(name));
    }

    @Override public byte[] getBytes(String name) throws JMSException {
        checkName(name);
        return cast2ByteArray(map.get(name));
    }

    @Override public Object getObject(String name) throws JMSException {
        checkName(name);
        return map.get(name);
    }

    @Override public Enumeration getMapNames() throws JMSException {
        return Collections.enumeration(map.keySet());
    }

    /**
     * Stores one entry after checking that the message is writable and the name is valid.
     *
     * @throws MessageNotWriteableException if the message is read-only
     * @throws JMSException if the name is blank
     */
    private void putProperty(String name, Object obj) throws JMSException {
        if (isReadOnly()) {
            throw new MessageNotWriteableException("Message is not writable");
        }

        checkName(name);

        map.put(name, obj);
    }

    @Override public void setBoolean(String name, boolean value) throws JMSException {
        putProperty(name, value);
    }

    @Override public void setByte(String name, byte value) throws JMSException {
        putProperty(name, value);
    }

    @Override public void setShort(String name, short value) throws JMSException {
        putProperty(name, value);
    }

    @Override public void setChar(String name, char value) throws JMSException {
        putProperty(name, value);
    }

    @Override public void setInt(String name, int value) throws JMSException {
        putProperty(name, value);
    }

    @Override public void setLong(String name, long value) throws JMSException {
        putProperty(name, value);
    }

    @Override public void setFloat(String name, float value) throws JMSException {
        putProperty(name, value);
    }

    @Override public void setDouble(String name, double value) throws JMSException {
        putProperty(name, value);
    }

    @Override public void setString(String name, String value) throws JMSException {
        putProperty(name, value);
    }

    @Override public void setBytes(String name, byte[] value) throws JMSException {
        putProperty(name, value);
    }

    /**
     * Stores a copy of {@code length} bytes of {@code value}, starting at {@code offset}.
     * Previously the whole array was stored and {@code offset}/{@code length} were ignored.
     *
     * @throws IndexOutOfBoundsException if the requested range lies outside {@code value}
     */
    @Override public void setBytes(String name, byte[] value, int offset, int length) throws JMSException {
        if (value == null) {
            putProperty(name, null);
            return;
        }
        if (offset < 0 || length < 0 || offset > value.length - length) {
            throw new IndexOutOfBoundsException(
                format("offset[%d] and length[%d] are out of range for an array of length[%d]",
                    offset, length, value.length));
        }

        putProperty(name, Arrays.copyOfRange(value, offset, offset + length));
    }

    @Override public void setObject(String name, Object value) throws JMSException {
        putProperty(name, value);
    }

    @Override public boolean itemExists(String name) throws JMSException {
        checkName(name);
        return map.containsKey(name);
    }

    /** Removes every entry and makes the message writable again. */
    @Override public void clearBody() {
        super.clearBody();
        this.map.clear();
        this.readOnly = false;
    }

    protected boolean isReadOnly() {
        return this.readOnly;
    }

    public void setReadOnly(boolean readOnly) {
        this.readOnly = readOnly;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java new file mode 100644 index 0000000..4f29d33 --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.rocketmq.jms.msg;

import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize;

/**
 * {@link javax.jms.ObjectMessage} implementation holding a single {@link Serializable}.
 */
public class JMSObjectMessage extends AbstractJMSMessage implements javax.jms.ObjectMessage {

    private Serializable body;

    public JMSObjectMessage(Serializable object) {
        this.body = object;
    }

    public JMSObjectMessage() {

    }

    /**
     * Returns the held object after checking it against the requested type.
     *
     * @param clazz type the caller intends to assign the body to
     * @return the body, or {@code null} when no object has been set
     * @throws MessageFormatException if the body is not an instance of {@code clazz}
     */
    @Override public Serializable getBody(Class clazz) throws JMSException {
        if (!isBodyAssignableTo(clazz)) {
            throw new MessageFormatException(
                String.format("The message body can't be assigned to the type[%s]", clazz));
        }
        return body;
    }

    /** @return the body serialized with {@link ObjectSerialize} */
    @Override public byte[] getBody() throws JMSException {
        return ObjectSerialize.instance().serialize(body);
    }

    /**
     * Tells whether the body can be assigned to a variable of type {@code c}.
     *
     * @return {@code true} when there is no body, when {@code c} is {@code null} (kept
     * lenient, as before), or when the body is an instance of {@code c}
     */
    @Override public boolean isBodyAssignableTo(Class c) throws JMSException {
        return body == null || c == null || c.isInstance(body);
    }

    @Override public Serializable getObject() throws JMSException {
        return this.body;
    }

    @Override public void setObject(Serializable object) throws JMSException {
        this.body = object;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java new file mode 100644 index 0000000..5fd67a3 --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.rocketmq.jms.msg;

import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import org.apache.rocketmq.jms.msg.serialize.StringSerialize;

import static java.lang.String.format;

/**
 * {@link javax.jms.TextMessage} implementation holding the text as a {@link String}.
 */
public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage {

    private String text;

    public JMSTextMessage() {

    }

    /**
     * @param text initial message text, may be {@code null}
     */
    public JMSTextMessage(String text) {
        // Assign the field directly rather than calling the overridable setText().
        this.text = text;
    }

    /**
     * Returns the text after checking it against the requested type.
     *
     * @param clazz type the caller intends to assign the body to
     * @throws MessageFormatException if a {@code String} cannot be assigned to {@code clazz}
     */
    @Override public String getBody(Class clazz) throws JMSException {
        if (isBodyAssignableTo(clazz)) {
            return text;
        }

        throw new MessageFormatException(format("The type[%s] can't be casted to String", clazz));
    }

    /** @return the text serialized with {@link StringSerialize} */
    @Override public byte[] getBody() throws JMSException {
        return StringSerialize.instance().serialize(this.text);
    }

    /**
     * Tells whether the {@code String} body can be assigned to a variable of type {@code c}.
     *
     * @return {@code true} for {@code String.class} and its supertypes such as
     * {@code CharSequence.class} or {@code Object.class}; {@code false} otherwise,
     * including for {@code null}
     */
    @Override public boolean isBodyAssignableTo(Class c) throws JMSException {
        return c != null && c.isAssignableFrom(String.class);
    }

    @Override public void clearBody() {
        super.clearBody();
        this.text = null;
    }

    @Override public String getText() throws JMSException {
        return this.text;
    }

    @Override public void setText(String text) {
        this.text = text;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java new file mode 100644 index 0000000..ca2cbed --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.rocketmq.jms.msg.convert;

import java.util.Map;
import javax.jms.JMSException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
import org.apache.rocketmq.jms.support.JMSUtils;

import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSExpiration;
import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSMessageID;
import static org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.MSG_MODEL_NAME;
import static org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.toMsgModelEnum;

/**
 * Converts a JMS message into a RocketMQ {@link MessageExt}.
 */
public class JMS2RMQMessageConvert {

    /** Prefix put in front of every JMS property name when stored as a user property. */
    public static final String USER_PROPERTY_PREFIX = "USER:";

    /**
     * Builds a RocketMQ message carrying the headers, body and properties of
     * {@code jmsMsg}.
     *
     * @param jmsMsg message to convert
     * @return a new RocketMQ message
     * @throws Exception if reading the JMS body fails
     */
    public static MessageExt convert(AbstractJMSMessage jmsMsg) throws Exception {
        MessageExt rmqMsg = new MessageExt();

        handleHeader(jmsMsg, rmqMsg);

        handleBody(jmsMsg, rmqMsg);

        handleProperties(jmsMsg, rmqMsg);

        return rmqMsg;
    }

    /** Copies destination, message id, timestamp and expiration onto the RocketMQ message. */
    private static void handleHeader(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) {
        rmqMsg.setTopic(JMSUtils.getDestinationName(jmsMsg.getJMSDestination()));
        rmqMsg.putUserProperty(JMSMessageID.name(), jmsMsg.getJMSMessageID());
        rmqMsg.setBornTimestamp(jmsMsg.getJMSTimestamp());
        rmqMsg.putUserProperty(JMSExpiration.name(), String.valueOf(jmsMsg.getJMSExpiration()));
        rmqMsg.setKeys(jmsMsg.getJMSMessageID());
    }

    /**
     * Copies every JMS property as a user property named
     * {@link #USER_PROPERTY_PREFIX} + original name, using the value's {@code toString()}.
     * Entries with a {@code null} value are skipped; they used to abort the whole
     * conversion with a {@code NullPointerException}.
     */
    private static void handleProperties(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) {
        Map<String, Object> userProps = jmsMsg.getProperties();
        if (userProps == null) {
            return;
        }

        for (Map.Entry<String, Object> entry : userProps.entrySet()) {
            Object value = entry.getValue();
            if (value == null) {
                continue;
            }
            rmqMsg.putUserProperty(USER_PROPERTY_PREFIX + entry.getKey(), value.toString());
        }
    }

    /** Stores the message model name as a user property and copies the serialized body. */
    private static void handleBody(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) throws JMSException {
        rmqMsg.putUserProperty(MSG_MODEL_NAME, toMsgModelEnum(jmsMsg).name());
        rmqMsg.setBody(jmsMsg.getBody());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java new file mode 100644 index 0000000..4adb692 --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.rocketmq.jms.msg.convert;

import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.jms.destination.RocketMQTopic;
import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
import org.apache.rocketmq.jms.msg.JMSBytesMessage;
import org.apache.rocketmq.jms.msg.JMSMapMessage;
import org.apache.rocketmq.jms.msg.JMSObjectMessage;
import org.apache.rocketmq.jms.msg.JMSTextMessage;
import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum;
import org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum;
import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum;
import org.apache.rocketmq.jms.msg.serialize.MapSerialize;
import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize;
import org.apache.rocketmq.jms.msg.serialize.StringSerialize;

import static java.lang.String.format;
import static org.apache.rocketmq.jms.msg.convert.JMS2RMQMessageConvert.USER_PROPERTY_PREFIX;
import static org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.MSG_MODEL_NAME;

/**
 * Converts a RocketMQ {@link MessageExt} back into a JMS {@link Message}.
 */
public class RMQ2JMSMessageConvert {

    /**
     * Builds the JMS message matching the model name stored in the RocketMQ user
     * properties, then restores its headers and properties.
     *
     * @param rmqMsg received RocketMQ message
     * @return the equivalent JMS message
     * @throws IllegalArgumentException if the message or its body is {@code null}
     * @throws JMSException if the message model is missing, unknown or unsupported
     */
    public static Message convert(MessageExt rmqMsg) throws JMSException {
        if (rmqMsg == null) {
            throw new IllegalArgumentException("RocketMQ message could not be null");
        }
        if (rmqMsg.getBody() == null) {
            throw new IllegalArgumentException("RocketMQ message body could not be null");
        }

        AbstractJMSMessage jmsMsg = newAbstractJMSMessage(rmqMsg.getUserProperty(MSG_MODEL_NAME), rmqMsg.getBody());

        setHeader(rmqMsg, jmsMsg);

        setProperties(rmqMsg, jmsMsg);

        return jmsMsg;
    }

    /**
     * Creates the concrete JMS message for {@code msgModel} from the raw body.
     *
     * <p>A missing or unrecognized model name is reported as {@link JMSException}; it used
     * to surface as an unchecked exception from {@code Enum.valueOf}.
     */
    private static AbstractJMSMessage newAbstractJMSMessage(String msgModel, byte[] body) throws JMSException {
        if (msgModel == null) {
            throw new JMSException(format("The type[%s] is not supported", msgModel));
        }

        final JMSMessageModelEnum model;
        try {
            model = JMSMessageModelEnum.valueOf(msgModel);
        }
        catch (IllegalArgumentException e) {
            JMSException ex = new JMSException(format("The type[%s] is not supported", msgModel));
            ex.initCause(e);
            ex.setLinkedException(e);
            throw ex;
        }

        switch (model) {
            case BYTE:
                return new JMSBytesMessage(body);
            case MAP:
                return new JMSMapMessage(MapSerialize.instance().deserialize(body));
            case OBJECT:
                return new JMSObjectMessage(ObjectSerialize.instance().deserialize(body));
            case STRING:
                return new JMSTextMessage(StringSerialize.instance().deserialize(body));
            default:
                throw new JMSException(format("The type[%s] is not supported", msgModel));
        }
    }

    /** Restores message id, timestamp, expiration, redelivery flag and destination. */
    private static void setHeader(MessageExt rmqMsg, AbstractJMSMessage jmsMsg) {
        jmsMsg.setJMSMessageID(rmqMsg.getUserProperty(JMSHeaderEnum.JMSMessageID.name()));
        jmsMsg.setJMSTimestamp(rmqMsg.getBornTimestamp());
        jmsMsg.setJMSExpiration(parseExpiration(rmqMsg.getUserProperty(JMSHeaderEnum.JMSExpiration.name())));
        jmsMsg.setJMSRedelivered(rmqMsg.getReconsumeTimes() > 0);
        //todo: what about Queue?
        jmsMsg.setJMSDestination(new RocketMQTopic(rmqMsg.getTopic()));
    }

    /**
     * Parses the stored expiration, falling back to
     * {@link JMSHeaderEnum#JMS_EXPIRATION_DEFAULT_VALUE} when the property is absent or not
     * a number, so that such a message can still be converted.
     */
    private static long parseExpiration(String raw) {
        if (raw == null) {
            return JMSHeaderEnum.JMS_EXPIRATION_DEFAULT_VALUE;
        }
        try {
            return Long.parseLong(raw.trim());
        }
        catch (NumberFormatException ignored) {
            // A malformed header value must not make the whole message unreadable.
            return JMSHeaderEnum.JMS_EXPIRATION_DEFAULT_VALUE;
        }
    }

    /**
     * Sets the delivery count and copies back every user property whose name starts with
     * {@code USER_PROPERTY_PREFIX}, with the prefix removed, as a string property.
     */
    private static void setProperties(MessageExt rmqMsg, AbstractJMSMessage jmsMsg) {
        jmsMsg.setIntProperty(JMSPropertiesEnum.JMSXDeliveryCount.name(), rmqMsg.getReconsumeTimes() + 1);

        Map<String, String> propertiesMap = rmqMsg.getProperties();
        if (propertiesMap == null) {
            return;
        }

        for (Map.Entry<String, String> property : propertiesMap.entrySet()) {
            String properName = property.getKey();
            if (properName != null && properName.startsWith(USER_PROPERTY_PREFIX)) {
                jmsMsg.setStringProperty(properName.substring(USER_PROPERTY_PREFIX.length()), property.getValue());
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java new file mode 100644 index 0000000..cb27675 --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + 
* contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.enums; + +import javax.jms.Message; + +public enum JMSHeaderEnum { + + JMSDestination, + JMSDeliveryMode, + JMSMessageID, + JMSTimestamp, + JMSCorrelationID, + JMSReplyTo, + JMSRedelivered, + JMSType, + JMSExpiration, + JMSPriority, + JMSDeliveryTime; + + public static final int JMS_DELIVERY_MODE_DEFAULT_VALUE = Message.DEFAULT_DELIVERY_MODE; + public static final long JMS_TIME_TO_LIVE_DEFAULT_VALUE = Message.DEFAULT_TIME_TO_LIVE; + public static final int JMS_PRIORITY_DEFAULT_VALUE = Message.DEFAULT_PRIORITY; + public static final long JMS_DELIVERY_TIME_DEFAULT_VALUE = Message.DEFAULT_DELIVERY_DELAY; + public static final boolean JMS_REDELIVERED_DEFAULT_VALUE = false; + public static final int JMS_TIMESTAMP_DEFAULT_VALUE = 0; + public static final int JMS_EXPIRATION_DEFAULT_VALUE = 0; + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java new file mode 100644 index 0000000..f7dc15a --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.msg.enums; + +import org.apache.rocketmq.jms.msg.AbstractJMSMessage; +import org.apache.rocketmq.jms.msg.JMSBytesMessage; +import org.apache.rocketmq.jms.msg.JMSMapMessage; +import org.apache.rocketmq.jms.msg.JMSObjectMessage; +import org.apache.rocketmq.jms.msg.JMSTextMessage; + +public enum JMSMessageModelEnum { + BYTE(JMSBytesMessage.class), + MAP(JMSMapMessage.class), + OBJECT(JMSObjectMessage.class), + STRING(JMSTextMessage.class); + + public static final String MSG_MODEL_NAME = "MsgModel"; + + private Class jmsClass; + + JMSMessageModelEnum(Class jmsClass) { + this.jmsClass = jmsClass; + } + + public static JMSMessageModelEnum toMsgModelEnum(AbstractJMSMessage jmsMsg) { + for (JMSMessageModelEnum e : values()) { + if (e.getJmsClass().isInstance(jmsMsg)) { + return e; + } + } + + throw new IllegalArgumentException(String.format("Not supported class[%s]", jmsMsg.getClass().getSimpleName())); + } + + public Class getJmsClass() { + return jmsClass; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java new file mode 100644 index 0000000..dd5955b --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Names of the JMS-defined {@code JMSX*} message properties known to this
 * provider. The constant names double as the property names, so they must
 * stay exactly as spelled here.
 */
public enum JMSPropertiesEnum {
    JMSXUserID,
    JMSXDeliveryCount,
    JMSXGroupID,
    JMSXGroupSeq,
    JMSXRcvTimestamp;
}
+ */ + +package org.apache.rocketmq.jms.msg.serialize; + +import com.alibaba.fastjson.JSON; +import java.util.HashMap; +import java.util.Map; +import javax.jms.JMSException; + +public class MapSerialize implements Serialize<Map> { + + private static MapSerialize ins = new MapSerialize(); + + public static MapSerialize instance() { + return ins; + } + + @Override public byte[] serialize(Map map) throws JMSException { + return JSON.toJSONBytes(map); + } + + private MapSerialize() { + } + + @Override public Map deserialize(byte[] bytes) throws JMSException { + return JSON.parseObject(bytes, HashMap.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java new file mode 100644 index 0000000..a685808 --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.serialize; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import javax.jms.JMSException; +import org.apache.commons.lang3.exception.ExceptionUtils; + +public class ObjectSerialize implements Serialize<Object> { + + private static ObjectSerialize ins = new ObjectSerialize(); + + public static ObjectSerialize instance() { + return ins; + } + + private ObjectSerialize() { + } + + public byte[] serialize(Object object) throws JMSException { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(object); + oos.close(); + baos.close(); + return baos.toByteArray(); + } + catch (IOException e) { + throw new JMSException(ExceptionUtils.getStackTrace(e)); + } + } + + public Serializable deserialize(byte[] bytes) throws JMSException { + try { + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bais); + ois.close(); + bais.close(); + return (Serializable) ois.readObject(); + } + catch (IOException e) { + throw new JMSException(e.getMessage()); + } + catch (ClassNotFoundException e) { + throw new JMSException(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java new file mode 100644 index 0000000..78a499c --- /dev/null +++ 
b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.serialize; + +import javax.jms.JMSException; + +public interface Serialize<T> { + + byte[] serialize(T t) throws JMSException; + + T deserialize(byte[] bytes) throws JMSException; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java new file mode 100644 index 0000000..b6119a5 --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.serialize; + +import javax.jms.JMSException; +import org.apache.rocketmq.jms.support.JMSUtils; + +public class StringSerialize implements Serialize<String> { + + private static final String EMPTY_STRING = ""; + private static final byte[] EMPTY_BYTES = new byte[0]; + private static StringSerialize ins = new StringSerialize(); + + public static StringSerialize instance() { + return ins; + } + + private StringSerialize() { + } + + @Override public byte[] serialize(String s) throws JMSException { + if (null == s) { + return EMPTY_BYTES; + } + return JMSUtils.string2Bytes(s); + } + + @Override public String deserialize(byte[] bytes) throws JMSException { + if (null == bytes) { + return EMPTY_STRING; + } + return JMSUtils.bytes2String(bytes); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java new file mode 100644 index 0000000..67c54e9 --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.support; + +import java.io.UnsupportedEncodingException; +import java.nio.charset.Charset; +import java.util.UUID; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.JMSRuntimeException; +import javax.jms.Queue; +import javax.jms.Topic; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.rocketmq.jms.RocketMQConsumer; + +public class JMSUtils { + + public static final String DEFAULT_CHARSET = "UTF-8"; + + public static String getDestinationName(Destination destination) { + try { + String topicName; + if (destination instanceof Topic) { + topicName = ((Topic) destination).getTopicName(); + } + else if (destination instanceof Queue) { + topicName = ((Queue) destination).getQueueName(); + } + else { + throw new JMSException(String.format("Unsupported Destination type:", destination.getClass())); + } + return topicName; + } + catch (JMSException e) { + throw new JMSRuntimeException(e.getMessage()); + } + } + + public static String getConsumerGroup(RocketMQConsumer consumer) { + try { + return getConsumerGroup(consumer.getSubscriptionName(), + consumer.getSession().getConnection().getClientID(), + 
consumer.isShared() + ); + } + catch (JMSException e) { + throw new JMSRuntimeException(ExceptionUtils.getStackTrace(e)); + } + } + + public static String getConsumerGroup(String subscriptionName, String clientID, boolean shared) { + StringBuffer consumerGroup = new StringBuffer(); + + if (StringUtils.isNotBlank(subscriptionName)) { + consumerGroup.append(subscriptionName); + } + + if (StringUtils.isNotBlank(clientID)) { + if (consumerGroup.length() != 0) { + consumerGroup.append("-"); + } + consumerGroup.append(clientID); + } + + if (shared) { + if (consumerGroup.length() != 0) { + consumerGroup.append("-"); + } + consumerGroup.append(uuid()); + } + + if (consumerGroup.length() == 0) { + consumerGroup.append(uuid()); + } + + return consumerGroup.toString(); + } + + public static String uuid() { + return UUID.randomUUID().toString(); + } + + public static String bytes2String(byte[] bytes) { + Prediction.checkNotNull(bytes, "bytes could not be null"); + return new String(bytes, Charset.forName(DEFAULT_CHARSET)); + } + + public static byte[] string2Bytes(String source) { + Prediction.checkNotNull(source, "source could be null"); + try { + return source.getBytes(DEFAULT_CHARSET); + } + catch (UnsupportedEncodingException e) { + throw new JMSRuntimeException(ExceptionUtils.getStackTrace(e)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java new file mode 100644 index 0000000..3ff1d69 --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/**
 * Strict converter: a value is only ever returned as the very type it already
 * has (an Integer stays an Integer and is never widened to a Long).
 * {@code null} is passed through as {@code null}.
 */
public class ObjectTypeCast {

    /**
     * @param obj value to cast, may be {@code null}
     * @return the value as a String, or {@code null}
     * @throws ClassCastException if the value is not a String
     */
    public static String cast2String(Object obj) {
        return cast2Object(obj, String.class);
    }

    /**
     * @param obj value to cast, may be {@code null}
     * @return the value as a Long, or {@code null}
     * @throws ClassCastException if the value is not a Long
     */
    public static Long cast2Long(Object obj) {
        return cast2Object(obj, Long.class);
    }

    /**
     * @param obj value to cast, may be {@code null}
     * @return the value as an Integer, or {@code null}
     * @throws ClassCastException if the value is not an Integer
     */
    public static Integer cast2Integer(Object obj) {
        return cast2Object(obj, Integer.class);
    }

    /**
     * @param obj value to cast, may be {@code null}
     * @return the value as a Boolean, or {@code null}
     * @throws ClassCastException if the value is not a Boolean
     */
    public static Boolean cast2Boolean(Object obj) {
        return cast2Object(obj, Boolean.class);
    }

    /**
     * Generic form of the typed helpers above.
     *
     * @param obj value to cast, may be {@code null}
     * @param target the type the value must already be an instance of
     * @param <T> the target type
     * @return the value typed as {@code T}, or {@code null}
     * @throws ClassCastException if the value is not an instance of the target
     */
    public static <T> T cast2Object(Object obj, Class<T> target) {
        if (obj == null) {
            return null;
        }
        if (!target.isInstance(obj)) {
            throw new ClassCastException("To casted object is " + obj.getClass() + ", not " + target.getSimpleName() + ".class");
        }
        return target.cast(obj);
    }
}
/**
 * Argument checks that fail with an {@link IllegalArgumentException} carrying
 * the caller-supplied message.
 */
public class Prediction {

    /**
     * @param obj value that must not be {@code null}
     * @param errorMsg message of the exception thrown on failure
     * @throws IllegalArgumentException if {@code obj} is {@code null}
     */
    public static void checkNotNull(Object obj, String errorMsg) {
        if (obj != null) {
            return;
        }
        throw new IllegalArgumentException(errorMsg);
    }

    /**
     * @param source text that must contain something other than whitespace
     * @param errorMsg message of the exception thrown on failure
     * @throws IllegalArgumentException if {@code source} is {@code null} or
     * empty once trimmed
     */
    public static void checkNotBlank(String source, String errorMsg) {
        final boolean blank = source == null || source.trim().isEmpty();
        if (blank) {
            throw new IllegalArgumentException(errorMsg);
        }
    }
}
+ */ +public class PrimitiveTypeCast { + + /** + * Indicate if the parameter obj is primitive type. + * + * @param obj that to be check + * @return true if the obj is primitive type, otherwise return false + */ + public static boolean isPrimitiveType(Object obj) { + if (obj == null) { + return false; + } + if (Boolean.class.isInstance(obj) + || Byte.class.isInstance(obj) + || Short.class.isInstance(obj) + || Character.class.isInstance(obj) + || Integer.class.isInstance(obj) + || Long.class.isInstance(obj) + || Float.class.isInstance(obj) + || Double.class.isInstance(obj) + || String.class.isInstance(obj) + || byte[].class.isInstance(obj)) { + return true; + } + + return false; + } + + public static boolean cast2Boolean(Object obj) throws JMSException { + if (obj == null) { + return Boolean.valueOf(null); + } + + if (Boolean.class.isInstance(obj)) { + return (Boolean) obj; + } + if (String.class.isInstance(obj)) { + return Boolean.valueOf((String) obj); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static byte cast2Byte(Object obj) throws JMSException { + if (obj == null) { + return Byte.valueOf(null); + } + + if (Byte.class.isInstance(obj)) { + return (Byte) obj; + } + if (String.class.isInstance(obj)) { + return Byte.valueOf((String) obj); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static short cast2Short(Object obj) throws JMSException { + if (obj == null) { + return Short.valueOf(null); + } + + if (Byte.class.isInstance(obj)) { + return ((Byte) obj).shortValue(); + } + if (Short.class.isInstance(obj)) { + return (Short) obj; + } + if (String.class.isInstance(obj)) { + return Short.valueOf((String) obj); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static char cast2Char(Object obj) throws JMSException { + if (obj == null) { + throw new NullPointerException("Obj is required"); + } + + if 
(Character.class.isInstance(obj)) { + return (Character) obj; + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static int cast2Int(Object obj) throws JMSException { + if (obj == null) { + return Integer.valueOf(null); + } + + if (Byte.class.isInstance(obj)) { + return ((Byte) obj).intValue(); + } + if (Short.class.isInstance(obj)) { + return ((Short) obj).intValue(); + } + if (Integer.class.isInstance(obj)) { + return (Integer) obj; + } + if (String.class.isInstance(obj)) { + return Integer.parseInt((String) obj); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static long cast2Long(Object obj) throws JMSException { + if (obj == null) { + return Long.valueOf(null); + } + + if (Byte.class.isInstance(obj)) { + return ((Byte) obj).longValue(); + } + if (Short.class.isInstance(obj)) { + return ((Short) obj).longValue(); + } + if (Integer.class.isInstance(obj)) { + return ((Integer) obj).longValue(); + } + if (Long.class.isInstance(obj)) { + return (Long) obj; + } + if (String.class.isInstance(obj)) { + return Long.parseLong((String) obj); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static float cast2Float(Object obj) throws JMSException { + if (obj == null) { + return Float.valueOf(null); + } + + if (Float.class.isInstance(obj)) { + return (Float) obj; + } + if (String.class.isInstance(obj)) { + return Float.parseFloat((String) obj); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static double cast2Double(Object obj) throws JMSException { + if (obj == null) { + return Double.valueOf(null); + } + + if (Float.class.isInstance(obj)) { + return ((Float) obj).doubleValue(); + } + if (Double.class.isInstance(obj)) { + return (Double) obj; + } + if (String.class.isInstance(obj)) { + return Double.parseDouble((String) obj); + } + + throw new JMSException("Incorrect type[" + 
obj.getClass() + "] to cast"); + } + + public static String cast2String(Object obj) throws JMSException { + if (obj == null) { + return String.valueOf(null); + } + + if (Boolean.class.isInstance(obj) + || Byte.class.isInstance(obj) + || Short.class.isInstance(obj) + || Character.class.isInstance(obj) + || Integer.class.isInstance(obj) + || Long.class.isInstance(obj) + || Float.class.isInstance(obj) + || Double.class.isInstance(obj) + || String.class.isInstance(obj) + ) { + return obj.toString(); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static byte[] cast2ByteArray(Object obj) throws JMSException { + if (obj instanceof byte[]) { + return (byte[]) obj; + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java new file mode 100644 index 0000000..c67e71c --- /dev/null +++ b/rocketmq-jms/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Holds the version of this provider.
 */
public class ProviderVersion {

    /** The version this build identifies itself as. */
    public static final Version CURRENT_VERSION = Version.V1_1_0;

    /**
     * Known provider versions, each paired with an integer value.
     */
    public enum Version {

        V1_1_0(1);

        private final int value;

        Version(int value) {
            this.value = value;
        }

        /**
         * @return the integer value paired with this version
         */
        public int getValue() {
            return this.value;
        }
    }
}
<configuration>
    <!--
      Size-based rolling file appender writing to ${user.home}/logs/rocketmq/jms.log.
      NOTE(review): no logger below references this appender, so as configured
      nothing is written to the file. Confirm whether it should be attached to
      the org.apache.rocketmq.jms logger.
    -->
    <appender name="DefaultAppender"
        class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${user.home}/logs/rocketmq/jms.log</file>
        <append>true</append>
        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
            <fileNamePattern>${user.home}/logs/rocketmq/otherdays/jms.%i.log
            </fileNamePattern>
            <minIndex>1</minIndex>
            <maxIndex>10</maxIndex>
        </rollingPolicy>
        <triggeringPolicy
            class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <maxFileSize>100MB</maxFileSize>
        </triggeringPolicy>
        <encoder>
            <!-- Four-letter year: the conventional spelling of the pattern. -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
            <charset class="java.nio.charset.Charset">UTF-8</charset>
        </encoder>
    </appender>

    <!-- Console output. "append" is a file appender property and does not apply here. -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
            <charset class="java.nio.charset.Charset">UTF-8</charset>
        </encoder>
    </appender>

    <!-- Provider classes log at DEBUG; having no appender of their own, they use the root's. -->
    <logger name="org.apache.rocketmq.jms">
        <level value="DEBUG"/>
    </logger>

    <root>
        <level value="ERROR"/>
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNull.notNullValue; + +/** Unit test for constructing a {@link RocketMQConnectionFactory} from a name server address. */ public class RocketMQConnectionFactoryTest { + + /** Creates a factory for localhost:6789 and checks that getNameServerAddress() returns that same string and that getClientId() is not null. */ @Test + public void testClientId() throws Exception { + final String nameServerAddress = "localhost:6789"; + RocketMQConnectionFactory connectionFactory = new RocketMQConnectionFactory(nameServerAddress); + + assertThat(connectionFactory.getNameServerAddress(), is(nameServerAddress)); + assertThat(connectionFactory.getClientId(), notNullValue()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java new file mode 100644 index 0000000..0a3b36b --- /dev/null +++ b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.destination; + +import org.junit.Test; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +/** Unit test for {@link RocketMQQueue}. */ public class RocketMQQueueTest { + + /** Creates a queue named MyQueue and checks that both getQueueName() and toString() return that name. */ @Test + public void test() throws Exception { + RocketMQQueue queue = new RocketMQQueue("MyQueue"); + + assertThat(queue.getQueueName(), is("MyQueue")); + assertThat(queue.toString(), is("MyQueue")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java new file mode 100644 index 0000000..c482e1c --- /dev/null +++ b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.destination; + +import org.junit.Test; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +/** Unit test for {@link RocketMQTopic}. */ public class RocketMQTopicTest { + + /** Creates a topic named MyTopic and checks that both getTopicName() and toString() return that name. */ @Test + public void test() throws Exception { + RocketMQTopic topic = new RocketMQTopic("MyTopic"); + + assertThat(topic.getTopicName(), is("MyTopic")); + assertThat(topic.toString(), is("MyTopic")); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java new file mode 100644 index 0000000..62b5056 --- /dev/null +++ b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.hook; + +import java.text.SimpleDateFormat; +import java.util.Date; +import javax.jms.Message; +import org.apache.rocketmq.jms.exception.MessageExpiredException; +import org.apache.rocketmq.jms.msg.JMSTextMessage; +import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; + +/** Unit tests for {@link ReceiveMessageHook}: expiration handling in before(Message) and the receive timestamp property it is expected to set. */ public class ReceiveMessageHookTest { + + /** Sets JMSExpiration to the current time, sleeps 100 ms so that instant lies in the past, and expects before() to throw MessageExpiredException. */ @Test(expected = MessageExpiredException.class) + public void testValidateFail() throws Exception { + ReceiveMessageHook hook = new ReceiveMessageHook(); + + Message message = new JMSTextMessage("text"); + message.setJMSExpiration(new Date().getTime()); + /* let the expiration instant pass before the hook runs */ Thread.sleep(100); + hook.before(message); + } + + /** Expects before() to complete without throwing for an expiration of 0 and for an expiration of 2999-01-01 (parsed in the JVM default time zone). */ @Test + public void testValidateSuccess() throws Exception { + ReceiveMessageHook hook = new ReceiveMessageHook(); + + Message message = new JMSTextMessage("text"); + // never expires + message.setJMSExpiration(0); + hook.before(message); + + // expires in the future + message.setJMSExpiration(new SimpleDateFormat("yyyy-MM-dd").parse("2999-01-01").getTime()); + hook.before(message); + } + + /** Expects before() to leave a JMSXRcvTimestamp long property greater than zero on the message. */ @Test + public void setProviderProperties() throws Exception { + ReceiveMessageHook hook = new ReceiveMessageHook(); + + Message message = new JMSTextMessage("text"); + hook.before(message); + + 
assertThat(message.getLongProperty(JMSPropertiesEnum.JMSXRcvTimestamp.name()), greaterThan(0L)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java new file mode 100644 index 0000000..29e91ec --- /dev/null +++ b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.hook; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import org.apache.rocketmq.jms.RocketMQProducer; +import org.apache.rocketmq.jms.destination.RocketMQTopic; +import org.apache.rocketmq.jms.exception.UnsupportDeliveryModelException; +import org.apache.rocketmq.jms.msg.JMSTextMessage; +import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum; +import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNot.not; +import static org.hamcrest.core.IsNull.notNullValue; +import static org.hamcrest.core.IsNull.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** Unit tests for {@link SendMessageHook}: argument validation in before(...) and the headers and properties it is expected to set on the message. */ public class SendMessageHookTest { + + /** Calls before() with DeliveryMode.NON_PERSISTENT and expects UnsupportDeliveryModelException (presumably triggered by that delivery mode; the hook itself is not visible here). */ @Test(expected = UnsupportDeliveryModelException.class) + public void testValidate() throws Exception { + final JMSTextMessage message = new JMSTextMessage("text"); + final RocketMQTopic destination = new RocketMQTopic("destination"); + final int deliveryMode = DeliveryMode.NON_PERSISTENT; + final int priority = 4; + final long timeToLive = 1000 * 100L; + + SendMessageHook hook = new SendMessageHook(); + hook.before(message, destination, deliveryMode, priority, timeToLive); + } + + /** With a mocked producer whose delivery delay is stubbed to 0 and the default time-to-live, checks the headers left on the message by before(): destination, delivery mode, expiration 0, priority 5 and a message ID. */ @Test + public void testSetHeader() throws Exception { + RocketMQProducer producer = mock(RocketMQProducer.class); + when(producer.getDeliveryDelay()).thenReturn(0L); + + final JMSTextMessage message = new JMSTextMessage("text"); + final Destination destination = new RocketMQTopic("destination"); + final int deliveryMode = DeliveryMode.PERSISTENT; + final int priority = 5; + long timeToLive = JMSHeaderEnum.JMS_TIME_TO_LIVE_DEFAULT_VALUE; + SendMessageHook hook = new SendMessageHook(producer); + hook.before(message, destination, deliveryMode, priority, timeToLive); + + assertThat(message.getJMSDestination(), is(destination)); + 
assertThat(message.getJMSDeliveryMode(), is(JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE)); + assertThat(message.getJMSExpiration(), is(0L)); + /* NOTE(review): if getJMSDeliveryTime() and getJMSTimestamp() return primitive long (testSetHeader2 compares getJMSTimestamp() with 0L), the boxed value is never null and the two notNullValue() checks on them cannot fail; confirm and tighten. */ assertThat(message.getJMSDeliveryTime(), notNullValue()); + assertThat(message.getJMSPriority(), is(5)); + assertThat(message.getJMSMessageID(), notNullValue()); + assertThat(message.getJMSTimestamp(), notNullValue()); + } + + /** + * Stubs the producer to disable message ID and message timestamp, sends with a time-to-live of 100 seconds, and expects a null message ID, a timestamp of 0, a non-zero expiration and a JMSXUserID property equal to the stubbed user name. + * + * @throws Exception if the hook or a message accessor throws + */ + @Test + public void testSetHeader2() throws Exception { + RocketMQProducer producer = mock(RocketMQProducer.class); + when(producer.getUserName()).thenReturn("user"); + when(producer.getDisableMessageID()).thenReturn(true); + when(producer.getDisableMessageTimestamp()).thenReturn(true); + + final JMSTextMessage message = new JMSTextMessage("text"); + final Destination destination = new RocketMQTopic("destination"); + final int deliveryMode = DeliveryMode.PERSISTENT; + final int priority = 5; + final long timeToLive = 1000 * 100L; + SendMessageHook hook = new SendMessageHook(producer); + hook.before(message, destination, deliveryMode, priority, timeToLive); + + // assert header + assertThat(message.getJMSMessageID(), nullValue()); + assertThat(message.getJMSTimestamp(), is(0L)); + assertThat(message.getJMSExpiration(), not(is(0L))); + + // assert properties + assertThat(message.getStringProperty(JMSPropertiesEnum.JMSXUserID.name()), is("user")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java new file mode 100644 index 0000000..ae08bec --- /dev/null +++ 
b/rocketmq-jms/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.source; + +import javax.jms.ConnectionFactory; +import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.jms.annotation.EnableJms; +import org.springframework.jms.config.DefaultJmsListenerContainerFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.support.converter.SimpleMessageConverter; + +/** Spring Java configuration in the integration test sources: enables JMS listener support, component-scans the package of RocketMQServer, and declares the listener container factory, connection factory and JmsTemplate beans. */ @Configuration +@ComponentScan(basePackageClasses = {RocketMQServer.class}) +@EnableJms +public class AppConfig { + + /** Listener container factory wired to connectionFactory() with its concurrency set to the string 1. */ @Bean + public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { + DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory()); + factory.setConcurrency("1"); + return factory; + } + + /** Returns a RocketMQConnectionFactory for Constant.NAME_SERVER_ADDRESS; the commented-out lines sketch a CachingConnectionFactory wrapper that is currently disabled. */ @Bean + public ConnectionFactory connectionFactory() { +// CachingConnectionFactory factory = new 
CachingConnectionFactory(); +// factory.setTargetConnectionFactory(new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS)); +// return factory; + //todo + return new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS); + } + + /** JmsTemplate built on connectionFactory() with a SimpleMessageConverter as its message converter. */ @Bean + public JmsTemplate jmsTemplate() { + JmsTemplate jmsTemplate = new JmsTemplate(); + jmsTemplate.setConnectionFactory(connectionFactory()); + jmsTemplate.setMessageConverter(new SimpleMessageConverter()); + return jmsTemplate; + } +}