http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionForContextImpl.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionForContextImpl.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionForContextImpl.java new file mode 100644 index 0000000..e7f3546 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionForContextImpl.java @@ -0,0 +1,91 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */
/**
 *
 */
package org.apache.activemq.jms.client;

import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Session;
import javax.jms.XAJMSContext;

import org.apache.activemq.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.utils.ReferenceCounter;
import org.apache.activemq.utils.ReferenceCounterUtil;

/**
 * Base class for connections that can hand out JMS contexts.
 * <p>
 * Each context created here increments a shared reference counter and
 * {@link #closeFromContext()} decrements it. The counter is constructed with a
 * callback that calls {@code close()} on this connection.
 * NOTE(review): presumably the callback fires once the count drops back to zero;
 * confirm against {@code ReferenceCounterUtil}.
 */
public abstract class ActiveMQConnectionForContextImpl implements ActiveMQConnectionForContext
{

   /** Callback given to the reference counter; closes this connection. */
   final Runnable closeRunnable = new Runnable()
   {
      public void run()
      {
         try
         {
            close();
         }
         catch (JMSException closeFailure)
         {
            // Runnable cannot declare checked exceptions, so surface it unchecked.
            throw JmsExceptionUtils.convertToRuntimeException(closeFailure);
         }
      }
   };

   /** Tracks how many contexts currently share this connection. */
   final ReferenceCounter refCounter = new ReferenceCounterUtil(closeRunnable);

   /** Handed to every context created from this connection. */
   protected final ThreadAwareContext threadAwareContext = new ThreadAwareContext();

   /**
    * Tells whether the given session mode is one this client accepts.
    *
    * @param sessionMode the acknowledgement / transaction mode to check
    * @return {@code true} for the four standard JMS modes and the two ActiveMQ specific ones
    */
   private static boolean isSupportedSessionMode(final int sessionMode)
   {
      switch (sessionMode)
      {
         case Session.AUTO_ACKNOWLEDGE:
         case Session.CLIENT_ACKNOWLEDGE:
         case Session.DUPS_OK_ACKNOWLEDGE:
         case Session.SESSION_TRANSACTED:
         case ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE:
         case ActiveMQJMSConstants.PRE_ACKNOWLEDGE:
            return true;
         default:
            return false;
      }
   }

   /**
    * Creates a new context on top of this connection.
    *
    * @param sessionMode the session mode of the new context
    * @return the new context
    * @throws JMSRuntimeException if {@code sessionMode} is not a supported mode
    */
   public JMSContext createContext(int sessionMode)
   {
      // Reject bad modes before touching the counter so a failed call leaves it unchanged.
      if (!isSupportedSessionMode(sessionMode))
      {
         throw new JMSRuntimeException("Invalid ackmode: " + sessionMode);
      }

      refCounter.increment();
      return new ActiveMQJMSContext(this, sessionMode, threadAwareContext);
   }

   /**
    * Creates a new XA context on top of this connection.
    *
    * @return the new XA context
    */
   public XAJMSContext createXAContext()
   {
      refCounter.increment();
      return new ActiveMQXAJMSContext(this, threadAwareContext);
   }

   /**
    * Called when a context created from this connection is closed; releases its reference.
    */
   @Override
   public void closeFromContext()
   {
      refCounter.decrement();
   }

   /**
    * Registers one more user of this connection without creating a context.
    */
   protected void incrementRefCounter()
   {
      refCounter.increment();
   }

