http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/destination/RocketMQQueue.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/destination/RocketMQQueue.java b/core/src/main/java/org/apache/rocketmq/jms/destination/RocketMQQueue.java deleted file mode 100644 index d7d5b84..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/destination/RocketMQQueue.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.destination; - -import javax.jms.JMSException; -import javax.jms.Queue; - -public class RocketMQQueue implements Queue { - - private String name; - - public RocketMQQueue(String name) { - this.name = name; - } - - @Override - public String getQueueName() throws JMSException { - return this.name; - } - - @Override public String toString() { - return this.name; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/destination/RocketMQTopic.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/destination/RocketMQTopic.java b/core/src/main/java/org/apache/rocketmq/jms/destination/RocketMQTopic.java deleted file mode 100644 index 3214b4c..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/destination/RocketMQTopic.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.destination; - -import javax.jms.JMSException; -import javax.jms.Topic; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RocketMQTopic implements Topic { - private static final Logger log = LoggerFactory.getLogger(RocketMQTopic.class); - - private String name; - - public RocketMQTopic(String name) { - this.name = name; - } - - @Override - public String getTopicName() throws JMSException { - return this.name; - } - - @Override - public String toString() { - return this.name; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java b/core/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java deleted file mode 100644 index 51ce57d..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.exception; - -import javax.jms.JMSException; - -public class DuplicateSubscriptionException extends JMSException { - - public DuplicateSubscriptionException(String reason, String errorCode) { - super(reason, errorCode); - } - - public DuplicateSubscriptionException(String reason) { - super(reason); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java b/core/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java deleted file mode 100644 index 1335eb9..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.exception; - -import javax.jms.JMSException; - -public class JMSClientException extends JMSException { - - public JMSClientException(String reason, String errorCode) { - super(reason, errorCode); - } - - public JMSClientException(String reason) { - super(reason); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/exception/MessageExpiredException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/exception/MessageExpiredException.java b/core/src/main/java/org/apache/rocketmq/jms/exception/MessageExpiredException.java deleted file mode 100644 index d35b7b1..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/exception/MessageExpiredException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.exception; - -import javax.jms.JMSException; - -public class MessageExpiredException extends JMSException { - - public MessageExpiredException(String reason, String errorCode) { - super(reason, errorCode); - } - - public MessageExpiredException(String reason) { - super(reason); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/exception/UnsupportDeliveryModelException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/exception/UnsupportDeliveryModelException.java b/core/src/main/java/org/apache/rocketmq/jms/exception/UnsupportDeliveryModelException.java deleted file mode 100644 index 903b75f..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/exception/UnsupportDeliveryModelException.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.exception; - -import javax.jms.JMSRuntimeException; - -public class UnsupportDeliveryModelException extends JMSRuntimeException { - - public UnsupportDeliveryModelException() { - super("Only support PERSISTENT model, and guarantee at-least-once"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/hook/ReceiveMessageHook.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/hook/ReceiveMessageHook.java b/core/src/main/java/org/apache/rocketmq/jms/hook/ReceiveMessageHook.java deleted file mode 100644 index 2bf9bd3..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/hook/ReceiveMessageHook.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.hook; - -import javax.jms.JMSException; -import javax.jms.Message; -import org.apache.rocketmq.jms.exception.MessageExpiredException; -import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; - -public class ReceiveMessageHook { - - public void before(Message message) throws JMSException { - - validate(message); - - setProviderProperties(message); - } - - private void validate(Message message) throws JMSException { - if (message.getJMSExpiration() != 0 && System.currentTimeMillis() > message.getJMSExpiration()) { - throw new MessageExpiredException(String.format("This message[id=%s] has been expired", message.getJMSMessageID())); - } - } - - public void setProviderProperties(Message message) throws JMSException { - //JMSXRcvTimestamp - message.setLongProperty(JMSPropertiesEnum.JMSXRcvTimestamp.name(), System.currentTimeMillis()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java b/core/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java deleted file mode 100644 index 0429973..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.hook; - -import java.util.UUID; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import org.apache.rocketmq.jms.RocketMQProducer; -import org.apache.rocketmq.jms.exception.UnsupportDeliveryModelException; -import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum; - -import static org.apache.rocketmq.jms.Constant.MESSAGE_ID_PREFIX; -import static org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum.JMSXUserID; - -/** - * Hook that executes before sending message. - */ -public class SendMessageHook { - - private RocketMQProducer producer; - - public SendMessageHook() { - } - - public SendMessageHook(RocketMQProducer producer) { - this.producer = producer; - } - - public void before(Message message, Destination destination, int deliveryMode, int priority, - long timeToLive) throws JMSException { - - validate(deliveryMode); - - setProviderHeader(message, destination, deliveryMode, priority, timeToLive); - - setProviderProperties(message); - } - - private void setProviderHeader(Message message, Destination destination, int deliveryMode, int priority, - long timeToLive) throws JMSException { - // destination - message.setJMSDestination(destination); - - // delivery mode - message.setJMSDeliveryMode(deliveryMode); - - // expiration - if (timeToLive != 0) { - message.setJMSExpiration(System.currentTimeMillis() + timeToLive); - } - else { - message.setJMSExpiration(0L); - } - - // delivery time - message.setJMSDeliveryTime(message.getJMSTimestamp() + 
this.producer.getDeliveryDelay()); - - // priority - message.setJMSPriority(priority); - - // messageID is also required in async model, so {@link MessageExt#getMsgId()} can't be used. - if (!this.producer.getDisableMessageID()) { - message.setJMSMessageID(new StringBuffer(MESSAGE_ID_PREFIX).append(UUID.randomUUID().toString()).toString()); - } - - // timestamp - if (!this.producer.getDisableMessageTimestamp()) { - message.setJMSTimestamp(System.currentTimeMillis()); - } - } - - private void validate(int deliveryMode) { - if (deliveryMode != JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE) { - throw new UnsupportDeliveryModelException(); - } - } - - public void setProviderProperties(Message message) throws JMSException { - // JMSXUserID - if (this.producer.getUserName() != null) { - message.setStringProperty(JMSXUserID.name(), this.producer.getUserName()); - } - } - - public void setProducer(RocketMQProducer producer) { - this.producer = producer; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java deleted file mode 100644 index fecff14..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java +++ /dev/null @@ -1,400 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg; - -import com.google.common.collect.Maps; -import com.google.common.io.BaseEncoding; -import java.util.Collections; -import java.util.Enumeration; -import java.util.Map; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageNotWriteableException; -import org.apache.commons.lang.builder.ToStringBuilder; -import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum; - -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSCorrelationID; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSDeliveryMode; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSDeliveryTime; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSDestination; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSExpiration; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSMessageID; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSPriority; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSRedelivered; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSReplyTo; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSTimestamp; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSType; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_TIME_DEFAULT_VALUE; -import static 
org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_EXPIRATION_DEFAULT_VALUE; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_PRIORITY_DEFAULT_VALUE; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_REDELIVERED_DEFAULT_VALUE; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_TIMESTAMP_DEFAULT_VALUE; -import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2Boolean; -import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2Integer; -import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2Long; -import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2Object; -import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2String; - -public abstract class AbstractJMSMessage implements javax.jms.Message { - - protected Map<JMSHeaderEnum, Object> headers = Maps.newHashMap(); - protected Map<String, Object> properties = Maps.newHashMap(); - - protected boolean writeOnly; - - @Override - public String getJMSMessageID() { - return cast2String(headers.get(JMSMessageID)); - } - - @Override - public void setJMSMessageID(String id) { - setHeader(JMSMessageID, id); - } - - @Override - public long getJMSTimestamp() { - if (headers.containsKey(JMSTimestamp)) { - return cast2Long(headers.get(JMSTimestamp)); - } - return JMS_TIMESTAMP_DEFAULT_VALUE; - } - - @Override - public void setJMSTimestamp(long timestamp) { - setHeader(JMSTimestamp, timestamp); - } - - @Override - public byte[] getJMSCorrelationIDAsBytes() { - String jmsCorrelationID = getJMSCorrelationID(); - if (jmsCorrelationID != null) { - try { - return BaseEncoding.base64().decode(jmsCorrelationID); - } - catch (Exception e) { - return jmsCorrelationID.getBytes(); - } - } - return null; - } - - @Override - public void setJMSCorrelationIDAsBytes(byte[] correlationID) { - String encodedText = BaseEncoding.base64().encode(correlationID); - setJMSCorrelationID(encodedText); - } - - @Override - public String getJMSCorrelationID() { 
- return cast2String(headers.get(JMSCorrelationID)); - } - - @Override - public void setJMSCorrelationID(String correlationID) { - setHeader(JMSCorrelationID, correlationID); - } - - @Override - public Destination getJMSReplyTo() { - return cast2Object(headers.get(JMSReplyTo), Destination.class); - } - - @Override - public void setJMSReplyTo(Destination replyTo) { - setHeader(JMSReplyTo, replyTo); - } - - @Override - public String toString() { - return ToStringBuilder.reflectionToString(this); - } - - @Override - public Destination getJMSDestination() { - return cast2Object(headers.get(JMSDestination), Destination.class); - } - - @Override - public void setJMSDestination(Destination destination) { - setHeader(JMSDestination, destination); - } - - @SuppressWarnings("unchecked") - public abstract <T> T getBody(Class<T> clazz) throws JMSException; - - public abstract byte[] getBody() throws JMSException; - - @Override - public int getJMSDeliveryMode() { - if (headers.containsKey(JMSDeliveryMode)) { - return cast2Integer(headers.get(JMSDeliveryMode)); - } - return JMS_DELIVERY_MODE_DEFAULT_VALUE; - } - - @Override - public void setJMSDeliveryMode(int deliveryMode) { - setHeader(JMSDeliveryMode, deliveryMode); - } - - @Override - public boolean getJMSRedelivered() { - if (headers.containsKey(JMSRedelivered)) { - return cast2Boolean(headers.get(JMSRedelivered)); - } - return JMS_REDELIVERED_DEFAULT_VALUE; - } - - @Override - public void setJMSRedelivered(boolean redelivered) { - setHeader(JMSRedelivered, redelivered); - } - - @Override - public String getJMSType() { - return cast2String(headers.get(JMSType)); - } - - @Override - public void setJMSType(String type) { - setHeader(JMSType, type); - } - - public Map<JMSHeaderEnum, Object> getHeaders() { - return this.headers; - } - - @Override - public long getJMSExpiration() { - if (headers.containsKey(JMSExpiration)) { - return cast2Long(headers.get(JMSExpiration)); - } - return JMS_EXPIRATION_DEFAULT_VALUE; - } - - 
@Override - public void setJMSExpiration(long expiration) { - setHeader(JMSExpiration, expiration); - } - - @Override - public int getJMSPriority() { - if (headers.containsKey(JMSPriority)) { - return cast2Integer(headers.get(JMSPriority)); - } - return JMS_PRIORITY_DEFAULT_VALUE; - } - - @Override - public void setJMSPriority(int priority) { - setHeader(JMSPriority, priority); - } - - @Override - public long getJMSDeliveryTime() throws JMSException { - if (headers.containsKey(JMSDeliveryTime)) { - return cast2Long(headers.get(JMSDeliveryTime)); - } - return JMS_DELIVERY_TIME_DEFAULT_VALUE; - } - - @Override - public void setJMSDeliveryTime(long deliveryTime) throws JMSException { - setHeader(JMSDeliveryTime, deliveryTime); - } - - private void setHeader(JMSHeaderEnum name, Object value) { - this.headers.put(name, value); - } - - public Map<String, Object> getProperties() { - return this.properties; - } - - public void setProperties(Map<String, Object> properties) { - this.properties = properties; - } - - @Override - public void acknowledge() throws JMSException { - //todo - throw new UnsupportedOperationException("Unsupported!"); - } - - @Override - public void clearProperties() { - this.properties.clear(); - } - - @Override - public void clearBody() { - this.writeOnly = true; - } - - @Override - public boolean propertyExists(String name) { - return properties.containsKey(name); - } - - @Override - public boolean getBooleanProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - return Boolean.valueOf(value.toString()); - } - return false; - } - - @Override - public byte getByteProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - return Byte.valueOf(value.toString()); - } - return 0; - } - - @Override - public short getShortProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - 
return Short.valueOf(value.toString()); - } - return 0; - } - - @Override - public int getIntProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - return Integer.valueOf(value.toString()); - } - return 0; - } - - @Override - public long getLongProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - return Long.valueOf(value.toString()); - } - return 0L; - } - - @Override - public float getFloatProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - return Float.valueOf(value.toString()); - } - return 0f; - } - - @Override - public double getDoubleProperty(String name) throws JMSException { - if (propertyExists(name)) { - Object value = getObjectProperty(name); - return Double.valueOf(value.toString()); - } - return 0d; - } - - @Override - public String getStringProperty(String name) throws JMSException { - if (propertyExists(name)) { - return getObjectProperty(name).toString(); - } - return null; - } - - @Override - public Object getObjectProperty(String name) throws JMSException { - return this.properties.get(name); - } - - @Override - public Enumeration<?> getPropertyNames() throws JMSException { - return Collections.enumeration(this.properties.keySet()); - } - - @Override - public void setBooleanProperty(String name, boolean value) { - setObjectProperty(name, value); - } - - @Override - public void setByteProperty(String name, byte value) { - setObjectProperty(name, value); - } - - @Override - public void setShortProperty(String name, short value) { - setObjectProperty(name, value); - } - - @Override - public void setIntProperty(String name, int value) { - setObjectProperty(name, value); - } - - @Override - public void setLongProperty(String name, long value) { - setObjectProperty(name, value); - } - - public void setFloatProperty(String name, float value) { - 
setObjectProperty(name, value); - } - - @Override - public void setDoubleProperty(String name, double value) { - setObjectProperty(name, value); - } - - @Override - public void setStringProperty(String name, String value) { - setObjectProperty(name, value); - } - - @Override - public abstract boolean isBodyAssignableTo(Class c) throws JMSException; - - @Override - public void setObjectProperty(String name, Object value) { - if (value instanceof Number || value instanceof String || value instanceof Boolean) { - this.properties.put(name, value); - } - else { - throw new IllegalArgumentException( - "Value should be boolean, byte, short, int, long, float, double, and String."); - } - } - - protected boolean isWriteOnly() { - return writeOnly; - } - - protected void checkIsWriteOnly() throws MessageNotWriteableException { - if (!writeOnly) { - throw new MessageNotWriteableException("Not writable"); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java deleted file mode 100644 index a409118..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java +++ /dev/null @@ -1,491 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.util.Arrays; -import javax.jms.IllegalStateRuntimeException; -import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; -import org.apache.rocketmq.jms.support.PrimitiveTypeCast; - -import static java.lang.String.format; - -/** - * RocketMQ ByteMessage. 
- */ -public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.BytesMessage { - - private byte[] bytesIn; - private DataInputStream dataAsInput; - - private ByteArrayOutputStream bytesOut; - private DataOutputStream dataAsOutput; - - protected boolean readOnly; - - /** - * Message created for reading - * - * @param data - */ - public JMSBytesMessage(byte[] data) { - this.bytesIn = data; - this.dataAsInput = new DataInputStream(new ByteArrayInputStream(data, 0, data.length)); - this.readOnly = true; - this.writeOnly = false; - } - - /** - * Message created to be sent - */ - public JMSBytesMessage() { - this.bytesOut = new ByteArrayOutputStream(); - this.dataAsOutput = new DataOutputStream(this.bytesOut); - this.readOnly = false; - this.writeOnly = true; - } - - @Override public byte[] getBody(Class clazz) throws JMSException { - byte[] result; - if (isBodyAssignableTo(clazz)) { - if (isWriteOnly()) { - result = bytesOut.toByteArray(); - this.reset(); - return result; - } - else if (isReadOnly()) { - result = Arrays.copyOf(bytesIn, bytesIn.length); - this.reset(); - return result; - } - else { - throw new IllegalStateRuntimeException("Message must be in write only or read only status"); - } - } - - throw new MessageFormatException(format("The type[%s] can't be casted to byte[]", clazz.toString())); - } - - @Override public byte[] getBody() throws JMSException { - return getBody(byte[].class); - } - - @Override public boolean isBodyAssignableTo(Class c) throws JMSException { - return byte[].class.isAssignableFrom(c); - } - - @Override public long getBodyLength() throws JMSException { - if (isWriteOnly()) { - return bytesOut.size(); - } - else if (isReadOnly()) { - return bytesIn.length; - } - else { - throw new IllegalStateRuntimeException("Message must be in write only or read only status"); - } - } - - public boolean readBoolean() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readBoolean(); - } - catch (IOException e) 
{ - throw handleInputException(e); - } - } - - private void checkIsReadOnly() throws MessageNotReadableException { - if (!isReadOnly()) { - throw new MessageNotReadableException("Not readable"); - } - if (dataAsInput == null) { - throw new MessageNotReadableException("No data to read"); - } - } - - public byte readByte() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readByte(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public int readUnsignedByte() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readUnsignedByte(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public short readShort() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readShort(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public int readUnsignedShort() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readUnsignedShort(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public char readChar() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readChar(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public int readInt() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readInt(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public long readLong() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readLong(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public float readFloat() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readFloat(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public double readDouble() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readDouble(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - 
public String readUTF() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readUTF(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public int readBytes(byte[] value) throws JMSException { - checkIsReadOnly(); - - return readBytes(value, value.length); - } - - public int readBytes(byte[] value, int length) throws JMSException { - checkIsReadOnly(); - - if (length > value.length) { - throw new IndexOutOfBoundsException("length must be smaller than the length of value"); - } - if (dataAsInput == null) { - throw new MessageNotReadableException("Message is not readable! "); - } - try { - int offset = 0; - while (offset < length) { - int read = dataAsInput.read(value, offset, length - offset); - if (read < 0) { - break; - } - offset += read; - } - - if (offset == 0 && length != 0) { - return -1; - } - else { - return offset; - } - } - catch (IOException e) { - throw handleInputException(e); - } - - } - - public void writeBoolean(boolean value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeBoolean(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - private void initializeWriteIfNecessary() { - if (bytesOut == null) { - bytesOut = new ByteArrayOutputStream(); - } - if (dataAsOutput == null) { - dataAsOutput = new DataOutputStream(bytesOut); - } - } - - public void writeByte(byte value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeByte(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeShort(short value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeShort(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeChar(char value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - 
dataAsOutput.writeChar(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeInt(int value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeInt(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeLong(long value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeLong(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeFloat(float value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeFloat(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeDouble(double value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeDouble(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeUTF(String value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeUTF(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeBytes(byte[] value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - if (dataAsOutput == null) { - throw new MessageNotWriteableException("Message is not writable! "); - } - try { - dataAsOutput.write(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeBytes(byte[] value, int offset, int length) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - if (dataAsOutput == null) { - throw new MessageNotWriteableException("Message is not writable! 
"); - } - try { - dataAsOutput.write(value, offset, length); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeObject(Object value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - if (!PrimitiveTypeCast.isPrimitiveType(value)) { - throw new JMSException("Object must be primitive type"); - } - - try { - dataAsOutput.writeBytes(String.valueOf(value)); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void reset() throws JMSException { - try { - if (bytesOut != null) { - bytesOut.reset(); - } - if (this.dataAsInput != null) { - this.dataAsInput.reset(); - } - - this.readOnly = true; - } - catch (IOException e) { - throw new JMSException(e.getMessage()); - } - } - - @Override public void clearBody() { - super.clearBody(); - this.bytesOut = null; - this.dataAsOutput = null; - this.dataAsInput = null; - this.bytesIn = null; - } - - private JMSException handleOutputException(final IOException e) { - JMSException ex = new JMSException(e.getMessage()); - ex.initCause(e); - ex.setLinkedException(e); - return ex; - } - - private JMSException handleInputException(final IOException e) { - JMSException ex; - if (e instanceof EOFException) { - ex = new MessageEOFException(e.getMessage()); - } - else { - ex = new MessageFormatException(e.getMessage()); - } - ex.initCause(e); - ex.setLinkedException(e); - return ex; - } - - protected boolean isReadOnly() { - return readOnly; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java deleted file mode 100644 index dddfb58..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java +++ /dev/null @@ 
-1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg; - -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotWriteableException; -import org.apache.commons.lang.StringUtils; -import org.apache.rocketmq.jms.msg.serialize.MapSerialize; - -import static java.lang.String.format; -import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Boolean; -import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Byte; -import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2ByteArray; -import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Char; -import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Double; -import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Float; -import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Int; -import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Long; -import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Short; -import static 
org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2String; - -/** - * Message can only be accessed by a thread at a time. - */ -public class JMSMapMessage extends AbstractJMSMessage implements MapMessage { - - private Map<String, Object> map; - - protected boolean readOnly; - - public JMSMapMessage(Map<String, Object> map) { - this.map = map; - } - - public JMSMapMessage() { - this.map = new HashMap(); - } - - @Override public Map<String, Object> getBody(Class clazz) throws JMSException { - if (isBodyAssignableTo(clazz)) { - return this.map; - } - - throw new MessageFormatException(format("The type[%s] can't be casted to byte[]", clazz.toString())); - } - - @Override public byte[] getBody() throws JMSException { - return MapSerialize.instance().serialize(this.map); - } - - @Override public boolean isBodyAssignableTo(Class c) throws JMSException { - return Map.class.isAssignableFrom(c); - } - - @Override public boolean getBoolean(String name) throws JMSException { - checkName(name); - - return cast2Boolean(map.get(name)); - } - - private void checkName(String name) throws JMSException { - if (StringUtils.isBlank(name)) { - throw new JMSException("Name is required"); - } - } - - @Override public byte getByte(String name) throws JMSException { - checkName(name); - - return cast2Byte(map.get(name)); - } - - @Override public short getShort(String name) throws JMSException { - checkName(name); - - return cast2Short(map.get(name)); - } - - @Override public char getChar(String name) throws JMSException { - checkName(name); - - return cast2Char(map.get(name)); - } - - @Override public int getInt(String name) throws JMSException { - checkName(name); - - return cast2Int(map.get(name)); - } - - @Override public long getLong(String name) throws JMSException { - checkName(name); - - return cast2Long(map.get(name)); - } - - @Override public float getFloat(String name) throws JMSException { - checkName(name); - - return cast2Float(map.get(name)); - } - - @Override public 
double getDouble(String name) throws JMSException { - checkName(name); - - return cast2Double(map.get(name)); - } - - @Override public String getString(String name) throws JMSException { - checkName(name); - - return cast2String(map.get(name)); - } - - @Override public byte[] getBytes(String name) throws JMSException { - checkName(name); - - return cast2ByteArray(map.get(name)); - } - - @Override public Object getObject(String name) throws JMSException { - checkName(name); - - return map.get(name); - } - - @Override public Enumeration getMapNames() throws JMSException { - return Collections.enumeration(map.keySet()); - } - - @Override public void setBoolean(String name, boolean value) throws JMSException { - putProperty(name, value); - } - - private void putProperty(String name, Object obj) throws JMSException { - if (isReadOnly()) { - throw new MessageNotWriteableException("Message is not writable"); - } - - checkName(name); - - map.put(name, obj); - } - - @Override public void setByte(String name, byte value) throws JMSException { - putProperty(name, value); - } - - @Override public void setShort(String name, short value) throws JMSException { - putProperty(name, value); - } - - @Override public void setChar(String name, char value) throws JMSException { - putProperty(name, value); - } - - @Override public void setInt(String name, int value) throws JMSException { - putProperty(name, value); - } - - @Override public void setLong(String name, long value) throws JMSException { - putProperty(name, value); - } - - @Override public void setFloat(String name, float value) throws JMSException { - putProperty(name, value); - } - - @Override public void setDouble(String name, double value) throws JMSException { - putProperty(name, value); - } - - @Override public void setString(String name, String value) throws JMSException { - putProperty(name, value); - } - - @Override public void setBytes(String name, byte[] value) throws JMSException { - putProperty(name, value); - } - 
- @Override public void setBytes(String name, byte[] value, int offset, int length) throws JMSException { - putProperty(name, value); - } - - @Override public void setObject(String name, Object value) throws JMSException { - putProperty(name, value); - } - - @Override public boolean itemExists(String name) throws JMSException { - checkName(name); - - return map.containsKey(name); - } - - @Override public void clearBody() { - super.clearBody(); - this.map.clear(); - this.readOnly = false; - } - - protected boolean isReadOnly() { - return this.readOnly; - } - - public void setReadOnly(boolean readOnly) { - this.readOnly = readOnly; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java deleted file mode 100644 index 4f29d33..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.msg; - -import java.io.Serializable; -import javax.jms.JMSException; -import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize; - -public class JMSObjectMessage extends AbstractJMSMessage implements javax.jms.ObjectMessage { - - private Serializable body; - - public JMSObjectMessage(Serializable object) { - this.body = object; - } - - public JMSObjectMessage() { - - } - - @Override public Serializable getBody(Class clazz) throws JMSException { - return body; - } - - @Override public byte[] getBody() throws JMSException { - return ObjectSerialize.instance().serialize(body); - } - - @Override public boolean isBodyAssignableTo(Class c) throws JMSException { - return true; - } - - public Serializable getObject() throws JMSException { - return this.body; - } - - public void setObject(Serializable object) throws JMSException { - this.body = object; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java deleted file mode 100644 index 5fd67a3..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; -import org.apache.rocketmq.jms.msg.serialize.StringSerialize; - -import static java.lang.String.format; - -public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage { - - private String text; - - public JMSTextMessage() { - - } - - public JMSTextMessage(String text) { - setText(text); - } - - @Override public String getBody(Class clazz) throws JMSException { - if (isBodyAssignableTo(clazz)) { - return text; - } - - throw new MessageFormatException(format("The type[%s] can't be casted to byte[]", clazz.toString())); - } - - @Override public byte[] getBody() throws JMSException { - return StringSerialize.instance().serialize(this.text); - } - - @Override public boolean isBodyAssignableTo(Class c) throws JMSException { - return String.class.isAssignableFrom(c); - } - - public void clearBody() { - super.clearBody(); - this.text = null; - } - - public String getText() throws JMSException { - return this.text; - } - - public void setText(String text) { - this.text = text; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java deleted file mode 
100644 index ca2cbed..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg.convert; - -import java.util.Map; -import javax.jms.JMSException; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.jms.msg.AbstractJMSMessage; -import org.apache.rocketmq.jms.support.JMSUtils; - -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSExpiration; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSMessageID; -import static org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.MSG_MODEL_NAME; -import static org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.toMsgModelEnum; - -public class JMS2RMQMessageConvert { - - public static final String USER_PROPERTY_PREFIX = "USER:"; - - public static MessageExt convert(AbstractJMSMessage jmsMsg) throws Exception { - MessageExt rmqMsg = new MessageExt(); - - handleHeader(jmsMsg, rmqMsg); - - handleBody(jmsMsg, rmqMsg); - - handleProperties(jmsMsg, rmqMsg); - - return rmqMsg; - } - - private static void handleHeader(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) { - 
rmqMsg.setTopic(JMSUtils.getDestinationName(jmsMsg.getJMSDestination())); - rmqMsg.putUserProperty(JMSMessageID.name(), jmsMsg.getJMSMessageID()); - rmqMsg.setBornTimestamp(jmsMsg.getJMSTimestamp()); - rmqMsg.putUserProperty(JMSExpiration.name(), String.valueOf(jmsMsg.getJMSExpiration())); - rmqMsg.setKeys(jmsMsg.getJMSMessageID()); - } - - private static void handleProperties(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) { - Map<String, Object> userProps = jmsMsg.getProperties(); - for (Map.Entry<String, Object> entry : userProps.entrySet()) { - rmqMsg.putUserProperty(new StringBuffer(USER_PROPERTY_PREFIX).append(entry.getKey()).toString(), entry.getValue().toString()); - } - } - - private static void handleBody(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) throws JMSException { - rmqMsg.putUserProperty(MSG_MODEL_NAME, toMsgModelEnum(jmsMsg).name()); - rmqMsg.setBody(jmsMsg.getBody()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java deleted file mode 100644 index 4adb692..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg.convert; - -import java.util.Map; -import javax.jms.JMSException; -import javax.jms.Message; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.jms.destination.RocketMQTopic; -import org.apache.rocketmq.jms.msg.AbstractJMSMessage; -import org.apache.rocketmq.jms.msg.JMSBytesMessage; -import org.apache.rocketmq.jms.msg.JMSMapMessage; -import org.apache.rocketmq.jms.msg.JMSObjectMessage; -import org.apache.rocketmq.jms.msg.JMSTextMessage; -import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum; -import org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum; -import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; -import org.apache.rocketmq.jms.msg.serialize.MapSerialize; -import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize; -import org.apache.rocketmq.jms.msg.serialize.StringSerialize; - -import static java.lang.String.format; -import static org.apache.rocketmq.jms.msg.convert.JMS2RMQMessageConvert.USER_PROPERTY_PREFIX; -import static org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.MSG_MODEL_NAME; - -public class RMQ2JMSMessageConvert { - - public static Message convert(MessageExt rmqMsg) throws JMSException { - if (rmqMsg == null) { - throw new IllegalArgumentException("RocketMQ message could not be null"); - } - if (rmqMsg.getBody() == null) { - throw new IllegalArgumentException("RocketMQ message body could not be null"); - } - - AbstractJMSMessage jmsMsg = newAbstractJMSMessage(rmqMsg.getUserProperty(MSG_MODEL_NAME), rmqMsg.getBody()); - - setHeader(rmqMsg, 
jmsMsg); - - setProperties(rmqMsg, jmsMsg); - - return jmsMsg; - } - - private static AbstractJMSMessage newAbstractJMSMessage(String msgModel, byte[] body) throws JMSException { - AbstractJMSMessage message; - switch (JMSMessageModelEnum.valueOf(msgModel)) { - case BYTE: - return new JMSBytesMessage(body); - case MAP: - message = new JMSMapMessage(MapSerialize.instance().deserialize(body)); - break; - case OBJECT: - message = new JMSObjectMessage(ObjectSerialize.instance().deserialize(body)); - break; - case STRING: - message = new JMSTextMessage(StringSerialize.instance().deserialize(body)); - break; - default: - throw new JMSException(format("The type[%s] is not supported", msgModel)); - } - - return message; - } - - private static void setHeader(MessageExt rmqMsg, AbstractJMSMessage jmsMsg) { - jmsMsg.setJMSMessageID(rmqMsg.getUserProperty(JMSHeaderEnum.JMSMessageID.name())); - jmsMsg.setJMSTimestamp(rmqMsg.getBornTimestamp()); - jmsMsg.setJMSExpiration(Long.valueOf(rmqMsg.getUserProperty(JMSHeaderEnum.JMSExpiration.name()))); - jmsMsg.setJMSRedelivered(rmqMsg.getReconsumeTimes() > 0 ? true : false); - //todo: what about Queue? 
- jmsMsg.setJMSDestination(new RocketMQTopic(rmqMsg.getTopic())); - } - - private static void setProperties(MessageExt rmqMsg, AbstractJMSMessage jmsMsg) { - jmsMsg.setIntProperty(JMSPropertiesEnum.JMSXDeliveryCount.name(), rmqMsg.getReconsumeTimes() + 1); - - Map<String, String> propertiesMap = rmqMsg.getProperties(); - if (propertiesMap != null) { - for (String properName : propertiesMap.keySet()) { - if (properName.startsWith(USER_PROPERTY_PREFIX)) { - String properValue = propertiesMap.get(properName); - jmsMsg.setStringProperty(properName.substring(USER_PROPERTY_PREFIX.length()), properValue); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java b/core/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java deleted file mode 100644 index cb27675..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.msg.enums; - -import javax.jms.Message; - -public enum JMSHeaderEnum { - - JMSDestination, - JMSDeliveryMode, - JMSMessageID, - JMSTimestamp, - JMSCorrelationID, - JMSReplyTo, - JMSRedelivered, - JMSType, - JMSExpiration, - JMSPriority, - JMSDeliveryTime; - - public static final int JMS_DELIVERY_MODE_DEFAULT_VALUE = Message.DEFAULT_DELIVERY_MODE; - public static final long JMS_TIME_TO_LIVE_DEFAULT_VALUE = Message.DEFAULT_TIME_TO_LIVE; - public static final int JMS_PRIORITY_DEFAULT_VALUE = Message.DEFAULT_PRIORITY; - public static final long JMS_DELIVERY_TIME_DEFAULT_VALUE = Message.DEFAULT_DELIVERY_DELAY; - public static final boolean JMS_REDELIVERED_DEFAULT_VALUE = false; - public static final int JMS_TIMESTAMP_DEFAULT_VALUE = 0; - public static final int JMS_EXPIRATION_DEFAULT_VALUE = 0; - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java b/core/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java deleted file mode 100644 index f7dc15a..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg.enums; - -import org.apache.rocketmq.jms.msg.AbstractJMSMessage; -import org.apache.rocketmq.jms.msg.JMSBytesMessage; -import org.apache.rocketmq.jms.msg.JMSMapMessage; -import org.apache.rocketmq.jms.msg.JMSObjectMessage; -import org.apache.rocketmq.jms.msg.JMSTextMessage; - -public enum JMSMessageModelEnum { - BYTE(JMSBytesMessage.class), - MAP(JMSMapMessage.class), - OBJECT(JMSObjectMessage.class), - STRING(JMSTextMessage.class); - - public static final String MSG_MODEL_NAME = "MsgModel"; - - private Class jmsClass; - - JMSMessageModelEnum(Class jmsClass) { - this.jmsClass = jmsClass; - } - - public static JMSMessageModelEnum toMsgModelEnum(AbstractJMSMessage jmsMsg) { - for (JMSMessageModelEnum e : values()) { - if (e.getJmsClass().isInstance(jmsMsg)) { - return e; - } - } - - throw new IllegalArgumentException(String.format("Not supported class[%s]", jmsMsg.getClass().getSimpleName())); - } - - public Class getJmsClass() { - return jmsClass; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java b/core/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java deleted file mode 100644 index dd5955b..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg.enums; - -public enum JMSPropertiesEnum { - JMSXUserID, - JMSXDeliveryCount, - JMSXGroupID, - JMSXGroupSeq, - JMSXRcvTimestamp -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java deleted file mode 100644 index 7c7f1ea..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg.serialize; - -import com.alibaba.fastjson.JSON; -import java.util.HashMap; -import java.util.Map; -import javax.jms.JMSException; - -public class MapSerialize implements Serialize<Map> { - - private static MapSerialize ins = new MapSerialize(); - - public static MapSerialize instance() { - return ins; - } - - @Override public byte[] serialize(Map map) throws JMSException { - return JSON.toJSONBytes(map); - } - - private MapSerialize() { - } - - @Override public Map deserialize(byte[] bytes) throws JMSException { - return JSON.parseObject(bytes, HashMap.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java deleted file mode 100644 index 5e72955..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg.serialize; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import javax.jms.JMSException; -import org.apache.commons.lang.exception.ExceptionUtils; - -public class ObjectSerialize implements Serialize<Object> { - - private static ObjectSerialize ins = new ObjectSerialize(); - - public static ObjectSerialize instance() { - return ins; - } - - private ObjectSerialize() { - } - - public byte[] serialize(Object object) throws JMSException { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(object); - oos.close(); - baos.close(); - return baos.toByteArray(); - } - catch (IOException e) { - throw new JMSException(ExceptionUtils.getStackTrace(e)); - } - } - - public Serializable deserialize(byte[] bytes) throws JMSException { - try { - ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - ObjectInputStream ois = new ObjectInputStream(bais); - ois.close(); - bais.close(); - return (Serializable) ois.readObject(); - } - catch (IOException e) { - throw new JMSException(e.getMessage()); - } - catch (ClassNotFoundException e) { - throw new JMSException(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java deleted file mode 100644 index 78a499c..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.msg.serialize; - -import javax.jms.JMSException; - -public interface Serialize<T> { - - byte[] serialize(T t) throws JMSException; - - T deserialize(byte[] bytes) throws JMSException; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java b/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java deleted file mode 100644 index 9ee0d3b..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.msg.serialize; - -import com.google.common.base.Charsets; -import java.nio.charset.Charset; -import javax.jms.JMSException; - -public class StringSerialize implements Serialize<String> { - - private static final String EMPTY_STRING = ""; - private static final byte[] EMPTY_BYTES = new byte[0]; - private static final Charset DEFAULT_CHARSET = Charsets.UTF_8; - private static StringSerialize ins = new StringSerialize(); - - public static StringSerialize instance() { - return ins; - } - - private StringSerialize() { - } - - @Override public byte[] serialize(String s) throws JMSException { - if (null == s) { - return EMPTY_BYTES; - } - try { - return s.getBytes(DEFAULT_CHARSET); - } - catch (Exception e) { - throw new JMSException(e.getMessage()); - } - } - - @Override public String deserialize(byte[] bytes) throws JMSException { - if (null == bytes) { - return EMPTY_STRING; - } - try { - return new String(bytes, DEFAULT_CHARSET); - } - catch (Exception e) { - throw new JMSException(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java b/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java deleted file mode 100644 index bed8165..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.support; - -import java.util.UUID; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.Queue; -import javax.jms.Topic; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.rocketmq.jms.RocketMQConsumer; - -public class JMSUtils { - - public static String getDestinationName(Destination destination) { - try { - String topicName; - if (destination instanceof Topic) { - topicName = ((Topic) destination).getTopicName(); - } - else if (destination instanceof Queue) { - topicName = ((Queue) destination).getQueueName(); - } - else { - throw new JMSException(String.format("Unsupported Destination type:", destination.getClass())); - } - return topicName; - } - catch (JMSException e) { - throw new JMSRuntimeException(e.getMessage()); - } - } - - public static String getConsumerGroup(RocketMQConsumer consumer) { - try { - return getConsumerGroup(consumer.getSubscriptionName(), - consumer.getSession().getConnection().getClientID(), - consumer.isShared() - ); - } - catch (JMSException e) { - throw new JMSRuntimeException(ExceptionUtils.getStackTrace(e)); - } - } - - public static String getConsumerGroup(String subscriptionName, String clientID, boolean shared) { - StringBuffer consumerGroup = new StringBuffer(); - - if (StringUtils.isNotBlank(subscriptionName)) { - consumerGroup.append(subscriptionName); - } - - if (StringUtils.isNotBlank(clientID)) { - if (consumerGroup.length() != 0) { - 
consumerGroup.append("-"); - } - consumerGroup.append(clientID); - } - - if (shared) { - if (consumerGroup.length() != 0) { - consumerGroup.append("-"); - } - consumerGroup.append(uuid()); - } - - if (consumerGroup.length() == 0) { - consumerGroup.append(uuid()); - } - - return consumerGroup.toString(); - } - - public static String uuid() { - return UUID.randomUUID().toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java b/core/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java deleted file mode 100644 index 3ff1d69..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.support; - -/** - * Converter that convert object directly, which means Integer can only be - * converted to Integer,rather than Integer and Long. 
/**
 * Strict casts with no conversion between types: a value is returned only when it is
 * already an instance of the requested type (for example an {@code Integer} is not
 * accepted where a {@code Long} is requested). {@code null} is passed through as
 * {@code null}.
 */
public class ObjectTypeCast {

    /**
     * @param obj the value to cast, may be {@code null}
     * @return {@code obj} as a {@code String}, or {@code null} when {@code obj} is {@code null}
     * @throws ClassCastException if {@code obj} is not a {@code String}
     */
    public static String cast2String(Object obj) {
        return cast2Object(obj, String.class);
    }

    /**
     * @param obj the value to cast, may be {@code null}
     * @return {@code obj} as a {@code Long}, or {@code null} when {@code obj} is {@code null}
     * @throws ClassCastException if {@code obj} is not a {@code Long}
     */
    public static Long cast2Long(Object obj) {
        return cast2Object(obj, Long.class);
    }

    /**
     * @param obj the value to cast, may be {@code null}
     * @return {@code obj} as an {@code Integer}, or {@code null} when {@code obj} is {@code null}
     * @throws ClassCastException if {@code obj} is not an {@code Integer}
     */
    public static Integer cast2Integer(Object obj) {
        return cast2Object(obj, Integer.class);
    }

    /**
     * @param obj the value to cast, may be {@code null}
     * @return {@code obj} as a {@code Boolean}, or {@code null} when {@code obj} is {@code null}
     * @throws ClassCastException if {@code obj} is not a {@code Boolean}
     */
    public static Boolean cast2Boolean(Object obj) {
        return cast2Object(obj, Boolean.class);
    }

    /**
     * Casts {@code obj} to {@code target} when it is an instance of that type.
     *
     * @param obj    the value to cast, may be {@code null}
     * @param target the required type; only consulted when {@code obj} is not {@code null}
     * @param <T>    the required type
     * @return {@code obj} as a {@code T}, or {@code null} when {@code obj} is {@code null}
     * @throws ClassCastException if {@code obj} is not an instance of {@code target}
     */
    public static <T> T cast2Object(Object obj, Class<T> target) {
        if (obj == null) {
            return null;
        }
        if (!target.isInstance(obj)) {
            throw new ClassCastException("To casted object is " + obj.getClass() + ", not " + target.getSimpleName() + ".class");
        }
        // Class.cast is the checked equivalent of (T) obj and avoids the unchecked warning.
        return target.cast(obj);
    }
}