http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java b/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java deleted file mode 100644 index ea4b49e..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java +++ /dev/null @@ -1,434 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.domain.message; - -import com.google.common.collect.Maps; -import com.google.common.io.BaseEncoding; -import java.io.Serializable; -import java.util.Enumeration; -import java.util.Map; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import org.apache.commons.lang.builder.ToStringBuilder; -import org.apache.rocketmq.jms.domain.JmsBaseConstant; -import org.apache.rocketmq.jms.util.ExceptionUtil; -
/**
 * Base JMS message backed by two maps (headers and user properties) plus a serializable body.
 *
 * <p>All standard {@code setJMS*} header setters delegate to
 * {@link ExceptionUtil#handleUnSupportedException()}; headers are populated through
 * {@link #setHeader(String, Object)} instead. Numeric and boolean header getters accept either the
 * boxed type or its {@code String} form, because headers rebuilt from a RocketMQ message arrive as
 * strings.
 */
public class JmsBaseMessage implements Message {
    /**
     * Message properties
     */
    protected Map<String, Object> properties = Maps.newHashMap();
    /**
     * Message headers
     */
    protected Map<String, Object> headers = Maps.newHashMap();
    /**
     * Message body
     */
    protected Serializable body;

    /**
     * @return the value stored under {@link JmsBaseConstant#JMS_MESSAGE_ID}, or {@code null} if absent
     */
    @Override
    public String getJMSMessageID() {
        return (String) headers.get(JmsBaseConstant.JMS_MESSAGE_ID);
    }

    /**
     * Not supported: the message ID is not user-settable on this implementation.
     *
     * @param id the ID of the message (ignored)
     * @see javax.jms.Message#getJMSMessageID()
     */
    @Override
    public void setJMSMessageID(String id) {
        ExceptionUtil.handleUnSupportedException();
    }

    /**
     * @return the timestamp header, or {@code 0} when it is absent
     * @throws NumberFormatException if the header holds text that is not a valid long
     */
    @Override
    public long getJMSTimestamp() {
        return longHeader(JmsBaseConstant.JMS_TIMESTAMP, 0L);
    }

    /** Not supported; see {@link ExceptionUtil#handleUnSupportedException()}. */
    @Override
    public void setJMSTimestamp(long timestamp) {
        ExceptionUtil.handleUnSupportedException();
    }

    /**
     * Returns the correlation ID as bytes. The ID is first treated as Base64 text; if it is not
     * valid Base64 its UTF-8 bytes are returned instead.
     *
     * @return the decoded bytes, or {@code null} when no correlation ID is set
     */
    @Override
    public byte[] getJMSCorrelationIDAsBytes() {
        String jmsCorrelationID = getJMSCorrelationID();
        if (jmsCorrelationID == null) {
            return null;
        }
        try {
            return BaseEncoding.base64().decode(jmsCorrelationID);
        }
        catch (IllegalArgumentException notBase64) {
            // Plain-text correlation ID: fall back to its bytes, with a fixed charset so the
            // result does not depend on the platform default.
            return jmsCorrelationID.getBytes(java.nio.charset.Charset.forName("UTF-8"));
        }
    }

    /**
     * Base64-encodes the bytes and forwards them to {@link #setJMSCorrelationID(String)}.
     *
     * @param correlationID raw correlation ID; {@code null} is forwarded as a {@code null} ID
     */
    @Override
    public void setJMSCorrelationIDAsBytes(byte[] correlationID) {
        String encodedText = correlationID == null ? null : BaseEncoding.base64().encode(correlationID);
        setJMSCorrelationID(encodedText);
    }

    /**
     * @return the correlation ID header, or {@code null} when absent
     */
    @Override
    public String getJMSCorrelationID() {
        return (String) headers.get(JmsBaseConstant.JMS_CORRELATION_ID);
    }

    /** Not supported; see {@link ExceptionUtil#handleUnSupportedException()}. */
    @Override
    public void setJMSCorrelationID(String correlationID) {
        ExceptionUtil.handleUnSupportedException();
    }

    /**
     * @return the reply-to destination header, or {@code null} when absent
     */
    @Override
    public Destination getJMSReplyTo() {
        return (Destination) headers.get(JmsBaseConstant.JMS_REPLY_TO);
    }

    /** Not supported; see {@link ExceptionUtil#handleUnSupportedException()}. */
    @Override
    public void setJMSReplyTo(Destination replyTo) {
        ExceptionUtil.handleUnSupportedException();
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }

    /**
     * @return the destination header, or {@code null} when absent
     */
    @Override
    public Destination getJMSDestination() {
        return (Destination) headers.get(JmsBaseConstant.JMS_DESTINATION);
    }

    /** Not supported; see {@link ExceptionUtil#handleUnSupportedException()}. */
    @Override
    public void setJMSDestination(Destination destination) {
        ExceptionUtil.handleUnSupportedException();
    }

    /**
     * Returns the body cast to the requested type.
     *
     * @param clazz expected body type
     * @return the body
     * @throws IllegalArgumentException if the body is not an instance of {@code clazz}
     *                                  (including when the body is {@code null})
     */
    @SuppressWarnings("unchecked")
    public <T> T getBody(Class<T> clazz) throws JMSException {
        if (clazz.isInstance(body)) {
            return (T) body;
        }
        throw new IllegalArgumentException("The class " + clazz
            + " is unknown to this implementation");
    }

    /**
     * @return the delivery-mode header, or {@code 0} when it is absent
     * @throws NumberFormatException if the header holds text that is not a valid int
     */
    @Override
    public int getJMSDeliveryMode() {
        return intHeader(JmsBaseConstant.JMS_DELIVERY_MODE, 0);
    }

    /**
     * Not supported: the delivery mode is not user-settable on this implementation; read it with
     * {@link #getJMSDeliveryMode()}.
     *
     * @param deliveryMode the delivery mode for this message (ignored)
     * @see javax.jms.Message#getJMSDeliveryMode()
     * @see javax.jms.DeliveryMode
     */
    @Override
    public void setJMSDeliveryMode(int deliveryMode) {
        ExceptionUtil.handleUnSupportedException();
    }

    /**
     * @return {@code true} when the body is an instance of {@code clazz}
     */
    public boolean isBodyAssignableTo(Class<?> clazz) throws JMSException {
        return clazz.isInstance(body);
    }

    /**
     * @return the redelivered header; {@code false} when it is absent
     */
    @Override
    public boolean getJMSRedelivered() {
        Object value = headers.get(JmsBaseConstant.JMS_REDELIVERED);
        if (value == null) {
            return false;
        }
        return value instanceof Boolean ? (Boolean) value : Boolean.parseBoolean(value.toString());
    }

    /** Not supported; see {@link ExceptionUtil#handleUnSupportedException()}. */
    @Override
    public void setJMSRedelivered(boolean redelivered) {
        ExceptionUtil.handleUnSupportedException();
    }

    /**
     * Copies headers and properties from {@code sourceMessage} into this message. Entries whose
     * key already exists here are left untouched.
     *
     * @param sourceMessage source message
     * @throws IllegalArgumentException if a copied property value is rejected by
     *                                  {@link #setObjectProperty(String, Object)}
     */
    public void copyMetaData(JmsBaseMessage sourceMessage) {
        for (Map.Entry<String, Object> entry : sourceMessage.getHeaders().entrySet()) {
            if (!headerExits(entry.getKey())) {
                setHeader(entry.getKey(), entry.getValue());
            }
        }
        for (Map.Entry<String, Object> entry : sourceMessage.getProperties().entrySet()) {
            if (!propertyExists(entry.getKey())) {
                setObjectProperty(entry.getKey(), entry.getValue());
            }
        }
    }

    /**
     * @return the type header, or {@code null} when absent
     */
    @Override
    public String getJMSType() {
        return (String) headers.get(JmsBaseConstant.JMS_TYPE);
    }

