http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java new file mode 100644 index 0000000..b106c61 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java @@ -0,0 +1,1276 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq.jms.client; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.jms.TransactionInProgressException; +import javax.transaction.xa.XAResource; + +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.selector.filter.FilterException; +import org.apache.activemq.selector.SelectorParser; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.client.ClientConsumer; +import org.apache.activemq.api.core.client.ClientProducer; +import org.apache.activemq.api.core.client.ClientSession; +import org.apache.activemq.api.core.client.ClientSession.AddressQuery; +import org.apache.activemq.api.core.client.ClientSession.QueueQuery; + +/** + * ActiveMQ implementation of a JMS Session. 
+ * <br> + * Note that we *do not* support JMS ASF (Application Server Facilities) optional + * constructs such as ConnectionConsumer + * + * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * + * + */ +public class ActiveMQSession implements QueueSession, TopicSession +{ + public static final int TYPE_GENERIC_SESSION = 0; + + public static final int TYPE_QUEUE_SESSION = 1; + + public static final int TYPE_TOPIC_SESSION = 2; + + private static SimpleString REJECTING_FILTER = new SimpleString("_HQX=-1"); + + private final ActiveMQConnection connection; + + private final ClientSession session; + + private final int sessionType; + + private final int ackMode; + + private final boolean transacted; + + private final boolean xa; + + private boolean recoverCalled; + + private final Set<ActiveMQMessageConsumer> consumers = new HashSet<ActiveMQMessageConsumer>(); + + // Constructors -------------------------------------------------- + + protected ActiveMQSession(final ActiveMQConnection connection, + final boolean transacted, + final boolean xa, + final int ackMode, + final ClientSession session, + final int sessionType) + { + this.connection = connection; + + this.ackMode = ackMode; + + this.session = session; + + this.sessionType = sessionType; + + this.transacted = transacted; + + this.xa = xa; + } + + // Session implementation ---------------------------------------- + + public BytesMessage createBytesMessage() throws JMSException + { + checkClosed(); + + return new ActiveMQBytesMessage(session); + } + + public MapMessage createMapMessage() throws JMSException + { + checkClosed(); + + return new ActiveMQMapMessage(session); + } + + public Message createMessage() throws JMSException + { + checkClosed(); + + return new ActiveMQMessage(session); + } + + public ObjectMessage createObjectMessage() throws JMSException + { + checkClosed(); + 
+ return new ActiveMQObjectMessage(session); + } + + public ObjectMessage createObjectMessage(final Serializable object) throws JMSException + { + checkClosed(); + + ActiveMQObjectMessage msg = new ActiveMQObjectMessage(session); + + msg.setObject(object); + + return msg; + } + + public StreamMessage createStreamMessage() throws JMSException + { + checkClosed(); + + return new ActiveMQStreamMessage(session); + } + + public TextMessage createTextMessage() throws JMSException + { + checkClosed(); + + ActiveMQTextMessage msg = new ActiveMQTextMessage(session); + + msg.setText(null); + + return msg; + } + + public TextMessage createTextMessage(final String text) throws JMSException + { + checkClosed(); + + ActiveMQTextMessage msg = new ActiveMQTextMessage(session); + + msg.setText(text); + + return msg; + } + + public boolean getTransacted() throws JMSException + { + checkClosed(); + + return transacted; + } + + public int getAcknowledgeMode() throws JMSException + { + checkClosed(); + + return ackMode; + } + + public boolean isXA() + { + return xa; + } + + public void commit() throws JMSException + { + if (!transacted) + { + throw new IllegalStateException("Cannot commit a non-transacted session"); + } + if (xa) + { + throw new TransactionInProgressException("Cannot call commit on an XA session"); + } + try + { + session.commit(); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public void rollback() throws JMSException + { + if (!transacted) + { + throw new IllegalStateException("Cannot rollback a non-transacted session"); + } + if (xa) + { + throw new TransactionInProgressException("Cannot call rollback on an XA session"); + } + + try + { + session.rollback(); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public void close() throws JMSException + { + connection.getThreadAwareContext().assertNotCompletionListenerThread(); + 
connection.getThreadAwareContext().assertNotMessageListenerThread(); + synchronized (connection) + { + try + { + for (ActiveMQMessageConsumer cons : new HashSet<ActiveMQMessageConsumer>(consumers)) + { + cons.close(); + } + + session.close(); + + connection.removeSession(this); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + } + + public void recover() throws JMSException + { + if (transacted) + { + throw new IllegalStateException("Cannot recover a transacted session"); + } + + try + { + session.rollback(true); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + + recoverCalled = true; + } + + public MessageListener getMessageListener() throws JMSException + { + checkClosed(); + + return null; + } + + public void setMessageListener(final MessageListener listener) throws JMSException + { + checkClosed(); + } + + public void run() + { + } + + public MessageProducer createProducer(final Destination destination) throws JMSException + { + if (destination != null && !(destination instanceof ActiveMQDestination)) + { + throw new InvalidDestinationException("Not a ActiveMQ Destination:" + destination); + } + + try + { + ActiveMQDestination jbd = (ActiveMQDestination)destination; + + if (jbd != null) + { + ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress()); + + if (!response.isExists()) + { + throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); + } + + connection.addKnownDestination(jbd.getSimpleAddress()); + } + + ClientProducer producer = session.createProducer(jbd == null ? 
null : jbd.getSimpleAddress()); + + return new ActiveMQMessageProducer(connection, producer, jbd, session); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public MessageConsumer createConsumer(final Destination destination) throws JMSException + { + return createConsumer(destination, null, false); + } + + public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException + { + return createConsumer(destination, messageSelector, false); + } + + public MessageConsumer createConsumer(final Destination destination, + final String messageSelector, + final boolean noLocal) throws JMSException + { + if (destination == null) + { + throw new InvalidDestinationException("Cannot create a consumer with a null destination"); + } + + if (!(destination instanceof ActiveMQDestination)) + { + throw new InvalidDestinationException("Not a ActiveMQDestination:" + destination); + } + + ActiveMQDestination jbdest = (ActiveMQDestination)destination; + + if (jbdest.isTemporary() && !connection.containsTemporaryQueue(jbdest.getSimpleAddress())) + { + throw new JMSException("Can not create consumer for temporary destination " + destination + + " from another JMS connection"); + } + + return createConsumer(jbdest, null, messageSelector, noLocal, ConsumerDurability.NON_DURABLE); + } + + public Queue createQueue(final String queueName) throws JMSException + { + // As per spec. 
section 4.11 + if (sessionType == ActiveMQSession.TYPE_TOPIC_SESSION) + { + throw new IllegalStateException("Cannot create a queue using a TopicSession"); + } + + try + { + ActiveMQQueue queue = lookupQueue(queueName, false); + + if (queue == null) + { + queue = lookupQueue(queueName, true); + } + + if (queue == null) + { + throw new JMSException("There is no queue with name " + queueName); + } + else + { + return queue; + } + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public Topic createTopic(final String topicName) throws JMSException + { + // As per spec. section 4.11 + if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot create a topic on a QueueSession"); + } + + try + { + ActiveMQTopic topic = lookupTopic(topicName, false); + + if (topic == null) + { + topic = lookupTopic(topicName, true); + } + + if (topic == null) + { + throw new JMSException("There is no topic with name " + topicName); + } + else + { + return topic; + } + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException + { + return createDurableSubscriber(topic, name, null, false); + } + + public TopicSubscriber createDurableSubscriber(final Topic topic, + final String name, + String messageSelector, + final boolean noLocal) throws JMSException + { + // As per spec. 
section 4.11 + if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot create a durable subscriber on a QueueSession"); + } + checkTopic(topic); + if (!(topic instanceof ActiveMQDestination)) + { + throw new InvalidDestinationException("Not a ActiveMQTopic:" + topic); + } + if ("".equals(messageSelector)) + { + messageSelector = null; + } + + ActiveMQDestination jbdest = (ActiveMQDestination)topic; + + if (jbdest.isQueue()) + { + throw new InvalidDestinationException("Cannot create a subscriber on a queue"); + } + + return createConsumer(jbdest, name, messageSelector, noLocal, ConsumerDurability.DURABLE); + } + + private void checkTopic(Topic topic) throws InvalidDestinationException + { + if (topic == null) + { + throw ActiveMQJMSClientBundle.BUNDLE.nullTopic(); + } + } + + @Override + public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException + { + return createSharedConsumer(topic, sharedSubscriptionName, null); + } + + /** + * Note: Needs to throw an exception if a subscriptionName is already in use by another topic, or if the messageSelector is different + * + * validate multiple subscriptions on the same session. + * validate multiple subscriptions on different sessions + * validate failure in one connection while another connection stills fine. 
+ * Validate different filters in different possible scenarios + * + * @param topic + * @param name + * @param messageSelector + * @return + * @throws JMSException + */ + @Override + public MessageConsumer createSharedConsumer(Topic topic, String name, String messageSelector) throws JMSException + { + if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot create a shared consumer on a QueueSession"); + } + checkTopic(topic); + ActiveMQTopic localTopic; + if (topic instanceof ActiveMQTopic) + { + localTopic = (ActiveMQTopic)topic; + } + else + { + localTopic = new ActiveMQTopic(topic.getTopicName()); + } + return internalCreateSharedConsumer(localTopic, name, messageSelector, ConsumerDurability.NON_DURABLE, true); + } + + @Override + public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException + { + return createDurableConsumer(topic, name, null, false); + } + + @Override + public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException + { + if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot create a durable consumer on a QueueSession"); + } + checkTopic(topic); + ActiveMQTopic localTopic; + if (topic instanceof ActiveMQTopic) + { + localTopic = (ActiveMQTopic)topic; + } + else + { + localTopic = new ActiveMQTopic(topic.getTopicName()); + } + return createConsumer(localTopic, name, messageSelector, noLocal, ConsumerDurability.DURABLE); + } + + @Override + public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException + { + return createSharedDurableConsumer(topic, name, null); + } + + @Override + public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException + { + if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot create a shared 
durable consumer on a QueueSession"); + } + + checkTopic(topic); + + ActiveMQTopic localTopic; + + if (topic instanceof ActiveMQTopic) + { + localTopic = (ActiveMQTopic)topic; + } + else + { + localTopic = new ActiveMQTopic(topic.getTopicName()); + } + return internalCreateSharedConsumer(localTopic, name, messageSelector, ConsumerDurability.DURABLE, true); + } + + enum ConsumerDurability + { + DURABLE, NON_DURABLE; + } + + + /** + * This is an internal method for shared consumers + */ + private ActiveMQMessageConsumer internalCreateSharedConsumer(final ActiveMQDestination dest, + final String subscriptionName, + String selectorString, + ConsumerDurability durability, + final boolean shared) throws JMSException + { + try + { + + if (dest.isQueue()) + { + // This is not really possible unless someone makes a mistake on code + // createSharedConsumer only accpets Topics by declaration + throw new RuntimeException("Internal error: createSharedConsumer is only meant for Topics"); + } + + if (subscriptionName == null) + { + throw ActiveMQJMSClientBundle.BUNDLE.invalidSubscriptionName(); + } + + selectorString = "".equals(selectorString) ? 
null : selectorString; + + SimpleString coreFilterString = null; + + if (selectorString != null) + { + coreFilterString = new SimpleString(SelectorTranslator.convertToActiveMQFilterString(selectorString)); + } + + ClientConsumer consumer; + + SimpleString autoDeleteQueueName = null; + + AddressQuery response = session.addressQuery(dest.getSimpleAddress()); + + if (!response.isExists()) + { + throw ActiveMQJMSClientBundle.BUNDLE.destinationDoesNotExist(dest.getSimpleAddress()); + } + + SimpleString queueName; + + if (dest.isTemporary() && durability == ConsumerDurability.DURABLE) + { + throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); + } + + queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), + subscriptionName)); + + if (durability == ConsumerDurability.DURABLE) + { + try + { + session.createSharedQueue(dest.getSimpleAddress(), queueName, coreFilterString, true); + } + catch (ActiveMQQueueExistsException ignored) + { + // We ignore this because querying and then creating the queue wouldn't be idempotent + // we could also add a parameter to ignore existence what would require a bigger work around to avoid + // compatibility. 
+ } + } + else + { + session.createSharedQueue(dest.getSimpleAddress(), queueName, coreFilterString, false); + } + + consumer = session.createConsumer(queueName, null, false); + + ActiveMQMessageConsumer jbc = new ActiveMQMessageConsumer(connection, this, + consumer, + false, + dest, + selectorString, + autoDeleteQueueName); + + consumers.add(jbc); + + return jbc; + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + + + private ActiveMQMessageConsumer createConsumer(final ActiveMQDestination dest, + final String subscriptionName, + String selectorString, final boolean noLocal, + ConsumerDurability durability) throws JMSException + { + try + { + selectorString = "".equals(selectorString) ? null : selectorString; + + if (noLocal) + { + connection.setHasNoLocal(); + + String filter; + if (connection.getClientID() != null) + { + filter = + ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + connection.getClientID() + + "'"; + } + else + { + filter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + connection.getUID() + "'"; + } + + if (selectorString != null) + { + selectorString += " AND " + filter; + } + else + { + selectorString = filter; + } + } + + SimpleString coreFilterString = null; + + if (selectorString != null) + { + coreFilterString = new SimpleString(SelectorTranslator.convertToActiveMQFilterString(selectorString)); + } + + ClientConsumer consumer; + + SimpleString autoDeleteQueueName = null; + + if (dest.isQueue()) + { + AddressQuery response = session.addressQuery(dest.getSimpleAddress()); + + if (!response.isExists()) + { + throw new InvalidDestinationException("Queue " + dest.getName() + " does not exist"); + } + + connection.addKnownDestination(dest.getSimpleAddress()); + + consumer = session.createConsumer(dest.getSimpleAddress(), coreFilterString, false); + } + else + { + AddressQuery response = session.addressQuery(dest.getSimpleAddress()); + + if 
(!response.isExists()) + { + throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist"); + } + + connection.addKnownDestination(dest.getSimpleAddress()); + + SimpleString queueName; + + if (subscriptionName == null) + { + if (durability != ConsumerDurability.NON_DURABLE) + throw new RuntimeException(); + // Non durable sub + + queueName = new SimpleString(UUID.randomUUID().toString()); + + session.createTemporaryQueue(dest.getSimpleAddress(), queueName, coreFilterString); + + consumer = session.createConsumer(queueName, null, false); + + autoDeleteQueueName = queueName; + } + else + { + // Durable sub + if (durability != ConsumerDurability.DURABLE) + throw new RuntimeException(); + if (connection.getClientID() == null) + { + throw new IllegalStateException("Cannot create durable subscription - client ID has not been set"); + } + + if (dest.isTemporary()) + { + throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); + } + + queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), + subscriptionName)); + + QueueQuery subResponse = session.queueQuery(queueName); + + if (!subResponse.isExists()) + { + session.createQueue(dest.getSimpleAddress(), queueName, coreFilterString, true); + } + else + { + // Already exists + if (subResponse.getConsumerCount() > 0) + { + throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)"); + } + + // From javax.jms.Session Javadoc (and also JMS 1.1 6.11.1): + // A client can change an existing durable subscription by + // creating a durable + // TopicSubscriber with the same name and a new topic and/or + // message selector. + // Changing a durable subscriber is equivalent to + // unsubscribing (deleting) the old + // one and creating a new one. 
+ + SimpleString oldFilterString = subResponse.getFilterString(); + + boolean selectorChanged = coreFilterString == null && oldFilterString != null || + oldFilterString == null && + coreFilterString != null || + oldFilterString != null && + coreFilterString != null && + !oldFilterString.equals(coreFilterString); + + SimpleString oldTopicName = subResponse.getAddress(); + + boolean topicChanged = !oldTopicName.equals(dest.getSimpleAddress()); + + if (selectorChanged || topicChanged) + { + // Delete the old durable sub + session.deleteQueue(queueName); + + // Create the new one + session.createQueue(dest.getSimpleAddress(), queueName, coreFilterString, true); + } + } + + consumer = session.createConsumer(queueName, null, false); + } + } + + ActiveMQMessageConsumer jbc = new ActiveMQMessageConsumer(connection, + this, + consumer, + noLocal, + dest, + selectorString, + autoDeleteQueueName); + + consumers.add(jbc); + + return jbc; + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public void ackAllConsumers() throws JMSException + { + checkClosed(); + } + + public QueueBrowser createBrowser(final Queue queue) throws JMSException + { + return createBrowser(queue, null); + } + + public QueueBrowser createBrowser(final Queue queue, String filterString) throws JMSException + { + // As per spec. 
section 4.11 + if (sessionType == ActiveMQSession.TYPE_TOPIC_SESSION) + { + throw new IllegalStateException("Cannot create a browser on a TopicSession"); + } + if (queue == null) + { + throw new InvalidDestinationException("Cannot create a browser with a null queue"); + } + if (!(queue instanceof ActiveMQDestination)) + { + throw new InvalidDestinationException("Not a ActiveMQQueue:" + queue); + } + if ("".equals(filterString)) + { + filterString = null; + } + + // eager test of the filter syntax as required by JMS spec + try + { + if (filterString != null) + { + SelectorParser.parse(filterString.trim()); + } + } + catch (FilterException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(ActiveMQJMSClientBundle.BUNDLE.invalidFilter(e, new SimpleString(filterString))); + } + + ActiveMQDestination jbq = (ActiveMQDestination)queue; + + if (!jbq.isQueue()) + { + throw new InvalidDestinationException("Cannot create a browser on a topic"); + } + + try + { + AddressQuery message = session.addressQuery(new SimpleString(jbq.getAddress())); + if (!message.isExists()) + { + throw new InvalidDestinationException(jbq.getAddress() + " does not exist"); + } + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + + return new ActiveMQQueueBrowser((ActiveMQQueue)jbq, filterString, session); + + } + + public TemporaryQueue createTemporaryQueue() throws JMSException + { + // As per spec. 
section 4.11 + if (sessionType == ActiveMQSession.TYPE_TOPIC_SESSION) + { + throw new IllegalStateException("Cannot create a temporary queue using a TopicSession"); + } + + try + { + ActiveMQTemporaryQueue queue = ActiveMQDestination.createTemporaryQueue(this); + + SimpleString simpleAddress = queue.getSimpleAddress(); + + session.createTemporaryQueue(simpleAddress, simpleAddress); + + connection.addTemporaryQueue(simpleAddress); + + return queue; + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public TemporaryTopic createTemporaryTopic() throws JMSException + { + // As per spec. section 4.11 + if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot create a temporary topic on a QueueSession"); + } + + try + { + ActiveMQTemporaryTopic topic = ActiveMQDestination.createTemporaryTopic(this); + + SimpleString simpleAddress = topic.getSimpleAddress(); + + // We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS + // checks when routing messages to a topic that + // does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no + // subscriptions - core has no notion of a topic + + session.createTemporaryQueue(simpleAddress, simpleAddress, ActiveMQSession.REJECTING_FILTER); + + connection.addTemporaryQueue(simpleAddress); + + return topic; + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public void unsubscribe(final String name) throws JMSException + { + // As per spec. 
section 4.11 + if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot unsubscribe using a QueueSession"); + } + + SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), + name)); + + try + { + QueueQuery response = session.queueQuery(queueName); + + if (!response.isExists()) + { + throw new InvalidDestinationException("Cannot unsubscribe, subscription with name " + name + + " does not exist"); + } + + if (response.getConsumerCount() != 0) + { + throw new IllegalStateException("Cannot unsubscribe durable subscription " + name + + " since it has active subscribers"); + } + + session.deleteQueue(queueName); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + // XASession implementation + + public Session getSession() throws JMSException + { + if (!xa) + { + throw new IllegalStateException("Isn't an XASession"); + } + + return this; + } + + public XAResource getXAResource() + { + return session.getXAResource(); + } + + // QueueSession implementation + + public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException + { + return (QueueReceiver)createConsumer(queue, messageSelector); + } + + public QueueReceiver createReceiver(final Queue queue) throws JMSException + { + return (QueueReceiver)createConsumer(queue); + } + + public QueueSender createSender(final Queue queue) throws JMSException + { + return (QueueSender)createProducer(queue); + } + + // XAQueueSession implementation + + public QueueSession getQueueSession() throws JMSException + { + return (QueueSession)getSession(); + } + + // TopicSession implementation + + public TopicPublisher createPublisher(final Topic topic) throws JMSException + { + return (TopicPublisher)createProducer(topic); + } + + public TopicSubscriber createSubscriber(final Topic topic, final String messageSelector, final 
boolean noLocal) throws JMSException + { + return (TopicSubscriber)createConsumer(topic, messageSelector, noLocal); + } + + public TopicSubscriber createSubscriber(final Topic topic) throws JMSException + { + return (TopicSubscriber)createConsumer(topic); + } + + // XATopicSession implementation + + public TopicSession getTopicSession() throws JMSException + { + return (TopicSession)getSession(); + } + + // Public -------------------------------------------------------- + + @Override + public String toString() + { + return "ActiveMQSession->" + session; + } + + public ClientSession getCoreSession() + { + return session; + } + + public boolean isRecoverCalled() + { + return recoverCalled; + } + + public void setRecoverCalled(final boolean recoverCalled) + { + this.recoverCalled = recoverCalled; + } + + public void deleteTemporaryTopic(final ActiveMQDestination tempTopic) throws JMSException + { + if (!tempTopic.isTemporary()) + { + throw new InvalidDestinationException("Not a temporary topic " + tempTopic); + } + + try + { + AddressQuery response = session.addressQuery(tempTopic.getSimpleAddress()); + + if (!response.isExists()) + { + throw new InvalidDestinationException("Cannot delete temporary topic " + tempTopic.getName() + + " does not exist"); + } + + if (response.getQueueNames().size() > 1) + { + throw new IllegalStateException("Cannot delete temporary topic " + tempTopic.getName() + + " since it has subscribers"); + } + + SimpleString address = tempTopic.getSimpleAddress(); + + session.deleteQueue(address); + + connection.removeTemporaryQueue(address); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public void deleteTemporaryQueue(final ActiveMQDestination tempQueue) throws JMSException + { + if (!tempQueue.isTemporary()) + { + throw new InvalidDestinationException("Not a temporary queue " + tempQueue); + } + try + { + QueueQuery response = session.queueQuery(tempQueue.getSimpleAddress()); + + if 
(!response.isExists()) + { + throw new InvalidDestinationException("Cannot delete temporary queue " + tempQueue.getName() + + " does not exist"); + } + + if (response.getConsumerCount() > 0) + { + throw new IllegalStateException("Cannot delete temporary queue " + tempQueue.getName() + + " since it has subscribers"); + } + + SimpleString address = tempQueue.getSimpleAddress(); + + session.deleteQueue(address); + + connection.removeTemporaryQueue(address); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public void start() throws JMSException + { + try + { + session.start(); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public void stop() throws JMSException + { + try + { + session.stop(); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public void removeConsumer(final ActiveMQMessageConsumer consumer) + { + consumers.remove(consumer); + } + + // Package protected --------------------------------------------- + + void deleteQueue(final SimpleString queueName) throws JMSException + { + if (!session.isClosed()) + { + try + { + session.deleteQueue(queueName); + } + catch (ActiveMQException ignore) + { + // Exception on deleting queue shouldn't prevent close from completing + } + } + } + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + private void checkClosed() throws JMSException + { + if (session.isClosed()) + { + throw new IllegalStateException("Session is closed"); + } + } + + private ActiveMQQueue lookupQueue(final String queueName, boolean isTemporary) throws ActiveMQException + { + ActiveMQQueue queue; + + if (isTemporary) + { + queue = ActiveMQDestination.createTemporaryQueue(queueName); + } + else + { + queue = ActiveMQDestination.createQueue(queueName); + } + + QueueQuery response = 
session.queueQuery(queue.getSimpleAddress()); + + if (response.isExists()) + { + return queue; + } + else + { + return null; + } + } + + private ActiveMQTopic lookupTopic(final String topicName, final boolean isTemporary) throws ActiveMQException + { + + ActiveMQTopic topic; + + if (isTemporary) + { + topic = ActiveMQDestination.createTemporaryTopic(topicName); + } + else + { + topic = ActiveMQDestination.createTopic(topicName); + } + + AddressQuery query = session.addressQuery(topic.getSimpleAddress()); + + if (!query.isExists()) + { + return null; + } + else + { + return topic; + } + } + + // Inner classes ------------------------------------------------- + +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQStreamMessage.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQStreamMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQStreamMessage.java new file mode 100644 index 0000000..c59fbc3 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQStreamMessage.java @@ -0,0 +1,466 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq.jms.client; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.StreamMessage; + +import org.apache.activemq.api.core.ActiveMQBuffer; +import org.apache.activemq.api.core.Message; +import org.apache.activemq.api.core.Pair; +import org.apache.activemq.api.core.client.ClientMessage; +import org.apache.activemq.api.core.client.ClientSession; +import org.apache.activemq.core.client.impl.ClientMessageImpl; +import org.apache.activemq.utils.DataConstants; + +import static org.apache.activemq.reader.StreamMessageUtil.streamReadBoolean; +import static org.apache.activemq.reader.StreamMessageUtil.streamReadByte; +import static org.apache.activemq.reader.StreamMessageUtil.streamReadBytes; +import static org.apache.activemq.reader.StreamMessageUtil.streamReadChar; +import static org.apache.activemq.reader.StreamMessageUtil.streamReadDouble; +import static org.apache.activemq.reader.StreamMessageUtil.streamReadFloat; +import static org.apache.activemq.reader.StreamMessageUtil.streamReadInteger; +import static org.apache.activemq.reader.StreamMessageUtil.streamReadLong; +import static org.apache.activemq.reader.StreamMessageUtil.streamReadObject; +import static org.apache.activemq.reader.StreamMessageUtil.streamReadShort; +import static org.apache.activemq.reader.StreamMessageUtil.streamReadString; + +/** + * ActiveMQ implementation of a JMS StreamMessage. 
+ * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + * Some parts based on JBM 1.x class by: + * + * @author Norbert Lataille ([email protected]) + * @author <a href="mailto:[email protected]">Adrian Brock</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + */ +public final class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage +{ + public static final byte TYPE = Message.STREAM_TYPE; + + protected ActiveMQStreamMessage(final ClientSession session) + { + super(ActiveMQStreamMessage.TYPE, session); + } + + protected ActiveMQStreamMessage(final ClientMessage message, final ClientSession session) + { + super(message, session); + } + + public ActiveMQStreamMessage(final StreamMessage foreign, final ClientSession session) throws JMSException + { + super(foreign, ActiveMQStreamMessage.TYPE, session); + + foreign.reset(); + + try + { + while (true) + { + Object obj = foreign.readObject(); + writeObject(obj); + } + } + catch (MessageEOFException e) + { + // Ignore + } + } + + // For testing only + public ActiveMQStreamMessage() + { + message = new ClientMessageImpl((byte)0, false, 0, 0, (byte)4, 1500); + } + + // Public -------------------------------------------------------- + + @Override + public byte getType() + { + return ActiveMQStreamMessage.TYPE; + } + + // StreamMessage implementation ---------------------------------- + + public boolean readBoolean() throws JMSException + { + checkRead(); + try + { + return streamReadBoolean(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public byte readByte() throws JMSException + { + checkRead(); + + try + { + return streamReadByte(message); + } + catch (IllegalStateException e) + { + throw new 
MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public short readShort() throws JMSException + { + checkRead(); + try + { + return streamReadShort(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public char readChar() throws JMSException + { + checkRead(); + try + { + return streamReadChar(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public int readInt() throws JMSException + { + checkRead(); + try + { + return streamReadInteger(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public long readLong() throws JMSException + { + checkRead(); + try + { + return streamReadLong(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public float readFloat() throws JMSException + { + checkRead(); + try + { + return streamReadFloat(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public double readDouble() throws JMSException + { + checkRead(); + try + { + return streamReadDouble(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public String readString() throws JMSException + { + checkRead(); + try + { + 
return streamReadString(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + /** + * len here is used to control how many more bytes to read + */ + private int len = 0; + + public int readBytes(final byte[] value) throws JMSException + { + checkRead(); + try + { + Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value); + + len = pairRead.getA(); + return pairRead.getB(); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public Object readObject() throws JMSException + { + checkRead(); + try + { + return streamReadObject(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public void writeBoolean(final boolean value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.BOOLEAN); + getBuffer().writeBoolean(value); + } + + public void writeByte(final byte value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.BYTE); + getBuffer().writeByte(value); + } + + public void writeShort(final short value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.SHORT); + getBuffer().writeShort(value); + } + + public void writeChar(final char value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.CHAR); + getBuffer().writeShort((short)value); + } + + public void writeInt(final int value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.INT); + getBuffer().writeInt(value); + } + + public void writeLong(final long value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.LONG); + 
getBuffer().writeLong(value); + } + + public void writeFloat(final float value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.FLOAT); + getBuffer().writeInt(Float.floatToIntBits(value)); + } + + public void writeDouble(final double value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.DOUBLE); + getBuffer().writeLong(Double.doubleToLongBits(value)); + } + + public void writeString(final String value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.STRING); + getBuffer().writeNullableString(value); + } + + public void writeBytes(final byte[] value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.BYTES); + getBuffer().writeInt(value.length); + getBuffer().writeBytes(value); + } + + public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.BYTES); + getBuffer().writeInt(length); + getBuffer().writeBytes(value, offset, length); + } + + public void writeObject(final Object value) throws JMSException + { + if (value instanceof String) + { + writeString((String)value); + } + else if (value instanceof Boolean) + { + writeBoolean((Boolean)value); + } + else if (value instanceof Byte) + { + writeByte((Byte)value); + } + else if (value instanceof Short) + { + writeShort((Short)value); + } + else if (value instanceof Integer) + { + writeInt((Integer)value); + } + else if (value instanceof Long) + { + writeLong((Long)value); + } + else if (value instanceof Float) + { + writeFloat((Float)value); + } + else if (value instanceof Double) + { + writeDouble((Double)value); + } + else if (value instanceof byte[]) + { + writeBytes((byte[])value); + } + else if (value instanceof Character) + { + writeChar((Character)value); + } + else if (value == null) + { + writeString(null); + } + else + { + throw new MessageFormatException("Invalid object type: " + 
value.getClass()); + } + } + + public void reset() throws JMSException + { + if (!readOnly) + { + readOnly = true; + } + getBuffer().resetReaderIndex(); + } + + // ActiveMQRAMessage overrides ---------------------------------------- + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + + getBuffer().clear(); + } + + @Override + public void doBeforeSend() throws Exception + { + reset(); + } + + private ActiveMQBuffer getBuffer() + { + return message.getBodyBuffer(); + } + + @SuppressWarnings("rawtypes") + @Override + public boolean isBodyAssignableTo(Class c) + { + return false; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTemporaryQueue.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTemporaryQueue.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTemporaryQueue.java new file mode 100644 index 0000000..97f7870 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTemporaryQueue.java @@ -0,0 +1,67 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.TemporaryQueue; + + +/** + * ActiveMQ implementation of a JMS TemporaryQueue. 
+ * <br> + * This class can be instantiated directly. + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @version <tt>$Revision: 3569 $</tt> + * + */ +public class ActiveMQTemporaryQueue extends ActiveMQQueue implements TemporaryQueue +{ + // Constants ----------------------------------------------------- + + private static final long serialVersionUID = -4624930377557954624L; + + // Static -------------------------------------------------------- + + // Attributes ---------------------------------------------------- + + // Constructors -------------------------------------------------- + + + // TemporaryQueue implementation ------------------------------------------ + + // Public -------------------------------------------------------- + + /** + * @param address + * @param name + * @param session + */ + public ActiveMQTemporaryQueue(String address, String name, ActiveMQSession session) + { + super(address, name, true, session); + } + + @Override + public String toString() + { + return "ActiveMQTemporaryQueue[" + name + "]"; + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTemporaryTopic.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTemporaryTopic.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTemporaryTopic.java new file mode 100644 index 0000000..8a85376 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTemporaryTopic.java @@ -0,0 +1,53 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.TemporaryTopic; + +/** + * A ActiveMQTemporaryTopic + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public class ActiveMQTemporaryTopic extends ActiveMQTopic implements TemporaryTopic +{ + + // Constants ----------------------------------------------------- + + private static final long serialVersionUID = 845450764835635266L; + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + protected ActiveMQTemporaryTopic(final String address, final String name, + final ActiveMQSession session) + { + super(address, name, true, session); + } + + // Public -------------------------------------------------------- + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTextMessage.java ---------------------------------------------------------------------- diff --git 
a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTextMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTextMessage.java new file mode 100644 index 0000000..75944b9 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTextMessage.java @@ -0,0 +1,146 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.JMSException; +import javax.jms.TextMessage; + +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.Message; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.client.ClientMessage; +import org.apache.activemq.api.core.client.ClientSession; + +import static org.apache.activemq.reader.TextMessageUtil.readBodyText; +import static org.apache.activemq.reader.TextMessageUtil.writeBodyText; + + +/** + * ActiveMQ implementation of a JMS TextMessage. + * <br> + * This class was ported from SpyTextMessage in JBossMQ. 
+ * + * @author Norbert Lataille ([email protected]) + * @author <a href="mailto:[email protected]">Jason Dillon</a> + * @author <a href="mailto:[email protected]">Adrian Brock</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @version $Revision: 3412 $ + */ +public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage +{ + // Constants ----------------------------------------------------- + + public static final byte TYPE = Message.TEXT_TYPE; + + // Attributes ---------------------------------------------------- + + // We cache it locally - it's more performant to cache as a SimpleString, the AbstractChannelBuffer write + // methods are more efficient for a SimpleString + private SimpleString text; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public ActiveMQTextMessage(final ClientSession session) + { + super(ActiveMQTextMessage.TYPE, session); + } + + public ActiveMQTextMessage(final ClientMessage message, final ClientSession session) + { + super(message, session); + } + + /** + * A copy constructor for non-ActiveMQ JMS TextMessages. 
+ */ + public ActiveMQTextMessage(final TextMessage foreign, final ClientSession session) throws JMSException + { + super(foreign, ActiveMQTextMessage.TYPE, session); + + setText(foreign.getText()); + } + + // Public -------------------------------------------------------- + + @Override + public byte getType() + { + return ActiveMQTextMessage.TYPE; + } + + // TextMessage implementation ------------------------------------ + + public void setText(final String text) throws JMSException + { + checkWrite(); + + if (text != null) + { + this.text = new SimpleString(text); + } + else + { + this.text = null; + } + + writeBodyText(message, this.text); + } + + public String getText() + { + if (text != null) + { + return text.toString(); + } + else + { + return null; + } + } + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + + text = null; + } + + // ActiveMQRAMessage override ----------------------------------------- + + @Override + public void doBeforeReceive() throws ActiveMQException + { + super.doBeforeReceive(); + + text = readBodyText(message); + } + + @Override + protected <T> T getBodyInternal(Class<T> c) + { + return (T) getText(); + } + + @Override + public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class c) + { + if (text == null) + return true; + return c.isAssignableFrom(java.lang.String.class); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTopic.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTopic.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTopic.java new file mode 100644 index 0000000..1c04629 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTopic.java @@ -0,0 +1,85 @@ +/* + * Copyright 2005-2014 
Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.Topic; + +import org.apache.activemq.api.core.SimpleString; + +/** + * ActiveMQ implementation of a JMS Topic. + * <br> + * This class can be instantiated directly. + * + * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @version <tt>$Revision: 8737 $</tt> + * + */ +public class ActiveMQTopic extends ActiveMQDestination implements Topic +{ + // Constants ----------------------------------------------------- + + private static final long serialVersionUID = 7873614001276404156L; + // Static -------------------------------------------------------- + + public static SimpleString createAddressFromName(final String name) + { + return new SimpleString(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX + name); + } + + // Attributes ---------------------------------------------------- + + // Constructors -------------------------------------------------- + + public ActiveMQTopic(final String name) + { + super(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX + name, name, false, false, null); + } + + + /** + * @param address + * @param name + * @param temporary + * @param session + */ + protected ActiveMQTopic(String address, String name, boolean temporary, ActiveMQSession session) + { + super(address, name, temporary, false, session); + } + + + // Topic 
implementation ------------------------------------------ + + public String getTopicName() + { + return name; + } + + // Public -------------------------------------------------------- + + @Override + public String toString() + { + return "ActiveMQTopic[" + name + "]"; + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTopicConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTopicConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTopicConnectionFactory.java new file mode 100644 index 0000000..56d88d7 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQTopicConnectionFactory.java @@ -0,0 +1,70 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq.jms.client; + +import javax.jms.TopicConnectionFactory; + +import org.apache.activemq.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq.api.core.TransportConfiguration; +import org.apache.activemq.api.core.client.ServerLocator; +import org.apache.activemq.api.jms.JMSFactoryType; + +/** + * A class that represents a TopicConnectionFactory. + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + */ +public class ActiveMQTopicConnectionFactory extends ActiveMQConnectionFactory implements TopicConnectionFactory +{ + private static final long serialVersionUID = 7317051989866548455L; + + /** + * + */ + public ActiveMQTopicConnectionFactory() + { + super(); + } + + /** + * @param serverLocator + */ + public ActiveMQTopicConnectionFactory(ServerLocator serverLocator) + { + super(serverLocator); + } + + + /** + * @param ha + * @param groupConfiguration + */ + public ActiveMQTopicConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration) + { + super(ha, groupConfiguration); + } + + /** + * @param ha + * @param initialConnectors + */ + public ActiveMQTopicConnectionFactory(final boolean ha, final TransportConfiguration... 
initialConnectors) + { + super(ha, initialConnectors); + } + + public int getFactoryType() + { + return JMSFactoryType.TOPIC_CF.intValue(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAConnection.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAConnection.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAConnection.java new file mode 100644 index 0000000..9333f6d --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAConnection.java @@ -0,0 +1,70 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.XAQueueConnection; +import javax.jms.XAQueueSession; +import javax.jms.XASession; +import javax.jms.XATopicConnection; +import javax.jms.XATopicSession; + +import org.apache.activemq.api.core.client.ClientSessionFactory; + +/** + * ActiveMQ implementation of a JMS XAConnection. + * <p> + * The flat implementation of {@link XATopicConnection} and {@link XAQueueConnection} is per design, + * following common practices of JMS 1.1. 
+ * @author <a href="mailto:[email protected]">Howard Gao</a> + */ +public final class ActiveMQXAConnection extends ActiveMQConnection implements XATopicConnection, XAQueueConnection +{ + + public ActiveMQXAConnection(final String username, final String password, final int connectionType, + final String clientID, final int dupsOKBatchSize, final int transactionBatchSize, + final ClientSessionFactory sessionFactory) + { + super(username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, sessionFactory); + } + + @Override + public XASession createXASession() throws JMSException + { + checkClosed(); + return (XASession)createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_GENERIC_SESSION); + } + + @Override + public XAQueueSession createXAQueueSession() throws JMSException + { + checkClosed(); + return (XAQueueSession)createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_QUEUE_SESSION); + + } + + @Override + public XATopicSession createXATopicSession() throws JMSException + { + checkClosed(); + return (XATopicSession)createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_TOPIC_SESSION); + } + + @Override + protected boolean isXA() + { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAConnectionFactory.java new file mode 100644 index 0000000..61a131d --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAConnectionFactory.java @@ -0,0 +1,76 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.XAQueueConnectionFactory; +import javax.jms.XATopicConnectionFactory; + +import org.apache.activemq.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq.api.core.TransportConfiguration; +import org.apache.activemq.api.core.client.ServerLocator; +import org.apache.activemq.api.jms.JMSFactoryType; + +/** + * A class that represents a XAConnectionFactory. + * <p> + * We consider the XAConnectionFactory to be the most complete possible option. 
It can be casted to any other connection factory since it is fully functional + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + */ +public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory implements XATopicConnectionFactory, + XAQueueConnectionFactory +{ + private static final long serialVersionUID = 743611571839154115L; + + /** + * + */ + public ActiveMQXAConnectionFactory() + { + super(); + } + + /** + * @param serverLocator + */ + public ActiveMQXAConnectionFactory(ServerLocator serverLocator) + { + super(serverLocator); + } + + /** + * @param ha + * @param groupConfiguration + */ + public ActiveMQXAConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration) + { + super(ha, groupConfiguration); + } + + /** + * @param ha + * @param initialConnectors + */ + public ActiveMQXAConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors) + { + super(ha, initialConnectors); + } + + @Override + public int getFactoryType() + { + return JMSFactoryType.XA_CF.intValue(); + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAJMSContext.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAJMSContext.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAJMSContext.java new file mode 100644 index 0000000..7bc8421 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAJMSContext.java @@ -0,0 +1,23 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.client;
+
+import javax.jms.XAJMSContext;
+
+/**
+ * An {@link ActiveMQJMSContext} that is additionally typed as a JMS {@link XAJMSContext}.
+ * <p>
+ * The class declares nothing but its constructor; every operation is inherited from
+ * {@link ActiveMQJMSContext}.
+ */
+public class ActiveMQXAJMSContext extends ActiveMQJMSContext implements XAJMSContext
+{
+   /**
+    * Passes both arguments straight through to the {@link ActiveMQJMSContext} constructor.
+    *
+    * @param connection         connection wrapper forwarded to the superclass
+    * @param threadAwareContext thread-aware context forwarded to the superclass
+    */
+   public ActiveMQXAJMSContext(final ActiveMQConnectionForContext connection,
+                               final ThreadAwareContext threadAwareContext)
+   {
+      super(connection, threadAwareContext);
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAQueueConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAQueueConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAQueueConnectionFactory.java
new file mode 100644
index 0000000..a0d91a5
--- /dev/null
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXAQueueConnectionFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.client;
+
+import javax.jms.XAQueueConnectionFactory;
+
+import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.api.jms.JMSFactoryType;
+
+/**
+ * A class that represents a XAQueueConnectionFactory.
+ * <p>
+ * All construction logic lives in {@link ActiveMQConnectionFactory}; this subclass only fixes the
+ * reported factory type to {@link JMSFactoryType#QUEUE_XA_CF}.
+ *
+ * @author <a href="mailto:[email protected]">Howard Gao</a>
+ */
+public class ActiveMQXAQueueConnectionFactory extends ActiveMQConnectionFactory implements XAQueueConnectionFactory
+{
+   private static final long serialVersionUID = 8612457847251087454L;
+
+   /**
+    * Creates a factory through the no-argument constructor of {@link ActiveMQConnectionFactory}.
+    */
+   public ActiveMQXAQueueConnectionFactory()
+   {
+      super();
+   }
+
+   /**
+    * Creates a factory on top of an existing server locator.
+    *
+    * @param serverLocator locator handed unchanged to the {@link ActiveMQConnectionFactory} constructor
+    */
+   public ActiveMQXAQueueConnectionFactory(final ServerLocator serverLocator)
+   {
+      super(serverLocator);
+   }
+
+   /**
+    * Creates a factory from a discovery group configuration.
+    *
+    * @param ha                 forwarded unchanged to the superclass (presumably the high-availability
+    *                           flag; confirm against {@link ActiveMQConnectionFactory})
+    * @param groupConfiguration discovery group configuration forwarded to the superclass
+    */
+   public ActiveMQXAQueueConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
+   {
+      super(ha, groupConfiguration);
+   }
+
+   /**
+    * Creates a factory from an explicit set of transport configurations.
+    *
+    * @param ha                forwarded unchanged to the superclass (presumably the high-availability
+    *                          flag; confirm against {@link ActiveMQConnectionFactory})
+    * @param initialConnectors transport configurations forwarded to the superclass
+    */
+   public ActiveMQXAQueueConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
+   {
+      super(ha, initialConnectors);
+   }
+
+   /**
+    * Reports which kind of connection factory this is.
+    *
+    * @return the integer value of {@link JMSFactoryType#QUEUE_XA_CF}
+    */
+   // Marked as an override so the compiler rejects this method if the inherited
+   // getFactoryType() is ever renamed or removed, matching ActiveMQXAConnectionFactory.
+   @Override
+   public int getFactoryType()
+   {
+      return JMSFactoryType.QUEUE_XA_CF.intValue();
+   }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXASession.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXASession.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXASession.java
new file mode 100644
index 0000000..6ce34b6
--- /dev/null
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXASession.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.client;
+
+import javax.jms.XAQueueSession;
+import javax.jms.XATopicSession;
+
+import org.apache.activemq.api.core.client.ClientSession;
+
+/**
+ * An {@link ActiveMQSession} that is additionally typed as both a JMS {@link XAQueueSession} and a
+ * JMS {@link XATopicSession}.
+ * <p>
+ * The class declares nothing but its constructor; every operation is inherited from
+ * {@link ActiveMQSession}.
+ *
+ * @author clebertsuconic
+ */
+public class ActiveMQXASession extends ActiveMQSession implements XAQueueSession, XATopicSession
+{
+   /**
+    * Passes every argument, in the same order, to the {@link ActiveMQSession} constructor.
+    * Being {@code protected}, it is reachable only from subclasses and from this package.
+    *
+    * @param connection  connection forwarded to the superclass
+    * @param transacted  transacted flag forwarded to the superclass
+    * @param xa          XA flag forwarded to the superclass
+    * @param ackMode     acknowledgement mode forwarded to the superclass
+    * @param session     core client session forwarded to the superclass
+    * @param sessionType session type code forwarded to the superclass
+    */
+   protected ActiveMQXASession(final ActiveMQConnection connection,
+                               final boolean transacted,
+                               final boolean xa,
+                               final int ackMode,
+                               final ClientSession session,
+                               final int sessionType)
+   {
+      super(connection, transacted, xa, ackMode, session, sessionType);
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXATopicConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXATopicConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXATopicConnectionFactory.java
new file mode 100644
index 0000000..a9c327c
--- /dev/null
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQXATopicConnectionFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.client;
+
+import javax.jms.XATopicConnectionFactory;
+
+import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.api.jms.JMSFactoryType;
+
+/**
+ * A class that represents a XATopicConnectionFactory.
+ * <p>
+ * All construction logic lives in {@link ActiveMQConnectionFactory}; this subclass only fixes the
+ * reported factory type to {@link JMSFactoryType#TOPIC_XA_CF}.
+ *
+ * @author <a href="mailto:[email protected]">Howard Gao</a>
+ */
+public class ActiveMQXATopicConnectionFactory extends ActiveMQConnectionFactory implements XATopicConnectionFactory
+{
+   private static final long serialVersionUID = -7018290426884419693L;
+
+   /**
+    * Creates a factory through the no-argument constructor of {@link ActiveMQConnectionFactory}.
+    */
+   public ActiveMQXATopicConnectionFactory()
+   {
+      super();
+   }
+
+   /**
+    * Creates a factory on top of an existing server locator.
+    *
+    * @param serverLocator locator handed unchanged to the {@link ActiveMQConnectionFactory} constructor
+    */
+   public ActiveMQXATopicConnectionFactory(final ServerLocator serverLocator)
+   {
+      super(serverLocator);
+   }
+
+   /**
+    * Creates a factory from a discovery group configuration.
+    *
+    * @param ha                 forwarded unchanged to the superclass (presumably the high-availability
+    *                           flag; confirm against {@link ActiveMQConnectionFactory})
+    * @param groupConfiguration discovery group configuration forwarded to the superclass
+    */
+   public ActiveMQXATopicConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
+   {
+      super(ha, groupConfiguration);
+   }
+
+   /**
+    * Creates a factory from an explicit set of transport configurations.
+    *
+    * @param ha                forwarded unchanged to the superclass (presumably the high-availability
+    *                          flag; confirm against {@link ActiveMQConnectionFactory})
+    * @param initialConnectors transport configurations forwarded to the superclass
+    */
+   public ActiveMQXATopicConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
+   {
+      super(ha, initialConnectors);
+   }
+
+   /**
+    * Reports which kind of connection factory this is.
+    *
+    * @return the integer value of {@link JMSFactoryType#TOPIC_XA_CF}
+    */
+   // Marked as an override so the compiler rejects this method if the inherited
+   // getFactoryType() is ever renamed or removed, matching ActiveMQXAConnectionFactory.
+   @Override
+   public int getFactoryType()
+   {
+      return JMSFactoryType.TOPIC_XA_CF.intValue();
+   }
+}