   /**
    * @return the thread-aware context shared by all contexts of this connection
    */
   public ThreadAwareContext getThreadAwareContext()
   {
      return threadAwareContext;
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionMetaData.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionMetaData.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionMetaData.java new file mode 100644 index 0000000..1bbcf19 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionMetaData.java @@ -0,0 +1,97 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *    http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq.jms.client;

import java.util.Enumeration;
import java.util.Vector;

import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;

import org.apache.activemq.core.version.Version;

/**
 * ActiveMQ implementation of a JMS ConnectionMetaData.
 * <p>
 * The JMS level is reported as a fixed 2.0; the provider version figures are read from the
 * {@link Version} supplied at construction time.
 *
 * @author <a href="mailto:tim....@jboss.com">Tim Fox</a>
 * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a>
 *
 */
public class ActiveMQConnectionMetaData implements ConnectionMetaData
{
   /** Provider name reported to clients. */
   private static final String ACTIVEMQ = "ActiveMQ";

   /** JMS specification level implemented by this client. */
   private static final String JMS_VERSION = "2.0";

   private static final int JMS_MAJOR_VERSION = 2;

   private static final int JMS_MINOR_VERSION = 0;

   /** JMSX properties advertised by {@link #getJMSXPropertyNames()}, in reporting order. */
   private static final String[] JMSX_PROPERTY_NAMES = {"JMSXGroupID", "JMSXGroupSeq", "JMSXDeliveryCount"};

   /** Version of the server this connection talks to. */
   private final Version serverVersion;

   /**
    * Create a new ActiveMQConnectionMetaData object.
    *
    * @param serverVersion the version all provider version queries are answered from
    */
   public ActiveMQConnectionMetaData(final Version serverVersion)
   {
      this.serverVersion = serverVersion;
   }

   /** @return the JMS version string, always {@code "2.0"} */
   public String getJMSVersion() throws JMSException
   {
      return JMS_VERSION;
   }

   /** @return the JMS major version, always {@code 2} */
   public int getJMSMajorVersion() throws JMSException
   {
      return JMS_MAJOR_VERSION;
   }

   /** @return the JMS minor version, always {@code 0} */
   public int getJMSMinorVersion() throws JMSException
   {
      return JMS_MINOR_VERSION;
   }

   /** @return the provider name, always {@code "ActiveMQ"} */
   public String getJMSProviderName() throws JMSException
   {
      return ACTIVEMQ;
   }

   /** @return the full version string of the server */
   public String getProviderVersion() throws JMSException
   {
      return serverVersion.getFullVersion();
   }

   /** @return the major version of the server */
   public int getProviderMajorVersion() throws JMSException
   {
      return serverVersion.getMajorVersion();
   }

   /** @return the minor version of the server */
   public int getProviderMinorVersion() throws JMSException
   {
      return serverVersion.getMinorVersion();
   }

   /**
    * @return a fresh enumeration over the supported JMSX property names
    */
   public Enumeration getJMSXPropertyNames() throws JMSException
   {
      // A new Vector per call: each caller gets an independent enumeration.
      Vector<String> propertyNames = new Vector<String>();
      for (String propertyName : JMSX_PROPERTY_NAMES)
      {
         propertyNames.add(propertyName);
      }
      return propertyNames.elements();
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQDestination.java ---------------------------------------------------------------------- diff --git 
a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQDestination.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQDestination.java new file mode 100644 index 0000000..49e8ba2 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQDestination.java @@ -0,0 +1,376 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *    http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq.jms.client;

import java.io.Serializable;
import java.util.UUID;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;

import org.apache.activemq.api.core.Pair;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.jms.referenceable.DestinationObjectFactory;
import org.apache.activemq.jms.referenceable.SerializableObjectRefAddr;

/**
 * ActiveMQ implementation of a JMS Destination.
 * <p>
 * A destination couples a JMS name with a core address. The address is the JMS name behind one
 * of the {@code jms.*} prefixes declared below. Equality and hashing are based on the address
 * alone.
 *
 * @author <a href="mailto:tim....@jboss.com">Tim Fox</a>
 *
 */
public class ActiveMQDestination implements Destination, Serializable, Referenceable
{
   private static final long serialVersionUID = 5027962425462382883L;

   public static final String JMS_QUEUE_ADDRESS_PREFIX = "jms.queue.";

   public static final String JMS_TEMP_QUEUE_ADDRESS_PREFIX = "jms.tempqueue.";

   public static final String JMS_TOPIC_ADDRESS_PREFIX = "jms.topic.";

   public static final String JMS_TEMP_TOPIC_ADDRESS_PREFIX = "jms.temptopic.";

   /** Separates the parts of a subscription queue name. */
   private static final char SEPARATOR = '.';

   /**
    * Escapes backslashes and separators so a value can be embedded in a subscription queue name.
    *
    * @param input the raw value; {@code null} is treated as the empty string
    * @return the escaped value
    */
   private static String escape(final String input)
   {
      // Backslashes first, otherwise the ones added for '.' would be doubled.
      return input == null ? "" : input.replace("\\", "\\\\").replace(".", "\\.");
   }

   /**
    * Joins an optional prefix, an optional client id and the subscription name with
    * {@link #SEPARATOR}, escaping the client id and the subscription name.
    *
    * @param prefix           literal leading part, or {@code null} for none
    * @param clientID         the client id, or {@code null} for none
    * @param subscriptionName the subscription name
    * @return the assembled queue name
    */
   private static String buildSubscriptionQueueName(final String prefix,
                                                    final String clientID,
                                                    final String subscriptionName)
   {
      StringBuilder queueName = new StringBuilder();
      if (prefix != null)
      {
         queueName.append(prefix).append(SEPARATOR);
      }
      if (clientID != null)
      {
         queueName.append(escape(clientID)).append(SEPARATOR);
      }
      return queueName.append(escape(subscriptionName)).toString();
   }

   /**
    * Builds the destination matching a core address, chosen by the address prefix.
    *
    * @param address a core address starting with one of the {@code jms.*} prefixes
    * @return a queue, topic, temporary queue or temporary topic for that address
    * @throws JMSRuntimeException if the address carries none of the known prefixes
    */
   public static Destination fromAddress(final String address)
   {
      if (address.startsWith(JMS_QUEUE_ADDRESS_PREFIX))
      {
         return createQueue(address.substring(JMS_QUEUE_ADDRESS_PREFIX.length()));
      }
      if (address.startsWith(JMS_TOPIC_ADDRESS_PREFIX))
      {
         return createTopic(address.substring(JMS_TOPIC_ADDRESS_PREFIX.length()));
      }
      if (address.startsWith(JMS_TEMP_QUEUE_ADDRESS_PREFIX))
      {
         String tempQueueName = address.substring(JMS_TEMP_QUEUE_ADDRESS_PREFIX.length());
         return new ActiveMQTemporaryQueue(address, tempQueueName, null);
      }
      if (address.startsWith(JMS_TEMP_TOPIC_ADDRESS_PREFIX))
      {
         String tempTopicName = address.substring(JMS_TEMP_TOPIC_ADDRESS_PREFIX.length());
         return new ActiveMQTemporaryTopic(address, tempTopicName, null);
      }
      throw new JMSRuntimeException("Invalid address " + address);
   }

   /**
    * Builds the queue name backing an unshared subscription.
    * <p>
    * Non-durable subscriptions get a leading {@code nonDurable} part; the client id part is only
    * present when {@code clientID} is not {@code null}.
    *
    * @param isDurable        whether the subscription is durable
    * @param clientID         the client id, may be {@code null}
    * @param subscriptionName the subscription name
    * @return the queue name
    */
   public static String createQueueNameForDurableSubscription(final boolean isDurable, final String clientID, final String subscriptionName)
   {
      return buildSubscriptionQueueName(isDurable ? null : "nonDurable", clientID, subscriptionName);
   }

   /**
    * Builds the queue name backing a shared subscription.
    * <p>
    * The name always starts with {@code Durable} or {@code nonDurable}; the client id part is
    * only present when {@code clientID} is not {@code null}.
    *
    * @param isDurable        whether the subscription is durable
    * @param clientID         the client id, may be {@code null}
    * @param subscriptionName the subscription name
    * @return the queue name
    */
   public static String createQueueNameForSharedSubscription(final boolean isDurable, final String clientID, final String subscriptionName)
   {
      return buildSubscriptionQueueName(isDurable ? "Durable" : "nonDurable", clientID, subscriptionName);
   }

   /**
    * Splits a two-part queue name back into its unescaped parts.
    *
    * @param queueName a name made of exactly two escaped parts joined by {@link #SEPARATOR}
    * @return the two unescaped parts, in order
    * @throws JMSRuntimeException if the name does not consist of exactly two parts or ends in a
    *                             dangling escape character
    */
   public static Pair<String, String> decomposeQueueNameForDurableSubscription(final String queueName)
   {
      final int length = queueName.length();
      final StringBuilder[] parts = {new StringBuilder(), new StringBuilder()};
      int partIndex = 0;

      for (int index = 0; index < length; index++)
      {
         char current = queueName.charAt(index);

         if (current == SEPARATOR)
         {
            // An unescaped separator starts the next part; a third part is not allowed.
            partIndex++;
            if (partIndex >= parts.length)
            {
               throw new JMSRuntimeException("Invalid message queue name: " + queueName);
            }
            continue;
         }

         if (current == '\\')
         {
            // Escape: the following character is taken literally, so it must exist.
            index++;
            if (index >= length)
            {
               throw new JMSRuntimeException("Invalid message queue name: " + queueName);
            }
            current = queueName.charAt(index);
         }

         parts[partIndex].append(current);
      }

      if (partIndex != 1)
      {
         throw new JMSRuntimeException("Invalid message queue name: " + queueName);
      }

      return new Pair<String, String>(parts[0].toString(), parts[1].toString());
   }

   /** @return the core address of the queue with the given JMS name */
   public static SimpleString createQueueAddressFromName(final String name)
   {
      return new SimpleString(JMS_QUEUE_ADDRESS_PREFIX + name);
   }

   /** @return the core address of the topic with the given JMS name */
   public static SimpleString createTopicAddressFromName(final String name)
   {
      return new SimpleString(JMS_TOPIC_ADDRESS_PREFIX + name);
   }

   /** @return a queue with the given JMS name */
   public static ActiveMQQueue createQueue(final String name)
   {
      return new ActiveMQQueue(name);
   }

   /** @return a topic with the given JMS name */
   public static ActiveMQTopic createTopic(final String name)
   {
      return new ActiveMQTopic(name);
   }

   /** @return a temporary queue with the given name, bound to {@code session} */
   public static ActiveMQTemporaryQueue createTemporaryQueue(final String name, final ActiveMQSession session)
   {
      String tempQueueAddress = JMS_TEMP_QUEUE_ADDRESS_PREFIX.concat(name);
      return new ActiveMQTemporaryQueue(tempQueueAddress, name, session);
   }

   /** @return a temporary queue with the given name and no session */
   public static ActiveMQTemporaryQueue createTemporaryQueue(final String name)
   {
      return createTemporaryQueue(name, null);
   }

   /** @return a temporary queue with a random UUID name, bound to {@code session} */
   public static ActiveMQTemporaryQueue createTemporaryQueue(final ActiveMQSession session)
   {
      return createTemporaryQueue(UUID.randomUUID().toString(), session);
   }

   /** @return a temporary topic with a random UUID name, bound to {@code session} */
   public static ActiveMQTemporaryTopic createTemporaryTopic(final ActiveMQSession session)
   {
      return createTemporaryTopic(UUID.randomUUID().toString(), session);
   }

   /** @return a temporary topic with the given name, bound to {@code session} */
   public static ActiveMQTemporaryTopic createTemporaryTopic(String name, final ActiveMQSession session)
   {
      String tempTopicAddress = JMS_TEMP_TOPIC_ADDRESS_PREFIX.concat(name);
      return new ActiveMQTemporaryTopic(tempTopicAddress, name, session);
   }

   /** @return a temporary topic with the given name and no session */
   public static ActiveMQTemporaryTopic createTemporaryTopic(String name)
   {
      return createTemporaryTopic(name, null);
   }

   /**
    * The JMS name
    */
   protected final String name;

   /**
    * The core address
    */
   private final String address;

   /**
    * SimpleString version of address
    */
   private final SimpleString simpleAddress;

   private final boolean temporary;

   private final boolean queue;

   /** Session used by {@link #delete()}; transient, so not serialized with the destination. */
   private final transient ActiveMQSession session;

   /**
    * @param address   the core address
    * @param name      the JMS name
    * @param temporary whether this is a temporary destination
    * @param queue     {@code true} for a queue, {@code false} for a topic
    * @param session   the session used for deletion, may be {@code null}
    */
   protected ActiveMQDestination(final String address, final String name,
                                 final boolean temporary,
                                 final boolean queue,
                                 final ActiveMQSession session)
   {
      this.address = address;
      this.name = name;
      this.simpleAddress = new SimpleString(address);
      this.temporary = temporary;
      this.queue = queue;
      this.session = session;
   }

   /**
    * @return a JNDI reference that carries this destination and names
    *         {@link DestinationObjectFactory} as its object factory
    */
   public Reference getReference() throws NamingException
   {
      return new Reference(this.getClass().getCanonicalName(),
                           new SerializableObjectRefAddr("ActiveMQ-DEST", this),
                           DestinationObjectFactory.class.getCanonicalName(),
                           null);
   }

   /**
    * Deletes this destination through its session.
    * <p>
    * Does nothing when there is no session or the session's core session is already closed.
    *
    * @throws JMSException if the session fails to delete the destination
    */
   public void delete() throws JMSException
   {
      // Temporary queues will be deleted when the connection is closed.. nothing to be done then!
      if (session == null || session.getCoreSession().isClosed())
      {
         return;
      }

      if (queue)
      {
         session.deleteTemporaryQueue(this);
      }
      else
      {
         session.deleteTemporaryTopic(this);
      }
   }

   /** @return {@code true} for a queue, {@code false} for a topic */
   public boolean isQueue()
   {
      return queue;
   }

   /** @return the core address */
   public String getAddress()
   {
      return address;
   }

   /** @return the core address as a {@link SimpleString} */
   public SimpleString getSimpleAddress()
   {
      return simpleAddress;
   }

   /** @return the JMS name */
   public String getName()
   {
      return name;
   }

   /** @return whether this is a temporary destination */
   public boolean isTemporary()
   {
      return temporary;
   }

   /**
    * Two destinations are equal when their core addresses are equal.
    */
   @Override
   public boolean equals(final Object o)
   {
      if (this == o)
      {
         return true;
      }
      return o instanceof ActiveMQDestination && address.equals(((ActiveMQDestination)o).address);
   }

   @Override
   public int hashCode()
   {
      return address.hashCode();
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSClientBundle.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSClientBundle.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSClientBundle.java new file mode 100644 index 0000000..2b4a76b --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSClientBundle.java @@ -0,0 +1,97 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */
package org.apache.activemq.jms.client;


import javax.jms.IllegalStateException;
import javax.jms.IllegalStateRuntimeException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;

import org.apache.activemq.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.api.core.ActiveMQInvalidFilterExpressionException;
import org.apache.activemq.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.api.core.SimpleString;
import org.jboss.logging.annotations.Cause;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageBundle;
import org.jboss.logging.Messages;

/**
 * Message bundle of the JMS client: each method builds an exception whose message is the
 * {@code value} of its {@link Message} annotation.
 *
 * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a>
 *         3/12/12
 *
 * Logger Code 12
 *
 * each message id must be 6 digits long starting with 12, the 3rd digit should be 9
 *
 * so 129000 to 129999
 */
@MessageBundle(projectCode = "AMQ")
public interface ActiveMQJMSClientBundle
{
   /** Shared bundle instance obtained from JBoss Logging. */
   ActiveMQJMSClientBundle BUNDLE = Messages.getBundle(ActiveMQJMSClientBundle.class);

   // {0} is the offending filter; e is attached as the cause.
   @Message(id = 129000, value = "Invalid filter: {0}", format = Message.Format.MESSAGE_FORMAT)
   ActiveMQInvalidFilterExpressionException invalidFilter(@Cause Throwable e, SimpleString filter);

   @Message(id = 129001, value = "Invalid Subscription Name. It is required to set the subscription name", format = Message.Format.MESSAGE_FORMAT)
   ActiveMQIllegalStateException invalidSubscriptionName();

   @Message(id = 129002, value = "Destination {0} does not exist", format = Message.Format.MESSAGE_FORMAT)
   ActiveMQNonExistentQueueException destinationDoesNotExist(SimpleString destination);

   @Message(id = 129003, value = "name cannot be null", format = Message.Format.MESSAGE_FORMAT)
   IllegalArgumentException nameCannotBeNull();

   @Message(id = 129004, value = "name cannot be empty", format = Message.Format.MESSAGE_FORMAT)
   IllegalArgumentException nameCannotBeEmpty();

   // 129005/129006 and 129007/129008 share their text; they differ only in the exception type
   // (unchecked vs. checked JMS illegal-state exception).
   @Message(id = 129005, value = "It is illegal to call this method from within a Message Listener", format = Message.Format.MESSAGE_FORMAT)
   IllegalStateRuntimeException callingMethodFromListenerRuntime();

   @Message(id = 129006, value = "It is illegal to call this method from within a Message Listener", format = Message.Format.MESSAGE_FORMAT)
   IllegalStateException callingMethodFromListener();

   @Message(id = 129007, value = "It is illegal to call this method from within a Completion Listener", format = Message.Format.MESSAGE_FORMAT)
   IllegalStateRuntimeException callingMethodFromCompletionListenerRuntime();

   @Message(id = 129008, value = "It is illegal to call this method from within a Completion Listener", format = Message.Format.MESSAGE_FORMAT)
   IllegalStateException callingMethodFromCompletionListener();

   // {0} is replaced by the type argument.
   @Message(id = 129009, value = "Null {0} is not allowed", format = Message.Format.MESSAGE_FORMAT)
   IllegalArgumentException nullArgumentNotAllowed(String type);

   @Message(id = 129010, value = "Topic (Destination) cannot be null", format = Message.Format.MESSAGE_FORMAT)
   InvalidDestinationException nullTopic();

   @Message(id = 129011, value = "LargeMessage streaming is only possible on ByteMessage or StreamMessage",
            format = Message.Format.MESSAGE_FORMAT)
   IllegalStateException onlyValidForByteOrStreamMessages();

   // The doubled single quotes are MessageFormat escapes for a literal quote around {0}.
   @Message(id = 129012, value = "The property name ''{0}'' is not a valid java identifier.",
            format = Message.Format.MESSAGE_FORMAT)
   JMSRuntimeException invalidJavaIdentifier(String propertyName);

   @Message(id = 129013, value = "Message is read-only", format = Message.Format.MESSAGE_FORMAT)
   MessageNotWriteableException messageNotWritable();

   @Message(id = 129014, value = "Message is write-only", format = Message.Format.MESSAGE_FORMAT)
   MessageNotReadableException messageNotReadable();

   @Message(id = 129015, value = "Illegal deliveryMode value: {0}", format = Message.Format.MESSAGE_FORMAT)
   JMSException illegalDeliveryMode(int deliveryMode);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSClientLogger.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSClientLogger.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSClientLogger.java new file mode 100644 index 0000000..f044e4c --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSClientLogger.java @@ -0,0 +1,79 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */
package org.apache.activemq.jms.client;

import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.Cause;
import org.jboss.logging.annotations.LogMessage;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageLogger;

/**
 * Typed logger of the JMS client: each method logs the {@code value} of its {@link Message}
 * annotation at the level given by its {@link LogMessage} annotation.
 *
 * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a>
 *         3/15/12
 *
 * Logger Code 12
 *
 * each message id must be 6 digits long starting with 12, the 3rd digit denotes the level so
 *
 * INFO  1
 * WARN  2
 * DEBUG 3
 * ERROR 4
 * TRACE 5
 * FATAL 6
 *
 * so an INFO message would be 121000 to 121999
 */
@MessageLogger(projectCode = "AMQ")
public interface ActiveMQJMSClientLogger extends BasicLogger
{
   /**
    * The default logger.
    */
   ActiveMQJMSClientLogger LOGGER = Logger.getMessageLogger(ActiveMQJMSClientLogger.class, ActiveMQJMSClientLogger.class.getPackage().getName());

   // WARN messages (1220xx). The doubled single quote is a MessageFormat escape for "I'm".
   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 122000, value = "I''m closing a JMS connection you left open. Please make sure you close all JMS connections explicitly before letting them go out of scope! see stacktrace to find out where it was created" , format = Message.Format.MESSAGE_FORMAT)
   void connectionLeftOpen(@Cause Exception e);

   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 122001, value = "Unhandled exception thrown from onMessage" , format = Message.Format.MESSAGE_FORMAT)
   void onMessageError(@Cause Exception e);

   // ERROR messages (1240xx). Note that id 124001 is not used in this interface.
   @LogMessage(level = Logger.Level.ERROR)
   @Message(id = 124000, value = "Failed to call JMS exception listener" , format = Message.Format.MESSAGE_FORMAT)
   void errorCallingExcListener(@Cause Exception e);

   @LogMessage(level = Logger.Level.ERROR)
   @Message(id = 124002, value = "Queue Browser failed to create message" , format = Message.Format.MESSAGE_FORMAT)
   void errorCreatingMessage(@Cause Throwable e);

   @LogMessage(level = Logger.Level.ERROR)
   @Message(id = 124003, value = "Message Listener failed to prepare message for receipt" , format = Message.Format.MESSAGE_FORMAT)
   void errorPreparingMessageForReceipt(@Cause Throwable e);

   @LogMessage(level = Logger.Level.ERROR)
   @Message(id = 124004, value = "Message Listener failed to process message" , format = Message.Format.MESSAGE_FORMAT)
   void errorProcessingMessage(@Cause Throwable e);

   @LogMessage(level = Logger.Level.ERROR)
   @Message(id = 124005, value = "Message Listener failed to recover session" , format = Message.Format.MESSAGE_FORMAT)
   void errorRecoveringSession(@Cause Throwable e);

   @LogMessage(level = Logger.Level.ERROR)
   @Message(id = 124006, value = "Failed to call Failover listener" , format = Message.Format.MESSAGE_FORMAT)
   void errorCallingFailoverListener(@Cause Exception e);

}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSConnectionFactory.java 
b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSConnectionFactory.java new file mode 100644 index 0000000..cb7f01a --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSConnectionFactory.java @@ -0,0 +1,67 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *    http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq.jms.client;

import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnectionFactory;

import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ServerLocator;


/**
 * A class that represents a ConnectionFactory.
 * <p>
 * It adds no behaviour of its own: it extends {@link ActiveMQConnectionFactory}, additionally
 * declares the {@link TopicConnectionFactory} and {@link QueueConnectionFactory} interfaces, and
 * every constructor forwards its arguments unchanged to the superclass.
 *
 * @author <a href="mailto:h...@redhat.com">Howard Gao</a>
 */
public class ActiveMQJMSConnectionFactory extends ActiveMQConnectionFactory implements TopicConnectionFactory, QueueConnectionFactory
{

   private static final long serialVersionUID = -2810634789345348326L;

   /**
    * Creates a factory through the no-argument superclass constructor.
    */
   public ActiveMQJMSConnectionFactory()
   {
      super();
   }

   /**
    * Creates a factory from an existing server locator.
    *
    * @param serverLocator passed unchanged to the superclass
    */
   public ActiveMQJMSConnectionFactory(ServerLocator serverLocator)
   {
      super(serverLocator);
   }

   /**
    * Creates a factory from a discovery group configuration.
    *
    * @param ha passed unchanged to the superclass
    * @param groupConfiguration passed unchanged to the superclass
    */
   public ActiveMQJMSConnectionFactory(boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
   {
      super(ha, groupConfiguration);
   }

   /**
    * Creates a factory from a list of initial connectors.
    *
    * @param ha passed unchanged to the superclass
    * @param initialConnectors passed unchanged to the superclass
    */
   public ActiveMQJMSConnectionFactory(boolean ha, TransportConfiguration... initialConnectors)
   {
      super(ha, initialConnectors);
   }

}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSConsumer.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSConsumer.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSConsumer.java new file mode 100644 index 0000000..1039758 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSConsumer.java @@ -0,0 +1,198 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */
package org.apache.activemq.jms.client;

import javax.jms.JMSConsumer;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

/**
 * {@link JMSConsumer} implementation that delegates to a classic {@link MessageConsumer}.
 * <p>
 * Every checked {@link JMSException} raised by the delegate is converted to its unchecked
 * counterpart, and every message obtained through this consumer is reported to the owning
 * context via {@code setLastMessage}.
 *
 * @author <a href="http://jmesnil.net/">Jeff Mesnil</a> (c) 2013 Red Hat inc.
 */
public class ActiveMQJMSConsumer implements JMSConsumer
{

   private final ActiveMQJMSContext context;
   private final MessageConsumer consumer;

   /**
    * @param context  the context this consumer belongs to
    * @param consumer the underlying consumer all calls are delegated to
    */
   ActiveMQJMSConsumer(ActiveMQJMSContext context, MessageConsumer consumer)
   {
      this.context = context;
      this.consumer = consumer;
   }

   /**
    * @return the message selector of the underlying consumer
    * @throws JMSRuntimeException if the underlying consumer fails
    */
   @Override
   public String getMessageSelector()
   {
      try
      {
         return consumer.getMessageSelector();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * Returns the listener the application registered, not the internal wrapper around it.
    *
    * @return the registered listener, or {@code null} if none is set
    * @throws JMSRuntimeException if the underlying consumer fails
    */
   @Override
   public MessageListener getMessageListener() throws JMSRuntimeException
   {
      try
      {
         MessageListener installed = consumer.getMessageListener();
         // setMessageListener installs a wrapper; hand back what the caller originally passed in.
         if (installed instanceof MessageListenerWrapper)
         {
            return ((MessageListenerWrapper)installed).wrapped;
         }
         return installed;
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * Registers a listener, or unsets the current one when {@code listener} is {@code null}.
    *
    * @param listener the listener to deliver messages to, or {@code null} to unset
    * @throws JMSRuntimeException if the underlying consumer fails
    */
   @Override
   public void setMessageListener(MessageListener listener) throws JMSRuntimeException
   {
      try
      {
         // Passing null through unsets the listener; wrapping null would leave a wrapper
         // installed that fails with a NullPointerException on the next delivery.
         consumer.setMessageListener(listener == null ? null : new MessageListenerWrapper(listener));
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * @return the result of {@code context.setLastMessage} for the message obtained from the
    *         underlying consumer's {@code receive()}
    * @throws JMSRuntimeException if the underlying consumer fails
    */
   @Override
   public Message receive()
   {
      try
      {
         return context.setLastMessage(this, consumer.receive());
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * @param timeout passed unchanged to the underlying consumer
    * @return the result of {@code context.setLastMessage} for the message obtained from the
    *         underlying consumer's {@code receive(timeout)}
    * @throws JMSRuntimeException if the underlying consumer fails
    */
   @Override
   public Message receive(long timeout)
   {
      try
      {
         return context.setLastMessage(this, consumer.receive(timeout));
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * @return the result of {@code context.setLastMessage} for the message obtained from the
    *         underlying consumer's {@code receiveNoWait()}
    * @throws JMSRuntimeException if the underlying consumer fails
    */
   @Override
   public Message receiveNoWait()
   {
      try
      {
         return context.setLastMessage(this, consumer.receiveNoWait());
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * Closes the underlying consumer.
    *
    * @throws JMSRuntimeException if the underlying consumer fails to close
    */
   @Override
   public void close()
   {
      try
      {
         consumer.close();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * @param c the type the body is requested as
    * @return the body of the message from {@code receive()}, or {@code null} if it returned
    *         no message
    * @throws JMSRuntimeException if receiving or extracting the body fails
    */
   @Override
   public <T> T receiveBody(Class<T> c)
   {
      try
      {
         return recordAndExtractBody(consumer.receive(), c);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * @param c       the type the body is requested as
    * @param timeout passed unchanged to the underlying consumer
    * @return the body of the message from {@code receive(timeout)}, or {@code null} if it
    *         returned no message
    * @throws JMSRuntimeException if receiving or extracting the body fails
    */
   @Override
   public <T> T receiveBody(Class<T> c, long timeout)
   {
      try
      {
         return recordAndExtractBody(consumer.receive(timeout), c);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * @param c the type the body is requested as
    * @return the body of the message from {@code receiveNoWait()}, or {@code null} if it
    *         returned no message
    * @throws JMSRuntimeException if receiving or extracting the body fails
    */
   @Override
   public <T> T receiveBodyNoWait(Class<T> c)
   {
      try
      {
         return recordAndExtractBody(consumer.receiveNoWait(), c);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * Reports the message to the context, then extracts its body.
    *
    * @param message the message just received, may be {@code null}
    * @param c       the type the body is requested as
    * @return the body, or {@code null} when {@code message} is {@code null}
    * @throws JMSException if the body cannot be extracted
    */
   private <T> T recordAndExtractBody(Message message, Class<T> c) throws JMSException
   {
      // The context is told about the message even when it is null, as the receive methods do.
      context.setLastMessage(ActiveMQJMSConsumer.this, message);
      return message == null ? null : message.getBody(c);
   }

   /**
    * Listener installed on the underlying consumer in place of the application's listener.
    * It reports each message to the context and brackets the delegate call with
    * {@code setCurrentThread(false)} / {@code clearCurrentThread(false)} on the context's
    * thread-aware context.
    */
   final class MessageListenerWrapper implements MessageListener
   {
      private final MessageListener wrapped;

      public MessageListenerWrapper(MessageListener wrapped)
      {
         this.wrapped = wrapped;
      }

      @Override
      public void onMessage(Message message)
      {
         context.setLastMessage(ActiveMQJMSConsumer.this, message);

         context.getThreadAwareContext().setCurrentThread(false);
         try
         {
            wrapped.onMessage(message);
         }
         finally
         {
            // Always undo the thread marking, even if the listener throws.
            context.getThreadAwareContext().clearCurrentThread(false);
         }
      }
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSContext.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSContext.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSContext.java new file mode 100644 index 0000000..e864347 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSContext.java @@ -0,0 +1,769 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq.jms.client; + +import javax.jms.BytesMessage; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateRuntimeException; +import javax.jms.JMSConsumer; +import javax.jms.JMSContext; +import javax.jms.JMSException; +import javax.jms.JMSProducer; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.XAConnection; +import javax.jms.XASession; +import javax.transaction.xa.XAResource; +import java.io.Serializable; +
/**
 * ActiveMQ implementation of a JMSContext.
 * <p>
 * The context wraps a shared {@link ActiveMQConnectionForContext} and a {@link Session} that is
 * created lazily on first use (see {@link #checkSession()}). Checked {@link JMSException}s are
 * rethrown through {@code JmsExceptionUtils.convertToRuntimeException}.
 *
 * @author <a href="http://jmesnil.net/">Jeff Mesnil</a> (c) 2013 Red Hat inc
 */
public class ActiveMQJMSContext implements JMSContext
{
   private static final boolean DEFAULT_AUTO_START = true;
   private final int sessionMode;

   private final ThreadAwareContext threadAwareContext;

   /**
    * Client ACK needs to hold last acked messages, so context.ack calls will be respected.
    */
   private volatile Message lastMessagesWaitingAck;

   private final ActiveMQConnectionForContext connection;

   /** Lazily created; volatile because {@link #checkSession()} reads it outside the lock. */
   private volatile Session session;
   private boolean autoStart = ActiveMQJMSContext.DEFAULT_AUTO_START;
   private MessageProducer innerProducer;
   private final boolean xa;

   /** Volatile because {@link #acknowledge()} reads it outside the lock. */
   private volatile boolean closed;

   /**
    * @param connection         connection shared by the contexts created from it
    * @param ackMode            session mode used when the session is created
    * @param xa                 whether the lazily created session is an {@link XASession}
    * @param threadAwareContext tracker used to reject calls made from listener threads
    */
   ActiveMQJMSContext(final ActiveMQConnectionForContext connection, final int ackMode, final boolean xa, ThreadAwareContext threadAwareContext)
   {
      this.connection = connection;
      this.sessionMode = ackMode;
      this.xa = xa;
      this.threadAwareContext = threadAwareContext;
   }

   /** Creates a non-XA context with the given session mode. */
   public ActiveMQJMSContext(ActiveMQConnectionForContext connection, int ackMode, ThreadAwareContext threadAwareContext)
   {
      this(connection, ackMode, false, threadAwareContext);
   }

   /** Creates an XA context; the session mode is {@code SESSION_TRANSACTED}. */
   public ActiveMQJMSContext(ActiveMQConnectionForContext connection, ThreadAwareContext threadAwareContext)
   {
      this(connection, SESSION_TRANSACTED, true, threadAwareContext);
   }

   // XAJMSContext implementation -------------------------------------

   public JMSContext getContext()
   {
      return this;
   }

   /**
    * @return the current session; {@code null} if it has not been created yet
    */
   public Session getSession()
   {
      return session;
   }

   /**
    * @return the XA resource of the session, which is cast to {@link XASession}
    */
   public XAResource getXAResource()
   {
      checkSession();
      return ((XASession) session).getXAResource();
   }

   // JMSContext implementation -------------------------------------

   @Override
   public JMSContext createContext(int sessionMode)
   {
      return connection.createContext(sessionMode);
   }

   @Override
   public JMSProducer createProducer()
   {
      checkSession();
      try
      {
         return new ActiveMQJMSProducer(this, getInnerProducer());
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * Returns the single anonymous producer (created with a {@code null} destination) that every
    * {@link JMSProducer} of this context shares, creating it on first use.
    */
   private synchronized MessageProducer getInnerProducer() throws JMSException
   {
      if (innerProducer == null)
      {
         innerProducer = session.createProducer(null);
      }

      return innerProducer;
   }

   /**
    * Creates the session on first use.
    * <p>
    * The closed-check only happens while the session is still missing; once a session exists this
    * method returns immediately.
    *
    * @throws IllegalStateRuntimeException if the context was closed before a session was created
    */
   private void checkSession()
   {
      if (session == null)
      {
         synchronized (this)
         {
            if (closed)
               throw new IllegalStateRuntimeException("Context is closed");
            if (session == null)
            {
               try
               {
                  if (xa)
                  {
                     session = ((XAConnection) connection).createXASession();
                  }
                  else
                  {
                     session = connection.createSession(sessionMode);
                  }
               }
               catch (JMSException e)
               {
                  throw JmsExceptionUtils.convertToRuntimeException(e);
               }
            }
         }
      }
   }

   @Override
   public String getClientID()
   {
      try
      {
         return connection.getClientID();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public void setClientID(String clientID)
   {
      try
      {
         connection.setClientID(clientID);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public ConnectionMetaData getMetaData()
   {
      try
      {
         return connection.getMetaData();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public ExceptionListener getExceptionListener()
   {
      try
      {
         return connection.getExceptionListener();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public void setExceptionListener(ExceptionListener listener)
   {
      try
      {
         connection.setExceptionListener(listener);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public void start()
   {
      try
      {
         connection.start();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public void stop()
   {
      threadAwareContext.assertNotMessageListenerThreadRuntime();
      try
      {
         connection.stop();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public void setAutoStart(boolean autoStart)
   {
      this.autoStart = autoStart;
   }

   @Override
   public boolean getAutoStart()
   {
      return autoStart;
   }

   /**
    * Closes the session (if one was created) and releases this context's hold on the shared
    * connection.
    * <p>
    * Calling it again on an already closed context is a no-op. Without that guard every extra
    * call would run {@code connection.closeFromContext()} again and release the shared
    * connection more than once on behalf of a single context.
    */
   @Override
   public void close()
   {
      threadAwareContext.assertNotCompletionListenerThreadRuntime();
      threadAwareContext.assertNotMessageListenerThreadRuntime();
      try
      {
         synchronized (this)
         {
            if (closed)
            {
               return;
            }
            if (session != null)
               session.close();
            connection.closeFromContext();
            closed = true;
         }
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public BytesMessage createBytesMessage()
   {
      checkSession();
      try
      {
         return session.createBytesMessage();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public MapMessage createMapMessage()
   {
      checkSession();
      try
      {
         return session.createMapMessage();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public Message createMessage()
   {
      checkSession();
      try
      {
         return session.createMessage();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public ObjectMessage createObjectMessage()
   {
      checkSession();
      try
      {
         return session.createObjectMessage();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public ObjectMessage createObjectMessage(Serializable object)
   {
      checkSession();
      try
      {
         return session.createObjectMessage(object);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public StreamMessage createStreamMessage()
   {
      checkSession();
      try
      {
         return session.createStreamMessage();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public TextMessage createTextMessage()
   {
      checkSession();
      try
      {
         return session.createTextMessage();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public TextMessage createTextMessage(String text)
   {
      checkSession();
      try
      {
         return session.createTextMessage(text);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public boolean getTransacted()
   {
      checkSession();
      try
      {
         return session.getTransacted();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public int getSessionMode()
   {
      return sessionMode;
   }

   @Override
   public void commit()
   {
      threadAwareContext.assertNotCompletionListenerThreadRuntime();
      checkSession();
      try
      {
         session.commit();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public void rollback()
   {
      threadAwareContext.assertNotCompletionListenerThreadRuntime();
      checkSession();
      try
      {
         session.rollback();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public void recover()
   {
      checkSession();
      try
      {
         session.recover();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * Shared tail of every {@code create*Consumer} method: wraps the classic consumer and then
    * applies the auto-start policy, in that order.
    */
   private JMSConsumer wrapConsumer(javax.jms.MessageConsumer classicConsumer) throws JMSException
   {
      ActiveMQJMSConsumer consumer = new ActiveMQJMSConsumer(this, classicConsumer);
      checkAutoStart();
      return consumer;
   }

   @Override
   public JMSConsumer createConsumer(Destination destination)
   {
      checkSession();
      try
      {
         return wrapConsumer(session.createConsumer(destination));
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public JMSConsumer createConsumer(Destination destination, String messageSelector)
   {
      checkSession();
      try
      {
         return wrapConsumer(session.createConsumer(destination, messageSelector));
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public JMSConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
   {
      checkSession();
      try
      {
         return wrapConsumer(session.createConsumer(destination, messageSelector, noLocal));
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public Queue createQueue(String queueName)
   {
      checkSession();
      try
      {
         return session.createQueue(queueName);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public Topic createTopic(String topicName)
   {
      checkSession();
      try
      {
         return session.createTopic(topicName);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public JMSConsumer createDurableConsumer(Topic topic, String name)
   {
      checkSession();
      try
      {
         return wrapConsumer(session.createDurableConsumer(topic, name));
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public JMSConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal)
   {
      checkSession();
      try
      {
         return wrapConsumer(session.createDurableConsumer(topic, name, messageSelector, noLocal));
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public JMSConsumer createSharedDurableConsumer(Topic topic, String name)
   {
      checkSession();
      try
      {
         return wrapConsumer(session.createSharedDurableConsumer(topic, name));
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public JMSConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector)
   {
      checkSession();
      try
      {
         return wrapConsumer(session.createSharedDurableConsumer(topic, name, messageSelector));
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName)
   {
      checkSession();
      try
      {
         return wrapConsumer(session.createSharedConsumer(topic, sharedSubscriptionName));
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector)
   {
      checkSession();
      try
      {
         return wrapConsumer(session.createSharedConsumer(topic, sharedSubscriptionName, messageSelector));
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public QueueBrowser createBrowser(Queue queue)
   {
      checkSession();
      try
      {
         QueueBrowser browser = session.createBrowser(queue);
         checkAutoStart();
         return browser;
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public QueueBrowser createBrowser(Queue queue, String messageSelector)
   {
      checkSession();
      try
      {
         QueueBrowser browser = session.createBrowser(queue, messageSelector);
         checkAutoStart();
         return browser;
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public TemporaryQueue createTemporaryQueue()
   {
      checkSession();
      try
      {
         return session.createTemporaryQueue();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public TemporaryTopic createTemporaryTopic()
   {
      checkSession();
      try
      {
         return session.createTemporaryTopic();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public void unsubscribe(String name)
   {
      checkSession();
      try
      {
         session.unsubscribe(name);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * Acknowledges the last message recorded through {@code setLastMessage}, if any.
    *
    * @throws IllegalStateRuntimeException if the context is closed
    */
   @Override
   public void acknowledge()
   {
      checkSession();
      if (closed)
         throw new IllegalStateRuntimeException("Context is closed");
      try
      {
         if (lastMessagesWaitingAck != null)
         {
            lastMessagesWaitingAck.acknowledge();
         }
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   /**
    * This is to be used on tests only. It's not part of the interface and it's not guaranteed to be kept
    * on the API contract.
    *
    * @return the current session; {@code null} if it has not been created yet
    */
   public Session getUsedSession()
   {
      return this.session;
   }

   /**
    * Starts the connection when auto-start is enabled.
    *
    * @throws IllegalStateRuntimeException if the context is closed
    */
   private synchronized void checkAutoStart() throws JMSException
   {
      if (closed)
         throw new IllegalStateRuntimeException("Context is closed");
      if (autoStart)
      {
         connection.start();
      }
   }

   /**
    * this is to ensure Context.acknowledge would work on ClientACK
    *
    * @return {@code lastMessageReceived}, unchanged, so callers can record and return in one step
    */
   Message setLastMessage(final JMSConsumer consumer, final Message lastMessageReceived)
   {
      // only CLIENT_ACKNOWLEDGE needs the reference kept for a later acknowledge()
      if (sessionMode == CLIENT_ACKNOWLEDGE)
      {
         lastMessagesWaitingAck = lastMessageReceived;
      }
      return lastMessageReceived;
   }

   public ThreadAwareContext getThreadAwareContext()
   {
      return threadAwareContext;
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSProducer.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSProducer.java 
b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSProducer.java new file mode 100644 index 0000000..44646aa --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSProducer.java @@ -0,0 +1,800 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.BytesMessage; +import javax.jms.CompletionListener; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.JMSProducer; +import javax.jms.JMSRuntimeException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageFormatRuntimeException; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.TextMessage; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.activemq.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.utils.TypedProperties; +
/**
 * NOTE: this class forwards {@link #setDisableMessageID(boolean)} and
 * {@link #setDisableMessageTimestamp(boolean)} calls to their equivalent at the
 * {@link MessageProducer}. If the user is using the producer in async mode, this may lead to races.
 * We allow/tolerate this because these are just optional optimizations.
 * <p>
 * Message properties and the {@code JMSCorrelationID}, {@code JMSType} and {@code JMSReplyTo}
 * headers set on this producer are kept locally and copied onto each message when it is sent.
 *
 * @author <a href="http://jmesnil.net/">Jeff Mesnil</a> (c) 2013 Red Hat inc.
 */
public final class ActiveMQJMSProducer implements JMSProducer
{
   private final ActiveMQJMSContext context;
   private final MessageProducer producer;
   private final TypedProperties properties = new TypedProperties();

   //we convert Strings to SimpleStrings so if getProperty is called the wrong object is returned, this list lets us return the
   //correct type. It must only ever name properties whose current value was set as a String.
   private final List<SimpleString> stringPropertyNames = new ArrayList<>();

   private volatile CompletionListener completionListener;

   private Destination jmsHeaderReplyTo;
   private String jmsHeaderCorrelationID;
   private byte[] jmsHeaderCorrelationIDAsBytes;
   private String jmsHeaderType;

   /**
    * @param context  the context that created this producer
    * @param producer the classic-API producer used to send
    */
   ActiveMQJMSProducer(ActiveMQJMSContext context, MessageProducer producer)
   {
      this.context = context;
      this.producer = producer;
   }

   /**
    * Copies the configured headers and properties onto {@code message} and sends it, using the
    * completion listener (wrapped) when one was set through {@link #setAsync(CompletionListener)}.
    *
    * @throws MessageFormatRuntimeException if {@code message} is {@code null}
    */
   @Override
   public JMSProducer send(Destination destination, Message message)
   {
      if (message == null)
      {
         throw new MessageFormatRuntimeException("null message");
      }

      try
      {
         if (jmsHeaderCorrelationID != null)
         {
            message.setJMSCorrelationID(jmsHeaderCorrelationID);
         }
         if (jmsHeaderCorrelationIDAsBytes != null && jmsHeaderCorrelationIDAsBytes.length > 0)
         {
            message.setJMSCorrelationIDAsBytes(jmsHeaderCorrelationIDAsBytes);
         }
         if (jmsHeaderReplyTo != null)
         {
            message.setJMSReplyTo(jmsHeaderReplyTo);
         }
         if (jmsHeaderType != null)
         {
            message.setJMSType(jmsHeaderType);
         }
         // XXX HORNETQ-1209 "JMS 2.0" can this be a foreign msg?
         // if so, then "SimpleString" properties will trigger an error.
         setProperties(message);
         if (completionListener != null)
         {
            CompletionListener wrapped = new CompletionListenerWrapper(completionListener);
            producer.send(destination, message, wrapped);
         }
         else
         {
            producer.send(destination, message);
         }
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
      return this;
   }

   /**
    * Sets all properties we carry onto the message.
    *
    * @param message the message to copy the properties to
    * @throws JMSException if the message rejects a property
    */
   private void setProperties(Message message) throws JMSException
   {
      for (SimpleString name : properties.getPropertyNames())
      {
         message.setObjectProperty(name.toString(), properties.getProperty(name));
      }
   }

   /** Sends {@code body} as a {@link TextMessage}. */
   @Override
   public JMSProducer send(Destination destination, String body)
   {
      TextMessage message = context.createTextMessage(body);
      send(destination, message);
      return this;
   }

   /**
    * Sends {@code body} as a {@link MapMessage}, choosing the typed setter that matches each
    * value and falling back to {@code setObject}. A {@code null} map sends an empty message.
    *
    * @throws MessageFormatRuntimeException if an entry cannot be written to the message
    */
   @Override
   public JMSProducer send(Destination destination, Map<String, Object> body)
   {
      MapMessage message = context.createMapMessage();
      if (body != null)
      {
         try
         {
            for (Entry<String, Object> entry : body.entrySet())
            {
               final String name = entry.getKey();
               final Object v = entry.getValue();
               if (v instanceof String)
               {
                  message.setString(name, (String) v);
               }
               else if (v instanceof Long)
               {
                  message.setLong(name, (Long) v);
               }
               else if (v instanceof Double)
               {
                  message.setDouble(name, (Double) v);
               }
               else if (v instanceof Integer)
               {
                  message.setInt(name, (Integer) v);
               }
               else if (v instanceof Character)
               {
                  message.setChar(name, (Character) v);
               }
               else if (v instanceof Short)
               {
                  message.setShort(name, (Short) v);
               }
               else if (v instanceof Boolean)
               {
                  message.setBoolean(name, (Boolean) v);
               }
               else if (v instanceof Float)
               {
                  message.setFloat(name, (Float) v);
               }
               else if (v instanceof Byte)
               {
                  message.setByte(name, (Byte) v);
               }
               else if (v instanceof byte[])
               {
                  byte[] array = (byte[]) v;
                  message.setBytes(name, array, 0, array.length);
               }
               else
               {
                  message.setObject(name, v);
               }
            }
         }
         catch (JMSException e)
         {
            throw formatFailure(e);
         }
      }
      send(destination, message);
      return this;
   }

   /**
    * Sends {@code body} as a {@link BytesMessage}. A {@code null} array sends an empty message.
    *
    * @throws MessageFormatRuntimeException if the bytes cannot be written to the message
    */
   @Override
   public JMSProducer send(Destination destination, byte[] body)
   {
      BytesMessage message = context.createBytesMessage();
      if (body != null)
      {
         try
         {
            message.writeBytes(body);
         }
         catch (JMSException e)
         {
            throw formatFailure(e);
         }
      }
      send(destination, message);
      return this;
   }

   /** Sends {@code body} as an {@link ObjectMessage}. */
   @Override
   public JMSProducer send(Destination destination, Serializable body)
   {
      ObjectMessage message = context.createObjectMessage(body);
      send(destination, message);
      return this;
   }

   @Override
   public JMSProducer setDisableMessageID(boolean value)
   {
      try
      {
         producer.setDisableMessageID(value);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
      return this;
   }

   @Override
   public boolean getDisableMessageID()
   {
      try
      {
         return producer.getDisableMessageID();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public JMSProducer setDisableMessageTimestamp(boolean value)
   {
      try
      {
         producer.setDisableMessageTimestamp(value);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
      return this;
   }

   @Override
   public boolean getDisableMessageTimestamp()
   {
      try
      {
         return producer.getDisableMessageTimestamp();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public JMSProducer setDeliveryMode(int deliveryMode)
   {
      try
      {
         producer.setDeliveryMode(deliveryMode);
      }
      catch (JMSException e)
      {
         throw runtimeFailure(e);
      }
      return this;
   }

   @Override
   public int getDeliveryMode()
   {
      try
      {
         return producer.getDeliveryMode();
      }
      catch (JMSException e)
      {
         throw runtimeFailure(e);
      }
   }

   @Override
   public JMSProducer setPriority(int priority)
   {
      try
      {
         producer.setPriority(priority);
      }
      catch (JMSException e)
      {
         throw runtimeFailure(e);
      }
      return this;
   }

   @Override
   public int getPriority()
   {
      try
      {
         return producer.getPriority();
      }
      catch (JMSException e)
      {
         throw runtimeFailure(e);
      }
   }

   @Override
   public JMSProducer setTimeToLive(long timeToLive)
   {
      try
      {
         producer.setTimeToLive(timeToLive);
         return this;
      }
      catch (JMSException e)
      {
         throw runtimeFailure(e);
      }
   }

   @Override
   public long getTimeToLive()
   {
      try
      {
         return producer.getTimeToLive();
      }
      catch (JMSException e)
      {
         throw runtimeFailure(e);
      }
   }

   @Override
   public JMSProducer setDeliveryDelay(long deliveryDelay)
   {
      try
      {
         producer.setDeliveryDelay(deliveryDelay);
         return this;
      }
      catch (JMSException e)
      {
         throw runtimeFailure(e);
      }
   }

   /**
    * @return the delivery delay of the underlying producer, or {@code 0} when it cannot be read
    */
   @Override
   public long getDeliveryDelay()
   {
      long deliveryDelay = 0;
      try
      {
         deliveryDelay = producer.getDeliveryDelay();
      }
      catch (Exception ignored)
      {
         // NOTE(review): unlike the sibling getters this one is best-effort and falls back to 0;
         // kept as-is because callers may rely on it never throwing - confirm intent.
      }
      return deliveryDelay;
   }

   @Override
   public JMSProducer setAsync(CompletionListener completionListener)
   {
      this.completionListener = completionListener;
      return this;
   }

   @Override
   public CompletionListener getAsync()
   {
      return completionListener;
   }

   @Override
   public JMSProducer setProperty(String name, boolean value)
   {
      properties.putBooleanProperty(nonStringKey(name), value);
      return this;
   }

   @Override
   public JMSProducer setProperty(String name, byte value)
   {
      properties.putByteProperty(nonStringKey(name), value);
      return this;
   }

   @Override
   public JMSProducer setProperty(String name, short value)
   {
      properties.putShortProperty(nonStringKey(name), value);
      return this;
   }

   @Override
   public JMSProducer setProperty(String name, int value)
   {
      properties.putIntProperty(nonStringKey(name), value);
      return this;
   }

   @Override
   public JMSProducer setProperty(String name, long value)
   {
      properties.putLongProperty(nonStringKey(name), value);
      return this;
   }

   @Override
   public JMSProducer setProperty(String name, float value)
   {
      properties.putFloatProperty(nonStringKey(name), value);
      return this;
   }

   @Override
   public JMSProducer setProperty(String name, double value)
   {
      properties.putDoubleProperty(nonStringKey(name), value);
      return this;
   }

   /**
    * Stores a String property (as a {@link SimpleString}) and remembers its name so that
    * {@link #getObjectProperty(String)} can hand back a {@link String}.
    */
   @Override
   public JMSProducer setProperty(String name, String value)
   {
      checkName(name);
      SimpleString key = new SimpleString(name);
      // NOTE(review): a null value is handed to the SimpleString constructor as before; whether
      // that is accepted depends on SimpleString - confirm.
      properties.putSimpleStringProperty(key, new SimpleString(value));
      trackStringProperty(key, true);
      return this;
   }

   /**
    * Stores a property of arbitrary type through {@code TypedProperties.setObjectProperty}.
    *
    * @throws MessageFormatRuntimeException if the value cannot be converted
    * @throws JMSRuntimeException           on any other runtime failure
    */
   @Override
   public JMSProducer setProperty(String name, Object value)
   {
      checkName(name);
      try
      {
         SimpleString key = new SimpleString(name);
         TypedProperties.setObjectProperty(key, value, properties);
         // only reached when the value was stored: keep the String marker in step with it
         trackStringProperty(key, value instanceof String);
      }
      catch (ActiveMQPropertyConversionException amqe)
      {
         throw formatFailure(amqe);
      }
      catch (RuntimeException e)
      {
         throw runtimeFailure(e);
      }
      return this;
   }

   @Override
   public JMSProducer clearProperties()
   {
      try
      {
         stringPropertyNames.clear();
         properties.clear();
      }
      catch (RuntimeException e)
      {
         throw runtimeFailure(e);
      }
      return this;
   }

   @Override
   public boolean propertyExists(String name)
   {
      return properties.containsProperty(new SimpleString(name));
   }

   @Override
   public boolean getBooleanProperty(String name)
   {
      try
      {
         return properties.getBooleanProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatFailure(ce);
      }
      catch (RuntimeException e)
      {
         throw runtimeFailure(e);
      }
   }

   @Override
   public byte getByteProperty(String name)
   {
      try
      {
         return properties.getByteProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatFailure(ce);
      }
   }

   @Override
   public short getShortProperty(String name)
   {
      try
      {
         return properties.getShortProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatFailure(ce);
      }
   }

   @Override
   public int getIntProperty(String name)
   {
      try
      {
         return properties.getIntProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatFailure(ce);
      }
   }

   @Override
   public long getLongProperty(String name)
   {
      try
      {
         return properties.getLongProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatFailure(ce);
      }
   }

   @Override
   public float getFloatProperty(String name)
   {
      try
      {
         return properties.getFloatProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatFailure(ce);
      }
   }

   @Override
   public double getDoubleProperty(String name)
   {
      try
      {
         return properties.getDoubleProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatFailure(ce);
      }
   }

   @Override
   public String getStringProperty(String name)
   {
      try
      {
         SimpleString prop = properties.getSimpleStringProperty(new SimpleString(name));
         if (prop == null)
            return null;
         return prop.toString();
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatFailure(ce);
      }
      catch (RuntimeException e)
      {
         throw runtimeFailure(e);
      }
   }

   /**
    * @return the stored value; for properties set as a String the text is returned as a
    *         {@link String} rather than the stored representation
    */
   @Override
   public Object getObjectProperty(String name)
   {
      try
      {
         SimpleString key = new SimpleString(name);
         Object property = properties.getProperty(key);
         if (property != null && stringPropertyNames.contains(key))
         {
            property = property.toString();
         }
         return property;
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatFailure(ce);
      }
      catch (RuntimeException e)
      {
         throw runtimeFailure(e);
      }
   }

   @Override
   public Set<String> getPropertyNames()
   {
      try
      {
         Set<SimpleString> simplePropNames = properties.getPropertyNames();
         Set<String> propNames = new HashSet<String>(simplePropNames.size());

         for (SimpleString str : simplePropNames)
         {
            propNames.add(str.toString());
         }
         return propNames;
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatFailure(ce);
      }
      catch (RuntimeException e)
      {
         throw runtimeFailure(e);
      }
   }

   /**
    * @param correlationID non-empty correlation ID; a private copy is kept
    * @throws JMSRuntimeException if {@code correlationID} is {@code null} or empty
    */
   @Override
   public JMSProducer setJMSCorrelationIDAsBytes(byte[] correlationID)
   {
      if (correlationID == null || correlationID.length == 0)
      {
         throw new JMSRuntimeException("Please specify a non-zero length byte[]");
      }
      jmsHeaderCorrelationIDAsBytes = Arrays.copyOf(correlationID, correlationID.length);
      return this;
   }

   /**
    * @return a copy of the correlation ID bytes, or {@code null} when none has been set
    */
   @Override
   public byte[] getJMSCorrelationIDAsBytes()
   {
      if (jmsHeaderCorrelationIDAsBytes == null)
      {
         // nothing set yet: Arrays.copyOf would fail on the null array
         return null;
      }
      return Arrays.copyOf(jmsHeaderCorrelationIDAsBytes, jmsHeaderCorrelationIDAsBytes.length);
   }

   @Override
   public JMSProducer setJMSCorrelationID(String correlationID)
   {
      jmsHeaderCorrelationID = correlationID;
      return this;
   }

   @Override
   public String getJMSCorrelationID()
   {
      return jmsHeaderCorrelationID;
   }

   @Override
   public JMSProducer setJMSType(String type)
   {
      jmsHeaderType = type;
      return this;
   }

   @Override
   public String getJMSType()
   {
      return jmsHeaderType;
   }

   @Override
   public JMSProducer setJMSReplyTo(Destination replyTo)
   {
      jmsHeaderReplyTo = replyTo;
      return this;
   }

   @Override
   public Destination getJMSReplyTo()
   {
      return jmsHeaderReplyTo;
   }

   /** Rejects {@code null} and empty property names. */
   private void checkName(String name)
   {
      if (name == null)
      {
         throw ActiveMQJMSClientBundle.BUNDLE.nameCannotBeNull();
      }
      if (name.equals(""))
      {
         throw ActiveMQJMSClientBundle.BUNDLE.nameCannotBeEmpty();
      }
   }

   /**
    * Validates {@code name} and returns its key for a non-String setter, dropping any String
    * marker left over from an earlier {@code setProperty(name, String)}.
    */
   private SimpleString nonStringKey(String name)
   {
      checkName(name);
      SimpleString key = new SimpleString(name);
      trackStringProperty(key, false);
      return key;
   }

   /** Records (without duplicates) or forgets that {@code key} currently holds a String value. */
   private void trackStringProperty(SimpleString key, boolean isString)
   {
      if (!isString)
      {
         stringPropertyNames.remove(key);
      }
      else if (!stringPropertyNames.contains(key))
      {
         stringPropertyNames.add(key);
      }
   }

   /** Builds a {@link MessageFormatRuntimeException} with the message of {@code cause}, keeping the cause. */
   private static MessageFormatRuntimeException formatFailure(Exception cause)
   {
      return new MessageFormatRuntimeException(cause.getMessage(), null, cause);
   }

   /** Builds a {@link JMSRuntimeException} with the message of {@code cause}, keeping the cause. */
   private static JMSRuntimeException runtimeFailure(Exception cause)
   {
      return new JMSRuntimeException(cause.getMessage(), null, cause);
   }

   /**
    * Listener handed to the underlying producer in place of the application's completion
    * listener. It brackets each callback with {@code setCurrentThread(true)} /
    * {@code clearCurrentThread(true)} on the context's {@code ThreadAwareContext}.
    */
   final class CompletionListenerWrapper implements CompletionListener
   {

      private final CompletionListener wrapped;

      public CompletionListenerWrapper(CompletionListener wrapped)
      {
         this.wrapped = wrapped;
      }

      @Override
      public void onCompletion(Message message)
      {
         context.getThreadAwareContext().setCurrentThread(true);
         try
         {
            wrapped.onCompletion(message);
         }
         finally
         {
            // always undo the marker, even when the application listener throws
            context.getThreadAwareContext().clearCurrentThread(true);
         }
      }

      @Override
      public void onException(Message message, Exception exception)
      {
         context.getThreadAwareContext().setCurrentThread(true);
         try
         {
            wrapped.onException(message, exception);
         }
         finally
         {
            context.getThreadAwareContext().clearCurrentThread(true);
         }
      }
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMapMessage.java ---------------------------------------------------------------------- diff --git 
a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMapMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMapMessage.java new file mode 100644 index 0000000..58e86ad --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMapMessage.java @@ -0,0 +1,450 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageFormatException; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.Set; + +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.api.core.Message; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.client.ClientMessage; +import org.apache.activemq.api.core.client.ClientSession; +import org.apache.activemq.utils.TypedProperties; + + +import static org.apache.activemq.reader.MapMessageUtil.writeBodyMap; +import static org.apache.activemq.reader.MapMessageUtil.readBodyMap; + +/** + * ActiveMQ implementation of a JMS MapMessage. 
+ * + * @author Norbert Lataille (norbert.latai...@m4x.org) + * @author <a href="mailto:adr...@jboss.org">Adrian Brock</a> + * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> + * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> + * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> + * @version $Revision: 3412 $ + */ +public final class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage +{ + // Constants ----------------------------------------------------- + + public static final byte TYPE = Message.MAP_TYPE; + + // Attributes ---------------------------------------------------- + + private final TypedProperties map = new TypedProperties(); + + private boolean invalid; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + /* + * This constructor is used to construct messages prior to sending + */ + protected ActiveMQMapMessage(final ClientSession session) + { + super(ActiveMQMapMessage.TYPE, session); + + invalid = true; + } + + /* + * This constructor is used during reading + */ + protected ActiveMQMapMessage(final ClientMessage message, final ClientSession session) + { + super(message, session); + + invalid = false; + } + + public ActiveMQMapMessage() + { + invalid = false; + } + + /** + * Constructor for a foreign MapMessage + * + * @param foreign + * @throws JMSException + */ + public ActiveMQMapMessage(final MapMessage foreign, final ClientSession session) throws JMSException + { + super(foreign, ActiveMQMapMessage.TYPE, session); + Enumeration<?> names = foreign.getMapNames(); + while (names.hasMoreElements()) + { + String name = (String) names.nextElement(); + Object obj = foreign.getObject(name); + setObject(name, obj); + } + } + + // Public -------------------------------------------------------- + + @Override + public byte getType() + { + return ActiveMQMapMessage.TYPE; + } + + // MapMessage implementation 
------------------------------------- + + public void setBoolean(final String name, final boolean value) throws JMSException + { + checkName(name); + map.putBooleanProperty(new SimpleString(name), value); + invalid = true; + } + + public void setByte(final String name, final byte value) throws JMSException + { + checkName(name); + map.putByteProperty(new SimpleString(name), value); + invalid = true; + } + + public void setShort(final String name, final short value) throws JMSException + { + checkName(name); + map.putShortProperty(new SimpleString(name), value); + invalid = true; + } + + public void setChar(final String name, final char value) throws JMSException + { + checkName(name); + map.putCharProperty(new SimpleString(name), value); + invalid = true; + } + + public void setInt(final String name, final int value) throws JMSException + { + checkName(name); + map.putIntProperty(new SimpleString(name), value); + invalid = true; + } + + public void setLong(final String name, final long value) throws JMSException + { + checkName(name); + map.putLongProperty(new SimpleString(name), value); + invalid = true; + } + + public void setFloat(final String name, final float value) throws JMSException + { + checkName(name); + map.putFloatProperty(new SimpleString(name), value); + invalid = true; + } + + public void setDouble(final String name, final double value) throws JMSException + { + checkName(name); + map.putDoubleProperty(new SimpleString(name), value); + invalid = true; + } + + public void setString(final String name, final String value) throws JMSException + { + checkName(name); + map.putSimpleStringProperty(new SimpleString(name), value == null ? 
null : new SimpleString(value)); + invalid = true; + } + + public void setBytes(final String name, final byte[] value) throws JMSException + { + checkName(name); + map.putBytesProperty(new SimpleString(name), value); + invalid = true; + } + + public void setBytes(final String name, final byte[] value, final int offset, final int length) throws JMSException + { + checkName(name); + if (offset + length > value.length) + { + throw new JMSException("Invalid offset/length"); + } + byte[] newBytes = new byte[length]; + System.arraycopy(value, offset, newBytes, 0, length); + map.putBytesProperty(new SimpleString(name), newBytes); + invalid = true; + } + + public void setObject(final String name, final Object value) throws JMSException + { + checkName(name); + try + { + TypedProperties.setObjectProperty(new SimpleString(name), value, map); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + invalid = true; + } + + public boolean getBoolean(final String name) throws JMSException + { + try + { + return map.getBooleanProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public byte getByte(final String name) throws JMSException + { + try + { + return map.getByteProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public short getShort(final String name) throws JMSException + { + try + { + return map.getShortProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public char getChar(final String name) throws JMSException + { + try + { + return map.getCharProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public int 
getInt(final String name) throws JMSException + { + try + { + return map.getIntProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public long getLong(final String name) throws JMSException + { + try + { + return map.getLongProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public float getFloat(final String name) throws JMSException + { + try + { + return map.getFloatProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public double getDouble(final String name) throws JMSException + { + try + { + return map.getDoubleProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public String getString(final String name) throws JMSException + { + try + { + SimpleString str = map.getSimpleStringProperty(new SimpleString(name)); + if (str == null) + { + return null; + } + else + { + return str.toString(); + } + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public byte[] getBytes(final String name) throws JMSException + { + try + { + return map.getBytesProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public Object getObject(final String name) throws JMSException + { + Object val = map.getProperty(new SimpleString(name)); + + if (val instanceof SimpleString) + { + val = ((SimpleString) val).toString(); + } + + return val; + } + + public Enumeration getMapNames() throws JMSException + { + Set<SimpleString> simplePropNames = map.getPropertyNames(); + Set<String> propNames = new 
HashSet<String>(simplePropNames.size()); + + for (SimpleString str : simplePropNames) + { + propNames.add(str.toString()); + } + + return Collections.enumeration(propNames); + } + + public boolean itemExists(final String name) throws JMSException + { + return map.containsProperty(new SimpleString(name)); + } + + + // ActiveMQRAMessage overrides ---------------------------------------- + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + + map.clear(); + + invalid = true; + } + + @Override + public void doBeforeSend() throws Exception + { + if (invalid) + { + writeBodyMap(message, map); + invalid = false; + } + + super.doBeforeSend(); + } + + @Override + public void doBeforeReceive() throws ActiveMQException + { + super.doBeforeReceive(); + + readBodyMap(message, map); + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + /** + * Check the name + * + * @param name the name + */ + private void checkName(final String name) throws JMSException + { + checkWrite(); + + if (name == null) + { + throw ActiveMQJMSClientBundle.BUNDLE.nameCannotBeNull(); + } + if (name.equals("")) + { + throw ActiveMQJMSClientBundle.BUNDLE.nameCannotBeEmpty(); + } + } + + @Override + protected boolean hasNoBody() + { + return map.isEmpty(); + } + + @Override + public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") + Class c) + { + if (hasNoBody()) + { + return true; + } + return c.isAssignableFrom(java.util.Map.class); + } + + @SuppressWarnings("unchecked") + @Override + protected <T> T getBodyInternal(Class<T> c) + { + return (T) map.getMap(); + } +}