    /** Not supported; see {@link ExceptionUtil#handleUnSupportedException()}. */
    @Override
    public void setJMSType(String type) {
        ExceptionUtil.handleUnSupportedException();
    }

    /**
     * @return the live (mutable) header map
     */
    public Map<String, Object> getHeaders() {
        return this.headers;
    }

    /**
     * @return the expiration header, or {@code 0} when it is absent
     * @throws NumberFormatException if the header holds text that is not a valid long
     */
    @Override
    public long getJMSExpiration() {
        return longHeader(JmsBaseConstant.JMS_EXPIRATION, 0L);
    }

    /** Not supported; see {@link ExceptionUtil#handleUnSupportedException()}. */
    @Override
    public void setJMSExpiration(long expiration) {
        ExceptionUtil.handleUnSupportedException();
    }

    /**
     * @return {@code true} when a header with this name has been set
     */
    public boolean headerExits(String name) {
        return this.headers.containsKey(name);
    }

    /**
     * @return the priority header, or {@code 5} when it is absent
     * @throws NumberFormatException if the header holds text that is not a valid int
     */
    @Override
    public int getJMSPriority() {
        return intHeader(JmsBaseConstant.JMS_PRIORITY, 5);
    }

    /** Not supported; see {@link ExceptionUtil#handleUnSupportedException()}. */
    @Override
    public void setJMSPriority(int priority) {
        ExceptionUtil.handleUnSupportedException();
    }

    /**
     * Stores a header value without any validation.
     */
    public void setHeader(String name, Object value) {
        this.headers.put(name, value);
    }

    /**
     * @return the live (mutable) property map
     */
    public Map<String, Object> getProperties() {
        return this.properties;
    }

    /**
     * Replaces the property map. The map is used as-is (no copy, no value validation).
     *
     * @param properties new property map; {@code null} installs an empty map so that later
     *                   property calls keep working
     */
    public void setProperties(Map<String, Object> properties) {
        this.properties = properties != null ? properties : Maps.<String, Object>newHashMap();
    }

    /**
     * @throws UnsupportedOperationException always
     */
    @Override
    public void acknowledge() throws JMSException {
        throw new UnsupportedOperationException("Unsupported!");
    }

    @Override
    public void clearProperties() {
        this.properties.clear();
    }

    @Override
    public void clearBody() {
        this.body = null;
    }

    @Override
    public boolean propertyExists(String name) {
        return properties.containsKey(name);
    }

    /**
     * @return the property as a boolean; {@code false} when it is missing or {@code null}
     */
    @Override
    public boolean getBooleanProperty(String name) throws JMSException {
        Object value = presentProperty(name);
        if (value == null) {
            return false;
        }
        return value instanceof Boolean ? (Boolean) value : Boolean.valueOf(value.toString());
    }

    /**
     * @return the property as a byte; {@code 0} when it is missing or {@code null}
     * @throws NumberFormatException if the stored value cannot be parsed as a byte
     */
    @Override
    public byte getByteProperty(String name) throws JMSException {
        Object value = presentProperty(name);
        if (value == null) {
            return 0;
        }
        return value instanceof Byte ? (Byte) value : Byte.valueOf(value.toString());
    }

    /**
     * @return the property as a short; {@code 0} when it is missing or {@code null}
     * @throws NumberFormatException if the stored value cannot be parsed as a short
     */
    @Override
    public short getShortProperty(String name) throws JMSException {
        Object value = presentProperty(name);
        if (value == null) {
            return 0;
        }
        return value instanceof Short ? (Short) value : Short.valueOf(value.toString());
    }

    /**
     * @return the property as an int; {@code 0} when it is missing or {@code null}
     * @throws NumberFormatException if the stored value cannot be parsed as an int
     */
    @Override
    public int getIntProperty(String name) throws JMSException {
        Object value = presentProperty(name);
        if (value == null) {
            return 0;
        }
        return value instanceof Integer ? (Integer) value : Integer.valueOf(value.toString());
    }

    /**
     * @return the property as a long; {@code 0} when it is missing or {@code null}
     * @throws NumberFormatException if the stored value cannot be parsed as a long
     */
    @Override
    public long getLongProperty(String name) throws JMSException {
        Object value = presentProperty(name);
        if (value == null) {
            return 0L;
        }
        return value instanceof Long ? (Long) value : Long.valueOf(value.toString());
    }

    /**
     * @return the property as a float; {@code 0} when it is missing or {@code null}
     * @throws NumberFormatException if the stored value cannot be parsed as a float
     */
    @Override
    public float getFloatProperty(String name) throws JMSException {
        Object value = presentProperty(name);
        if (value == null) {
            return 0f;
        }
        return value instanceof Float ? (Float) value : Float.valueOf(value.toString());
    }

    /**
     * @return the property as a double; {@code 0} when it is missing or {@code null}
     * @throws NumberFormatException if the stored value cannot be parsed as a double
     */
    @Override
    public double getDoubleProperty(String name) throws JMSException {
        Object value = presentProperty(name);
        if (value == null) {
            return 0d;
        }
        return value instanceof Double ? (Double) value : Double.valueOf(value.toString());
    }

    /**
     * @return the property's {@code toString()} form; {@code null} when it is missing or {@code null}
     */
    @Override
    public String getStringProperty(String name) throws JMSException {
        Object value = presentProperty(name);
        return value == null ? null : value.toString();
    }

    @Override
    public Object getObjectProperty(String name) throws JMSException {
        return this.properties.get(name);
    }

    /**
     * @return an enumeration over a snapshot of the property names taken at call time;
     *         {@code nextElement()} throws {@link java.util.NoSuchElementException} once exhausted
     */
    @Override
    public Enumeration<?> getPropertyNames() throws JMSException {
        return java.util.Collections.enumeration(
            new java.util.ArrayList<String>(this.properties.keySet()));
    }

    @Override
    public void setBooleanProperty(String name, boolean value) {
        setObjectProperty(name, value);
    }

    @Override
    public void setByteProperty(String name, byte value) {
        setObjectProperty(name, value);
    }

    @Override
    public void setShortProperty(String name, short value) {
        setObjectProperty(name, value);
    }

    @Override
    public void setIntProperty(String name, int value) {
        setObjectProperty(name, value);
    }

    @Override
    public void setLongProperty(String name, long value) {
        setObjectProperty(name, value);
    }

    @Override
    public void setFloatProperty(String name, float value) {
        setObjectProperty(name, value);
    }

    @Override
    public void setDoubleProperty(String name, double value) {
        setObjectProperty(name, value);
    }

    @Override
    public void setStringProperty(String name, String value) {
        setObjectProperty(name, value);
    }

    /**
     * Stores a property after checking its type.
     *
     * @throws IllegalArgumentException if {@code value} is not a {@link Number}, {@link String}
     *                                  or {@link Boolean} (this includes {@code null})
     */
    @Override
    public void setObjectProperty(String name, Object value) {
        if (value instanceof Number || value instanceof String || value instanceof Boolean) {
            this.properties.put(name, value);
        }
        else {
            throw new IllegalArgumentException(
                "Value should be boolean, byte, short, int, long, float, double, and String.");
        }
    }

    /** Returns the property value, or {@code null} when the property is missing or maps to {@code null}. */
    private Object presentProperty(String name) throws JMSException {
        return propertyExists(name) ? getObjectProperty(name) : null;
    }

    /** Reads a header as a long, accepting a {@link Number} or its decimal text form. */
    private long longHeader(String name, long defaultValue) {
        Object value = headers.get(name);
        if (value == null) {
            return defaultValue;
        }
        return value instanceof Number ? ((Number) value).longValue() : Long.parseLong(value.toString());
    }

    /** Reads a header as an int, accepting a {@link Number} or its decimal text form. */
    private int intHeader(String name, int defaultValue) {
        Object value = headers.get(name);
        if (value == null) {
            return defaultValue;
        }
        return value instanceof Number ? ((Number) value).intValue() : Integer.parseInt(value.toString());
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java b/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java deleted file mode 100644 index b1e85b0..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.domain.message; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; -import org.apache.rocketmq.jms.util.ExceptionUtil; -
/**
 * Bytes message that supports only bulk byte access: {@code readBytes} on a message built from
 * received data and {@code writeBytes} on a message built for sending. All typed
 * {@code readXxx}/{@code writeXxx} methods, {@code writeObject} and {@code reset} are unsupported.
 *
 * <p>Notice: although the JMS API allows the use of message properties with byte messages, they
 * are typically not used, since the inclusion of properties may affect the format.
 */
public class JmsBytesMessage extends JmsBaseMessage implements BytesMessage {
    /** Read view over {@link #bytesIn}; {@code null} for a message created to be sent. */
    private DataInputStream dataAsInput;
    /** Write view over {@link #bytesOut}; {@code null} for a message created for reading. */
    private DataOutputStream dataAsOutput;
    private ByteArrayOutputStream bytesOut;
    private byte[] bytesIn;

    /**
     * Message created for reading
     *
     * @param data the received payload; the array is kept by reference, not copied
     */
    public JmsBytesMessage(byte[] data) {
        this.bytesIn = data;
        dataAsInput = new DataInputStream(new ByteArrayInputStream(data, 0, data.length));
    }

    /**
     * Message created to be sent
     */
    public JmsBytesMessage() {
        bytesOut = new ByteArrayOutputStream();
        dataAsOutput = new DataOutputStream(bytesOut);
    }

    /**
     * @return the number of bytes currently returned by {@link #getData()}
     */
    @Override
    public long getBodyLength() throws JMSException {
        return getData().length;
    }

    /**
     * @return a copy of everything written so far for a message created to be sent, otherwise
     *         the array the message was constructed with
     */
    public byte[] getData() {
        if (bytesOut != null) {
            return bytesOut.toByteArray();
        }
        return bytesIn;
    }

    @Override
    public boolean readBoolean() throws JMSException {
        throw new UnsupportedOperationException("Unsupported!");
    }

    @Override
    public byte readByte() throws JMSException {
        throw new UnsupportedOperationException("Unsupported!");
    }

    @Override
    public int readUnsignedByte() throws JMSException {
        throw new UnsupportedOperationException("Unsupported!");
    }

    @Override
    public short readShort() throws JMSException {
        throw new UnsupportedOperationException("Unsupported!");
    }

    @Override
    public int readUnsignedShort() throws JMSException {
        throw new UnsupportedOperationException("Unsupported!");
    }

    @Override
    public char readChar() throws JMSException {
        throw new UnsupportedOperationException("Unsupported!");
    }

    @Override
    public int readInt() throws JMSException {
        throw new UnsupportedOperationException("Unsupported!");
    }

    @Override
    public long readLong() throws JMSException {
        throw new UnsupportedOperationException("Unsupported!");
    }

    @Override
    public float readFloat() throws JMSException {
        throw new UnsupportedOperationException("Unsupported!");
    }

    @Override
    public double readDouble() throws JMSException {
        throw new UnsupportedOperationException("Unsupported!");
    }

    @Override
    public String readUTF() throws JMSException {
        throw new UnsupportedOperationException("Unsupported!");
    }

    /**
     * Equivalent to {@code readBytes(value, value.length)}.
     */
    @Override
    public int readBytes(byte[] value) throws JMSException {
        return readBytes(value, value.length);
    }

    /**
     * Reads up to {@code length} bytes from the payload into the start of {@code value}.
     *
     * @param value  destination buffer
     * @param length maximum number of bytes to read; must be in {@code [0, value.length]}
     * @return the number of bytes read, or {@code -1} when bytes were requested but the payload
     *         was already exhausted
     * @throws IndexOutOfBoundsException   if {@code length} is negative or exceeds
     *                                     {@code value.length}
     * @throws MessageNotReadableException if this message was created to be sent
     */
    @Override
    public int readBytes(byte[] value, int length) throws JMSException {
        if (length < 0) {
            // A negative length used to skip the read loop and be reported as end-of-stream (-1).
            throw new IndexOutOfBoundsException("length must not be negative");
        }
        if (length > value.length) {
            throw new IndexOutOfBoundsException("length must be smaller than the length of value");
        }
        if (dataAsInput == null) {
            throw new MessageNotReadableException("Message is not readable! ");
        }
        try {
            int offset = 0;
            // Keep reading until the request is satisfied or the stream reports end-of-data.
            while (offset < length) {
                int read = dataAsInput.read(value, offset, length - offset);
                if (read < 0) {
                    break;
                }
                offset += read;
            }

            if (offset == 0 && length != 0) {
                return -1;
            }
            return offset;
        }
        catch (IOException e) {
            throw handleInputException(e);
        }
    }

    @Override
    public void writeBoolean(boolean value) throws JMSException {
        ExceptionUtil.handleUnSupportedException();
    }

    @Override
    public void writeByte(byte value) throws JMSException {
        ExceptionUtil.handleUnSupportedException();
    }

    @Override
    public void writeShort(short value) throws JMSException {
        ExceptionUtil.handleUnSupportedException();
    }

    @Override
    public void writeChar(char value) throws JMSException {
        ExceptionUtil.handleUnSupportedException();
    }

    @Override
    public void writeInt(int value) throws JMSException {
        ExceptionUtil.handleUnSupportedException();
    }

    @Override
    public void writeLong(long value) throws JMSException {
        ExceptionUtil.handleUnSupportedException();
    }

    @Override
    public void writeFloat(float value) throws JMSException {
        ExceptionUtil.handleUnSupportedException();
    }

    @Override
    public void writeDouble(double value) throws JMSException {
        ExceptionUtil.handleUnSupportedException();
    }

    @Override
    public void writeUTF(String value) throws JMSException {
        ExceptionUtil.handleUnSupportedException();
    }

    /**
     * Appends all of {@code value} to the payload.
     *
     * @throws MessageNotWriteableException if this message was created for reading
     */
    @Override
    public void writeBytes(byte[] value) throws JMSException {
        if (dataAsOutput == null) {
            throw new MessageNotWriteableException("Message is not writable! ");
        }
        try {
            dataAsOutput.write(value);
        }
        catch (IOException e) {
            throw handleOutputException(e);
        }
    }

    /**
     * Appends {@code length} bytes of {@code value}, starting at {@code offset}, to the payload.
     *
     * @throws MessageNotWriteableException if this message was created for reading
     */
    @Override
    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
        if (dataAsOutput == null) {
            throw new MessageNotWriteableException("Message is not writable! ");
        }
        try {
            dataAsOutput.write(value, offset, length);
        }
        catch (IOException e) {
            throw handleOutputException(e);
        }
    }

    @Override
    public void writeObject(Object value) throws JMSException {
        ExceptionUtil.handleUnSupportedException();
    }

    @Override
    public void reset() throws JMSException {
        ExceptionUtil.handleUnSupportedException();
    }

    /** Wraps a write failure in a {@link JMSException}, keeping the cause and linked exception. */
    private JMSException handleOutputException(final IOException e) {
        JMSException ex = new JMSException(e.getMessage());
        ex.initCause(e);
        ex.setLinkedException(e);
        return ex;
    }

    /**
     * Wraps a read failure: {@link EOFException} becomes {@link MessageEOFException}, anything
     * else becomes {@link MessageFormatException}.
     */
    private JMSException handleInputException(final IOException e) {
        JMSException ex;
        if (e instanceof EOFException) {
            ex = new MessageEOFException(e.getMessage());
        }
        else {
            ex = new MessageFormatException(e.getMessage());
        }
        ex.initCause(e);
        ex.setLinkedException(e);
        return ex;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java b/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java deleted file mode 100644 index f67da14..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.domain.message; - -import java.io.Serializable; -import javax.jms.JMSException; -import javax.jms.ObjectMessage; -
/**
 * Object message whose payload is kept in the inherited {@code body} field.
 */
public class JmsObjectMessage extends JmsBaseMessage implements ObjectMessage {

    /**
     * Creates a message with no payload.
     */
    public JmsObjectMessage() {
        super();
    }

    /**
     * Creates a message carrying the given payload.
     *
     * @param object payload stored as the message body
     */
    public JmsObjectMessage(Serializable object) {
        super();
        body = object;
    }

    /**
     * @return the current body, possibly {@code null}
     */
    public Serializable getObject() throws JMSException {
        return body;
    }

    /**
     * Replaces the body with {@code object}.
     */
    public void setObject(Serializable object) throws JMSException {
        body = object;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java b/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java deleted file mode 100644 index ce19b51..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.domain.message; - -import javax.jms.JMSException; -import javax.jms.TextMessage; -
/**
 * Text message. The text is held both in a dedicated field and in the inherited {@code body}
 * field; {@link #setText(String)} and {@link #clearBody()} keep the two in step.
 */
public class JmsTextMessage extends JmsBaseMessage implements TextMessage {
    private String text;

    /**
     * Creates a message carrying {@code text}.
     */
    public JmsTextMessage(String text) {
        setText(text);
    }

    /**
     * Creates a message with no text.
     */
    public JmsTextMessage() {
        super();
    }

    /**
     * @return the current text, possibly {@code null}
     */
    public String getText() throws JMSException {
        return text;
    }

    /**
     * Stores {@code text} as both the text and the message body.
     */
    public void setText(String text) {
        this.text = text;
        this.body = text;
    }

    /**
     * Clears the inherited body and drops the text.
     */
    public void clearBody() {
        super.clearBody();
        text = null;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java b/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java deleted file mode 100644 index bd926e5..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.util; - -import com.google.common.base.Preconditions; -import javax.jms.JMSException; - -public class ExceptionUtil { - public static final boolean SKIP_SET_EXCEPTION - = Boolean.parseBoolean(System.getProperty("skip.set.exception", "false")); - - public static void handleUnSupportedException() { - if (!ExceptionUtil.SKIP_SET_EXCEPTION) { - throw new UnsupportedOperationException("Operation unsupported! If you want to skip this Exception," + - " use '-Dskip.set.exception=true' in JVM options."); - } - } - - public static JMSException convertToJmsException(Exception e, String extra) { - Preconditions.checkNotNull(extra); - Preconditions.checkNotNull(e); - JMSException jmsException = new JMSException(extra); - jmsException.initCause(e); - return jmsException; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java b/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java deleted file mode 100644 index 3cf03f9..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.util; - -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageExt; -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import javax.jms.BytesMessage; -import javax.jms.ObjectMessage; -import javax.jms.TextMessage; -import org.apache.commons.lang.StringUtils; -import org.apache.rocketmq.jms.domain.JmsBaseConstant; -import org.apache.rocketmq.jms.domain.JmsBaseTopic; -import org.apache.rocketmq.jms.domain.message.JmsBaseMessage; -import org.apache.rocketmq.jms.domain.message.JmsBytesMessage; -import org.apache.rocketmq.jms.domain.message.JmsObjectMessage; -import org.apache.rocketmq.jms.domain.message.JmsTextMessage; - -import static org.apache.rocketmq.jms.domain.JmsBaseMessageProducer.initRocketMQHeaders; - -public class MessageConverter { - public static byte[] getContentFromJms(javax.jms.Message jmsMessage) throws Exception { - byte[] content; - if (jmsMessage instanceof TextMessage) { - if (StringUtils.isEmpty(((TextMessage) jmsMessage).getText())) { - throw new 
IllegalArgumentException("Message body length is zero"); - } - content = MsgConvertUtil.string2Bytes(((TextMessage) jmsMessage).getText(), - Charsets.UTF_8.toString()); - } - else if (jmsMessage instanceof ObjectMessage) { - if (((ObjectMessage) jmsMessage).getObject() == null) { - throw new IllegalArgumentException("Message body length is zero"); - } - content = MsgConvertUtil.objectSerialize(((ObjectMessage) jmsMessage).getObject()); - } - else if (jmsMessage instanceof BytesMessage) { - JmsBytesMessage bytesMessage = (JmsBytesMessage) jmsMessage; - if (bytesMessage.getBodyLength() == 0) { - throw new IllegalArgumentException("Message body length is zero"); - } - content = bytesMessage.getData(); - } - else { - throw new IllegalArgumentException("Unknown message type " + jmsMessage.getJMSType()); - } - - return content; - } - - public static JmsBaseMessage convert2JMSMessage(MessageExt msg) throws Exception { - JmsBaseMessage message; - if (MsgConvertUtil.MSGMODEL_BYTES.equals( - msg.getUserProperty(MsgConvertUtil.JMS_MSGMODEL))) { - message = new JmsBytesMessage(msg.getBody()); - } - else if (MsgConvertUtil.MSGMODEL_OBJ.equals( - msg.getUserProperty(MsgConvertUtil.JMS_MSGMODEL))) { - message = new JmsObjectMessage(MsgConvertUtil.objectDeserialize(msg.getBody())); - } - else if (MsgConvertUtil.MSGMODEL_TEXT.equals( - msg.getUserProperty(MsgConvertUtil.JMS_MSGMODEL))) { - message = new JmsTextMessage(MsgConvertUtil.bytes2String(msg.getBody(), - Charsets.UTF_8.toString())); - } - else { - // rocketmq producer sends bytesMessage without setting JMS_MSGMODEL. 
- message = new JmsBytesMessage(msg.getBody()); - } - - //-------------------------set headers------------------------- - Map<String, Object> properties = new HashMap<String, Object>(); - - message.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + msg.getMsgId()); - - if (msg.getReconsumeTimes() > 0) { - message.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.TRUE); - } - else { - message.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.FALSE); - } - - Map<String, String> propertiesMap = msg.getProperties(); - if (propertiesMap != null) { - for (String properName : propertiesMap.keySet()) { - String properValue = propertiesMap.get(properName); - if (JmsBaseConstant.JMS_DESTINATION.equals(properName)) { - String destinationStr = properValue; - if (null != destinationStr) { - List<String> msgTuple = Arrays.asList(destinationStr.split(":")); - message.setHeader(JmsBaseConstant.JMS_DESTINATION, - new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1))); - } - } - else if (JmsBaseConstant.JMS_DELIVERY_MODE.equals(properName) || - JmsBaseConstant.JMS_PRIORITY.equals(properName)) { - message.setHeader(properName, properValue); - } - else if (JmsBaseConstant.JMS_TIMESTAMP.equals(properName) || - JmsBaseConstant.JMS_EXPIRATION.equals(properName)) { - message.setHeader(properName, properValue); - } - else if (JmsBaseConstant.JMS_CORRELATION_ID.equals(properName) || - JmsBaseConstant.JMS_TYPE.equals(properName)) { - message.setHeader(properName, properValue); - } - else if (JmsBaseConstant.JMS_MESSAGE_ID.equals(properName) || - JmsBaseConstant.JMS_REDELIVERED.equals(properName)) { - //JMS_MESSAGE_ID should set by msg.getMsgID() - continue; - } - else { - properties.put(properName, properValue); - } - } - } - - //Handle System properties, put into header. - //add what? - message.setProperties(properties); - - return message; - } - - public static Message convert2RMQMessage(JmsBaseMessage jmsMsg) throws Exception { - Message rocketmqMsg = new MessageExt(); - // 1. 
Transform message body - rocketmqMsg.setBody(MessageConverter.getContentFromJms(jmsMsg)); - - // 2. Transform topic and messageType - JmsBaseTopic destination = (JmsBaseTopic) jmsMsg.getHeaders().get(JmsBaseConstant.JMS_DESTINATION); - String topic = destination.getMessageTopic(); - rocketmqMsg.setTopic(topic); - String messageType = destination.getMessageType(); - Preconditions.checkState(!messageType.contains("||"), - "'||' can not be in the destination when sending a message"); - rocketmqMsg.setTags(messageType); - - // 3. Transform message properties - Properties properties = initRocketMQHeaders(jmsMsg, topic, messageType); - for (String name : properties.stringPropertyNames()) { - String value = properties.getProperty(name); - if (MessageConst.PROPERTY_KEYS.equals(name)) { - rocketmqMsg.setKeys(value); - } else if (MessageConst.PROPERTY_TAGS.equals(name)) { - rocketmqMsg.setTags(value); - } else if (MessageConst.PROPERTY_DELAY_TIME_LEVEL.equals(name)) { - rocketmqMsg.setDelayTimeLevel(Integer.parseInt(value)); - } else if (MessageConst.PROPERTY_WAIT_STORE_MSG_OK.equals(name)) { - rocketmqMsg.setWaitStoreMsgOK(Boolean.parseBoolean(value)); - } else if (MessageConst.PROPERTY_BUYER_ID.equals(name)) { - rocketmqMsg.setBuyerId(value); - } else { - rocketmqMsg.putUserProperty(name, value); - } - } - - return rocketmqMsg; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java b/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java deleted file mode 100644 index ec55bbc..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor 
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
 * Static helpers for converting message bodies between Java objects, strings and raw bytes,
 * together with the property names/values used to tag the model of a message.
 */
public class MsgConvertUtil {

    public static final byte[] EMPTY_BYTES = new byte[0];
    public static final String EMPTY_STRING = "";

    public static final String JMS_MSGMODEL = "jmsMsgModel";
    /**
     * Property used for the scenario "a Notify client tries to receive an ObjectMessage sent by a JMS client":
     * it carries the outgoing Notify message model, whose value can be textMessage or objectMessage.
     */
    public static final String COMPATIBLE_FIELD_MSGMODEL = "notifyOutMsgModel";

    public static final String MSGMODEL_TEXT = "textMessage";
    public static final String MSGMODEL_BYTES = "bytesMessage";
    public static final String MSGMODEL_OBJ = "objectMessage";

    public static final String MSG_TOPIC = "msgTopic";
    public static final String MSG_TYPE = "msgType";

    /**
     * Serializes an object with standard Java serialization.
     *
     * @param object the object to serialize
     * @return the serialized form
     * @throws IOException if the object cannot be written (for example it is not serializable)
     */
    public static byte[] objectSerialize(Object object) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        try {
            oos.writeObject(object);
        } finally {
            // Closing also flushes the object stream, so the buffer is complete afterwards.
            oos.close();
        }
        return baos.toByteArray();
    }

    /**
     * Restores an object previously produced by {@link #objectSerialize(Object)}.
     *
     * <p>NOTE(review): this uses native Java deserialization; confirm that the bytes passed in
     * always come from a trusted producer.
     *
     * @param bytes the serialized form
     * @return the deserialized object
     * @throws IOException if the bytes are not a valid serialization stream
     * @throws ClassNotFoundException if the class of the serialized object cannot be found
     */
    public static Serializable objectDeserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        try {
            // Read BEFORE closing; the stream is released in the finally block.
            return (Serializable) ois.readObject();
        } finally {
            ois.close();
        }
    }

    /**
     * Encodes a string with the named charset.
     *
     * @param s the string to encode; may be {@code null}
     * @param charset the charset name
     * @return {@link #EMPTY_BYTES} when {@code s} is {@code null}, the encoded bytes otherwise,
     *         or {@code null} when the string cannot be encoded with {@code charset}
     */
    public static final byte[] string2Bytes(String s, String charset) {
        if (null == s) {
            return EMPTY_BYTES;
        }
        try {
            return s.getBytes(charset);
        } catch (Exception ignored) {
            // Best effort by contract: an unusable charset is reported to the caller as null.
            return null;
        }
    }

    /**
     * Decodes bytes with the named charset.
     *
     * @param bs the bytes to decode; may be {@code null}
     * @param charset the charset name
     * @return {@link #EMPTY_STRING} when {@code bs} is {@code null}, the decoded string otherwise,
     *         or {@code null} when the bytes cannot be decoded with {@code charset}
     */
    public static final String bytes2String(byte[] bs, String charset) {
        if (null == bs) {
            return EMPTY_STRING;
        }
        try {
            return new String(bs, charset);
        } catch (Exception ignored) {
            // Best effort by contract: an unusable charset is reported to the caller as null.
            return null;
        }
    }
}
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.util; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import java.util.Map; -import org.apache.commons.lang.StringUtils; -import org.apache.rocketmq.jms.domain.CommonConstant; - -public abstract class URISpecParser { - - private static final String DEFAULT_BROKER = "rocketmq"; - - /** - * ConnectionUrl spec is broker://ip:port?key1=value1&key2=value2 - * - * @param uri Just like broker://ip:port?key1=value1&key2=value2 - * @return The parameters' map - */ - public static Map<String, String> parseURI(String uri) { - Preconditions.checkArgument(null != uri && !uri.trim().isEmpty(), "Uri can not be empty!"); - - Map<String, String> results = Maps.newHashMap(); - String broker = uri.substring(0, uri.indexOf(":")); - results.put(CommonConstant.PROVIDER, broker); - - if (broker.equals(DEFAULT_BROKER)) { - //Special handle for alibaba inner mq broker - String queryStr = uri.substring(uri.indexOf("?") + 1, uri.length()); - if (StringUtils.isNotEmpty(queryStr)) { - String[] params = queryStr.split("&"); - for (String param : params) { - if (param.contains("=")) { - String[] values = param.split("=", 2); - results.put(values[0], values[1]); - } - } - } - } - else { - throw new IllegalArgumentException("Broker must be rocketmq"); - } - return results; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git 
a/core/src/main/resources/application.conf b/core/src/main/resources/application.conf deleted file mode 100644 index 713c915..0000000 --- a/core/src/main/resources/application.conf +++ /dev/null @@ -1 +0,0 @@ -version = ${project.version} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java b/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java deleted file mode 100644 index d77b13e..0000000 --- a/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.Message; -import javax.jms.MessageListener; -import org.junit.Assert; - -public class JmsTestListener implements MessageListener { - - private int expectd; - private CountDownLatch latch; - private AtomicInteger consumedNum = new AtomicInteger(0); - - public JmsTestListener() { - this.expectd = 10; - } - public JmsTestListener(int expectd) { - this.expectd = expectd; - } - public JmsTestListener(int expected, CountDownLatch latch) { - this.expectd = expected; - this.latch = latch; - } - @Override - public void onMessage(Message message) { - try { - Assert.assertNotNull(message); - Assert.assertNotNull(message.getJMSMessageID()); - if (consumedNum.incrementAndGet() == expectd && latch != null) { - latch.countDown(); - } - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - - public int getConsumedNum() { - return consumedNum.get(); - } - - public void setLatch(CountDownLatch latch) { - this.latch = latch; - } - - public void setExpectd(int expectd) { - this.expectd = expectd; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java b/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java deleted file mode 100644 index 855cb19..0000000 --- a/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import java.lang.reflect.Field; -import java.util.concurrent.ConcurrentMap; -import org.apache.rocketmq.client.producer.MQProducer; -import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer; -import org.apache.rocketmq.jms.domain.JmsBaseMessageProducer; -import org.apache.rocketmq.jms.domain.RMQPushConsumerExt; -import org.junit.Assert; - -public class JmsTestUtil { - public static MQProducer getMQProducer(String producerId) throws Exception { - Assert.assertNotNull(producerId); - Field field = JmsBaseMessageProducer.class.getDeclaredField("producerMap"); - field.setAccessible(true); - ConcurrentMap<String, MQProducer> producerMap = (ConcurrentMap<String, MQProducer>) field.get(null); - return producerMap.get(producerId); - } - public static RMQPushConsumerExt getRMQPushConsumerExt(String consumerId) throws Exception { - Assert.assertNotNull(consumerId); - Field field = JmsBaseMessageConsumer.class.getDeclaredField("consumerMap"); - field.setAccessible(true); - ConcurrentMap<String, RMQPushConsumerExt> consumerMap = (ConcurrentMap<String, RMQPushConsumerExt>) field.get(null); - return consumerMap.get(consumerId); - } - public static void checkConsumerState(String consumerId, boolean isNull, boolean isStarted) throws Exception { - RMQPushConsumerExt rmqPushConsumerExt = getRMQPushConsumerExt(consumerId); - if (isNull) { - 
Assert.assertNull(rmqPushConsumerExt); - } else { - Assert.assertNotNull(rmqPushConsumerExt); - Assert.assertEquals(isStarted, rmqPushConsumerExt.isStarted()); - } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java b/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java deleted file mode 100644 index 9fe9f5e..0000000 --- a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.domain.message; - -import org.junit.Assert; -import org.junit.Test; - -public class JmsBytesMessageTest { - - private byte[] receiveData = "receive data test".getBytes(); - private byte[] sendData = "send data test".getBytes(); - - @Test - public void testGetData() throws Exception { - JmsBytesMessage readMessage = new JmsBytesMessage(receiveData); - - System.out.println(new String(readMessage.getData())); - Assert.assertEquals(new String(receiveData), new String(readMessage.getData())); - - JmsBytesMessage sendMessage = new JmsBytesMessage(); - sendMessage.writeBytes(sendData, 0, sendData.length); - - System.out.println(new String(sendMessage.getData())); - Assert.assertEquals(new String(sendData), new String(sendMessage.getData())); - - } - - @Test - public void testGetBodyLength() throws Exception { - - JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData); - - System.out.println(bytesMessage.getBodyLength()); - Assert.assertEquals(bytesMessage.getBodyLength(), receiveData.length); - } - - @Test - public void testReadBytes() throws Exception { - JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData); - - Assert.assertEquals(bytesMessage.getBodyLength(), receiveData.length); - byte[] receiveValue = new byte[receiveData.length]; - bytesMessage.readBytes(receiveValue); - - System.out.println(new String(receiveValue)); - Assert.assertEquals(new String(receiveValue), new String(receiveData)); - - } - - @Test - public void testReadBytes1() throws Exception { - JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData); - - byte[] receiveValue1 = new byte[2]; - bytesMessage.readBytes(receiveValue1, 2); - System.out.println(new String(receiveValue1)); - Assert.assertEquals(new String(receiveData).substring(0, 2), new String(receiveValue1)); - - byte[] receiceValue2 = new byte[2]; - bytesMessage.readBytes(receiceValue2, 2); - System.out.println(new String(receiceValue2)); - Assert.assertEquals(new 
String(receiveData).substring(2, 4), new String(receiceValue2)); - - } - - @Test - public void testWriteBytes() throws Exception { - JmsBytesMessage jmsBytesMessage = new JmsBytesMessage(); - jmsBytesMessage.writeBytes(sendData); - - System.out.println(new String(jmsBytesMessage.getData())); - Assert.assertEquals(new String(jmsBytesMessage.getData()), new String(sendData)); - - } - - @Test - public void testException() throws Exception { - JmsBytesMessage jmsBytesMessage = new JmsBytesMessage(); - - byte[] receiveValue = new byte[receiveData.length]; -// Throws out NullPointerException -// jmsBytesMessage.readBytes(receiveValue); - - JmsBytesMessage sendMessage = new JmsBytesMessage(sendData); -// Throws out NullPointerException -// sendMessage.writeBytes("hello again".getBytes()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java b/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java deleted file mode 100644 index b570142..0000000 --- a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.apache.rocketmq.jms.domain.message; - -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.jms.domain.JmsBaseConstant; -import org.apache.rocketmq.jms.domain.JmsBaseTopic; -import org.apache.rocketmq.jms.util.MessageConverter; -import org.apache.rocketmq.jms.util.MsgConvertUtil; -import org.junit.Assert; -import org.junit.Test; - -public class JmsMessageConvertTest { - @Test - public void testCovert2RMQ() throws Exception { - //init jmsBaseMessage - String topic = 
"TestTopic"; - String messageType = "TagA"; - - JmsBaseMessage jmsBaseMessage = new JmsTextMessage("testText"); - jmsBaseMessage.setHeader(JmsBaseConstant.JMS_DESTINATION, new JmsBaseTopic(topic, messageType)); - jmsBaseMessage.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:null"); - jmsBaseMessage.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.FALSE); - - jmsBaseMessage.setObjectProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT); - jmsBaseMessage.setObjectProperty(MsgConvertUtil.MSG_TOPIC, topic); - jmsBaseMessage.setObjectProperty(MsgConvertUtil.MSG_TYPE, messageType); - jmsBaseMessage.setObjectProperty(MessageConst.PROPERTY_TAGS, messageType); - jmsBaseMessage.setObjectProperty(MessageConst.PROPERTY_KEYS, messageType); - - //convert to RMQMessage - MessageExt message = (MessageExt)MessageConverter.convert2RMQMessage(jmsBaseMessage); - - System.out.println(message); - - //then convert back to jmsBaseMessage - JmsBaseMessage jmsBaseMessageBack = MessageConverter.convert2JMSMessage(message); - - JmsTextMessage jmsTextMessage = (JmsTextMessage) jmsBaseMessage; - JmsTextMessage jmsTextMessageBack = (JmsTextMessage) jmsBaseMessageBack; - - Assert.assertEquals(jmsTextMessage.getText(), jmsTextMessageBack.getText()); - Assert.assertEquals(jmsTextMessage.getJMSDestination().toString(), jmsTextMessageBack.getJMSDestination().toString()); - Assert.assertEquals(jmsTextMessage.getJMSMessageID(), jmsTextMessageBack.getJMSMessageID()); - Assert.assertEquals(jmsTextMessage.getJMSRedelivered(), jmsTextMessageBack.getJMSRedelivered()); - Assert.assertEquals(jmsTextMessage.getHeaders().get(MsgConvertUtil.JMS_MSGMODEL), jmsTextMessageBack.getHeaders().get(MsgConvertUtil.JMS_MSGMODEL)); - Assert.assertEquals(jmsTextMessage.getHeaders().get(MsgConvertUtil.MSG_TOPIC), jmsTextMessageBack.getHeaders().get(MsgConvertUtil.MSG_TOPIC)); - Assert.assertEquals(jmsTextMessage.getHeaders().get(MsgConvertUtil.MSG_TYPE), 
jmsTextMessageBack.getHeaders().get(MsgConvertUtil.MSG_TYPE)); - Assert.assertEquals(jmsTextMessage.getHeaders().get(MessageConst.PROPERTY_TAGS), jmsTextMessageBack.getHeaders().get(MessageConst.PROPERTY_TAGS)); - Assert.assertEquals(jmsTextMessage.getHeaders().get(MessageConst.PROPERTY_KEYS), jmsTextMessageBack.getHeaders().get(MessageConst.PROPERTY_KEYS)); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java b/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java deleted file mode 100644 index 6951976..0000000 --- a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.domain.message; - -import java.io.Serializable; -import javax.jms.JMSException; -import org.junit.Assert; -import org.junit.Test; - -public class JmsObjectMessageTest { - - @Test - public void testGetObject() { - JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new User("jack", 20)); - try { - Assert.assertEquals(jmsObjectMessage.getObject(), new User("jack", 20)); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - - @Test - public void testGetBody() { - JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new User("jack", 20)); - - try { - User user = (User)jmsObjectMessage.getBody(Object.class); - System.out.println(user.getName() + ": " + user.getAge()); - Assert.assertEquals(jmsObjectMessage.getBody(Object.class), jmsObjectMessage.getObject()); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - - private class User implements Serializable { - private String name; - private int age; - - private User(String name, int age) { - this.name = name; - this.age = age; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null || getClass() != obj.getClass()) - return false; - - User user = (User)obj; - if (age != user.getAge()) - return false; - if (name != null ? 
!name.equals(user.getName()) : user.getName() != null) - return false; - return true; - } - - public int getAge() { - return age; - } - - public void setAge(int age) { - this.age = age; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java b/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java deleted file mode 100644 index d3c8287..0000000 --- a/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.domain.message; - -import javax.jms.JMSException; -import org.junit.Assert; -import org.junit.Test; - -public class JmsTextMessageTest { - private String text = "jmsTextMessage test"; - - @Test - public void testGetBody() { - JmsTextMessage jmsTextMessage = new JmsTextMessage(text); - try { - Assert.assertEquals(jmsTextMessage.getBody(String.class), text); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - - @Test - public void testSetGetText() { - JmsTextMessage jmsTextMessage = new JmsTextMessage(); - jmsTextMessage.setText(text); - try { - Assert.assertEquals(jmsTextMessage.getText(), text); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java b/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java deleted file mode 100644 index 02fe111..0000000 --- a/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration; - -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.namesrv.NamesrvConfig; -import org.apache.rocketmq.jms.domain.CommonConstant; -import org.apache.rocketmq.namesrv.NamesrvController; -import org.apache.rocketmq.remoting.netty.NettyClientConfig; -import org.apache.rocketmq.remoting.netty.NettyServerConfig; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class IntegrationTestBase { - public static Logger logger = LoggerFactory.getLogger(IntegrationTestBase.class); - - protected static Random random = new Random(); - protected static final String SEP = File.separator; - - - protected static String topic = "jms-test"; - protected static String topic2 = "jms-test-2"; - protected static String messageType = "TagA"; - protected static String producerId = "PID-jms-test"; - protected static String consumerId = "CID-jms-test"; - protected static String consumerId2 = "CID-jms-test-2"; - protected static String nameServer; - protected static String text = "English test"; - 
protected static int consumeThreadNums = 16; - - - - - protected static final String BROKER_NAME_PREFIX = "TestBrokerName_"; - protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0); - protected static final List<File> TMPE_FILES = new ArrayList<File>(); - protected static final List<BrokerController> BROKER_CONTROLLERS = new ArrayList<BrokerController>(); - protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<NamesrvController>(); - - - private static String createBaseDir() { - String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID(); - final File file = new File(baseDir); - if (file.exists()) { - System.out.println(String.format("[%s] has already existed, please bake up and remove it for integration tests", baseDir)); - System.exit(1); - } - TMPE_FILES.add(file); - return baseDir; - } - - public static NamesrvController createAndStartNamesrv() { - String baseDir = createBaseDir(); - NamesrvConfig namesrvConfig = new NamesrvConfig(); - NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig(); - namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json"); - - nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000)); - NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig); - try { - Assert.assertTrue(namesrvController.initialize()); - logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort()); - namesrvController.start(); - } catch (Exception e) { - System.out.println("Name Server start failed"); - System.exit(1); - } - NAMESRV_CONTROLLERS.add(namesrvController); - return namesrvController; - - } - - - public static BrokerController createAndStartBroker(String nsAddr) { - String baseDir = createBaseDir(); - BrokerConfig brokerConfig = new BrokerConfig(); - NettyServerConfig nettyServerConfig = new NettyServerConfig(); - NettyClientConfig nettyClientConfig = new 
NettyClientConfig(); - MessageStoreConfig storeConfig = new MessageStoreConfig(); - brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement()); - brokerConfig.setBrokerIP1("127.0.0.1"); - brokerConfig.setNamesrvAddr(nsAddr); - storeConfig.setStorePathRootDir(baseDir); - storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); - storeConfig.setHaListenPort(8000 + random.nextInt(1000)); - nettyServerConfig.setListenPort(10000 + random.nextInt(1000)); - BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); - try { - Assert.assertTrue(brokerController.initialize()); - logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr()); - brokerController.start(); - } catch (Exception e) { - System.out.println("Broker start failed"); - System.exit(1); - } - BROKER_CONTROLLERS.add(brokerController); - return brokerController; - } - - - - protected static DefaultMQAdminExt defaultMQAdminExt; - - static { - //clear the environment - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override public void run() { - if (defaultMQAdminExt != null) { - defaultMQAdminExt.shutdown(); - } - for (NamesrvController namesrvController: NAMESRV_CONTROLLERS) { - if (namesrvController != null) { - namesrvController.shutdown(); - } - } - for (BrokerController brokerController: BROKER_CONTROLLERS) { - if (brokerController != null) { - brokerController.shutdown(); - } - } - for (File file : TMPE_FILES) { - deleteFile(file); - } - } - }); - - - NamesrvController namesrvController = IntegrationTestBase.createAndStartNamesrv(); - nameServer = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort(); - BrokerController brokerController = createAndStartBroker(nameServer); - - defaultMQAdminExt = new DefaultMQAdminExt(); - defaultMQAdminExt.setNamesrvAddr(nameServer); - try { - defaultMQAdminExt.start(); - } catch (MQClientException e) { - 
System.out.println("DefaultMQAdminExt start failed"); - System.exit(1); - } - - createTopic(topic, brokerController.getBrokerAddr()); - - - } - - public static void deleteFile(File file) { - if (!file.exists()) { - return; - } - if (file.isFile()) { - file.delete(); - } else if (file.isDirectory()) { - File[] files = file.listFiles(); - for (int i = 0;i < files.length;i ++) { - deleteFile(files[i]); - } - file.delete(); - } - } - public static void createTopic(String topic, String addr) { - TopicConfig topicConfig = new TopicConfig(); - topicConfig.setTopicName(topic); - topicConfig.setReadQueueNums(4); - topicConfig.setWriteQueueNums(4); - try { - defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); - } catch (Exception e) { - logger.error("Create topic:{} addr:{} failed", addr, topic); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java b/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java deleted file mode 100644 index 367700a..0000000 --- a/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration; - -import java.net.URI; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.jms.JmsTestListener; -import org.apache.rocketmq.jms.JmsTestUtil; -import org.apache.rocketmq.jms.domain.CommonConstant; -import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory; -import org.junit.Assert; -import org.junit.Test; - -import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt; - -public class JmsClientIT extends IntegrationTestBase { - - @Test - public void testConfigInURI() throws Exception { - JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new - URI(String.format("rocketmq://xxx?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s&%s=%s", - CommonConstant.PRODUCERID, producerId, - CommonConstant.CONSUMERID, consumerId, - CommonConstant.NAMESERVER, nameServer, - CommonConstant.CONSUME_THREAD_NUMS, consumeThreadNums, - CommonConstant.SEND_TIMEOUT_MILLIS, 10*1000, - CommonConstant.INSTANCE_NAME, "JMS_TEST"))); - - Connection connection = connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - connection.start(); - try { - Destination destination = 
session.createTopic(topic + ":" + messageType); - session.createConsumer(destination); - session.createProducer(destination); - - DefaultMQPushConsumer rmqPushConsumer = (DefaultMQPushConsumer) getRMQPushConsumerExt(consumerId).getConsumer(); - Assert.assertNotNull(rmqPushConsumer); - Assert.assertEquals(consumerId, rmqPushConsumer.getConsumerGroup()); - Assert.assertEquals("JMS_TEST", rmqPushConsumer.getInstanceName()); - Assert.assertEquals(consumeThreadNums, rmqPushConsumer.getConsumeThreadMax()); - Assert.assertEquals(consumeThreadNums, rmqPushConsumer.getConsumeThreadMin()); - Assert.assertEquals(nameServer, rmqPushConsumer.getNamesrvAddr()); - - DefaultMQProducer mqProducer = (DefaultMQProducer) JmsTestUtil.getMQProducer(producerId); - Assert.assertNotNull(mqProducer); - Assert.assertEquals(producerId, mqProducer.getProducerGroup()); - Assert.assertEquals("JMS_TEST", mqProducer.getInstanceName()); - Assert.assertEquals(10 * 1000, mqProducer.getSendMsgTimeout()); - Assert.assertEquals(nameServer, mqProducer.getNamesrvAddr()); - - Thread.sleep(2000); - } - finally { - connection.close(); - } - - } - - - private Connection createConnection(String producerGroup, String consumerGroup) throws Exception { - JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new - URI(String.format("rocketmq://xxx?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s&%s=%s", - CommonConstant.PRODUCERID, producerGroup, - CommonConstant.CONSUMERID, consumerGroup, - CommonConstant.NAMESERVER, nameServer, - CommonConstant.CONSUME_THREAD_NUMS, consumeThreadNums, - CommonConstant.SEND_TIMEOUT_MILLIS, 10*1000, - CommonConstant.INSTANCE_NAME, "JMS_TEST"))); - return connectionFactory.createConnection(); - } - - @Test - public void testProducerAndConsume_TwoConsumer() throws Exception { - - Connection connection = createConnection(producerId, consumerId); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destinationA = session.createTopic("TopicA"); - 
Destination destinationB = session.createTopic("TopicB"); - final CountDownLatch countDownLatch = new CountDownLatch(2); - JmsTestListener listenerA = new JmsTestListener(10,countDownLatch); - JmsTestListener listenerB = new JmsTestListener(10, countDownLatch); - - try { - //two consumers - MessageConsumer messageConsumerA = session.createConsumer(destinationA); - messageConsumerA.setMessageListener(listenerA); - MessageConsumer messageConsumerB = session.createConsumer(destinationB); - messageConsumerB.setMessageListener(listenerB); - //producer - MessageProducer messageProducer = session.createProducer(destinationA); - connection.start(); - - for (int i = 0; i < 10; i++) { - TextMessage message = session.createTextMessage(text + i); - Assert.assertNull(message.getJMSMessageID()); - messageProducer.send(message); - Assert.assertNotNull(message.getJMSMessageID()); - } - for (int i = 0; i < 10; i++) { - TextMessage message = session.createTextMessage(text + i); - Assert.assertNull(message.getJMSMessageID()); - messageProducer.send(destinationB, message); - Assert.assertNotNull(message.getJMSMessageID()); - } - - if (countDownLatch.await(30, TimeUnit.SECONDS)) { - Thread.sleep(2000); - } - Assert.assertEquals(10, listenerA.getConsumedNum()); - Assert.assertEquals(10, listenerB.getConsumedNum()); - } - finally { - //Close the connection - connection.close(); - } - - } - - @Test - public void testProducerAndConsume_TagFilter() throws Exception { - Connection connection = createConnection(producerId, consumerId); - Connection anotherConnection = createConnection(producerId, consumerId +"other"); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Session anotherSession = anotherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Destination destinationA = session.createTopic("topic:tagA"); - Destination destinationB = session.createTopic("topic:tagB"); - final CountDownLatch countDownLatch = new CountDownLatch(2); - 
JmsTestListener listenerForTagA = new JmsTestListener(10, countDownLatch); - JmsTestListener listenerForAll = new JmsTestListener(40, countDownLatch); - try { - session.createConsumer(destinationA).setMessageListener(listenerForTagA); - anotherSession.createConsumer(session.createTopic("topic")).setMessageListener(listenerForAll); - //producer - MessageProducer messageProducer = session.createProducer(destinationA); - connection.start(); - anotherConnection.start(); - - for (int i = 0; i < 20; i++) { - TextMessage message = session.createTextMessage(text + i); - Assert.assertNull(message.getJMSMessageID()); - messageProducer.send(message); - Assert.assertNotNull(message.getJMSMessageID()); - } - for (int i = 0; i < 20; i++) { - TextMessage message = session.createTextMessage(text + i); - Assert.assertNull(message.getJMSMessageID()); - messageProducer.send(destinationB, message); - Assert.assertNotNull(message.getJMSMessageID()); - } - - if (countDownLatch.await(30, TimeUnit.SECONDS)) { - Thread.sleep(2000); - } - Assert.assertEquals(20, listenerForTagA.getConsumedNum()); - Assert.assertEquals(40, listenerForAll.getConsumedNum()); - } - finally { - //Close the connection - connection.close(); - anotherConnection.close(); - } - - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java b/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java deleted file mode 100644 index 6cbb7b1..0000000 --- a/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration; - -import java.net.URI; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; -import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory; -import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer; -import org.apache.rocketmq.jms.domain.RMQPushConsumerExt; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.rocketmq.jms.JmsTestUtil.checkConsumerState; -import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt; - -public class JmsConsumerIT extends IntegrationTestBase { - - private final Logger logger = LoggerFactory.getLogger(getClass()); - - - private MessageListener listener = new MessageListener() { - @Override - public void onMessage(Message message) { - try { - Assert.assertNotNull(message); - Assert.assertNotNull(message.getJMSMessageID()); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - - - @Test - public void testStartIdempotency() throws Exception { - JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new - 
URI("rocketmq://xxx?consumerId=" + consumerId + "&nameServer=" + nameServer)); - Connection connection = connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - checkConsumerState(consumerId, true, false); - try { - Destination destination = session.createTopic(topic + ":" + messageType); - MessageConsumer consumer = session.createConsumer(destination); - consumer.setMessageListener(listener); - - checkConsumerState(consumerId, false, false); - - ((JmsBaseMessageConsumer) consumer).startConsumer(); - checkConsumerState(consumerId, false, true); - - Destination destination1 = session.createTopic(topic2 + ":" + messageType); - MessageConsumer consumer1 = session.createConsumer(destination1); - consumer1.setMessageListener(listener); - - ((JmsBaseMessageConsumer) consumer1).startConsumer(); - checkConsumerState(consumerId, false, true); - - //the start is idempotent - connection.start(); - connection.start(); - - Thread.sleep(5000); - } - finally { - connection.close(); - } - } - - @Test - public void testReferenceCount() throws Exception { - JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new - URI("rocketmq://xxx?consumerId=" + consumerId + "&nameServer=" + nameServer)); - Connection connection = connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - connection.start(); - try { - Destination destination = session.createTopic(topic + ":" + messageType); - MessageConsumer consumer = session.createConsumer(destination); - consumer.setMessageListener(listener); - - RMQPushConsumerExt rmqPushConsumerExt = getRMQPushConsumerExt(consumerId); - Assert.assertNotNull(rmqPushConsumerExt); - Assert.assertEquals(1, rmqPushConsumerExt.getReferenceCount()); - - - MessageConsumer consumer2 = session.createConsumer(destination); - Assert.assertEquals(2, rmqPushConsumerExt.getReferenceCount()); - - MessageConsumer consumer3 = 
session.createConsumer(session.createTopic(topic + ":" + messageType)); - - Assert.assertEquals(3, rmqPushConsumerExt.getReferenceCount()); - - session.close(); - - Assert.assertEquals(0, rmqPushConsumerExt.getReferenceCount()); - Assert.assertEquals(false, rmqPushConsumerExt.isStarted()); - Assert.assertNull(getRMQPushConsumerExt(consumerId)); - - Thread.sleep(5000); - } - finally { - connection.close(); - } - } - -}
