Repository: qpid-jms Updated Branches: refs/heads/master 8dd97074a -> a4fa85a02
QPIDJMS-380 Implement JMS ConnectionConsumer functionality Add support for JMS ConnectionConsumer and add tests. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/a4fa85a0 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/a4fa85a0 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/a4fa85a0 Branch: refs/heads/master Commit: a4fa85a02149c97a27259c41b227915bd11a99c8 Parents: 8dd9707 Author: Timothy Bish <[email protected]> Authored: Thu Sep 14 17:12:48 2017 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Apr 20 14:35:40 2018 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 205 +++++-- .../apache/qpid/jms/JmsConnectionConsumer.java | 289 ++++++++++ .../org/apache/qpid/jms/JmsMessageConsumer.java | 15 +- .../java/org/apache/qpid/jms/JmsSession.java | 103 +++- .../jms/message/JmsInboundMessageDispatch.java | 10 + .../apache/qpid/jms/meta/JmsConsumerInfo.java | 20 + .../qpid/jms/policy/JmsPrefetchPolicy.java | 2 +- .../qpid/jms/provider/amqp/AmqpConnection.java | 9 + .../provider/amqp/AmqpConnectionSession.java | 16 + .../qpid/jms/provider/amqp/AmqpProvider.java | 9 +- .../org/apache/qpid/jms/JmsConnectionTest.java | 38 -- .../org/apache/qpid/jms/JmsSessionTest.java | 5 - .../ConnectionConsumerIntegrationTest.java | 540 +++++++++++++++++++ .../failover/FailoverIntegrationTest.java | 71 +++ 14 files changed, 1217 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 623a580..1ef72f7 
100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -55,6 +55,7 @@ import org.apache.qpid.jms.exceptions.JmsExceptionSupport; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.message.JmsMessage; import org.apache.qpid.jms.message.JmsMessageFactory; +import org.apache.qpid.jms.message.JmsMessageTransformation; import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; import org.apache.qpid.jms.meta.JmsConnectionId; import org.apache.qpid.jms.meta.JmsConnectionInfo; @@ -80,6 +81,9 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFuture; import org.apache.qpid.jms.provider.ProviderListener; import org.apache.qpid.jms.provider.ProviderSynchronization; +import org.apache.qpid.jms.util.FifoMessageQueue; +import org.apache.qpid.jms.util.MessageQueue; +import org.apache.qpid.jms.util.PriorityMessageQueue; import org.apache.qpid.jms.util.QpidJMSThreadFactory; import org.apache.qpid.jms.util.ThreadPoolUtils; import org.slf4j.Logger; @@ -92,7 +96,8 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection private static final Logger LOG = LoggerFactory.getLogger(JmsConnection.class); - private final Map<JmsSessionId, JmsSession> sessions = new ConcurrentHashMap<JmsSessionId, JmsSession>(); + private final Map<JmsSessionId, JmsSession> sessions = new ConcurrentHashMap<>(); + private final Map<JmsConsumerId, JmsConnectionConsumer> connectionConsumers = new ConcurrentHashMap<>(); private final AtomicBoolean connected = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closing = new AtomicBoolean(); @@ -105,15 +110,13 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection private JmsMessageFactory messageFactory; private Provider provider; - private final 
Set<JmsConnectionListener> connectionListeners = - new CopyOnWriteArraySet<JmsConnectionListener>(); - private final Map<JmsTemporaryDestination, JmsTemporaryDestination> tempDestinations = - new ConcurrentHashMap<JmsTemporaryDestination, JmsTemporaryDestination>(); + private final Set<JmsConnectionListener> connectionListeners = new CopyOnWriteArraySet<>(); + private final Map<JmsTemporaryDestination, JmsTemporaryDestination> tempDestinations = new ConcurrentHashMap<>(); private final AtomicLong sessionIdGenerator = new AtomicLong(); private final AtomicLong tempDestIdGenerator = new AtomicLong(); private final AtomicLong transactionIdGenerator = new AtomicLong(); - - private final Map<AsyncResult, AsyncResult> requests = new ConcurrentHashMap<AsyncResult, AsyncResult>(); + private final AtomicLong connectionConsumerIdGenerator = new AtomicLong(); + private final Map<AsyncResult, AsyncResult> requests = new ConcurrentHashMap<>(); protected JmsConnection(final JmsConnectionInfo connectionInfo, Provider provider) throws JMSException { @@ -205,6 +208,10 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection session.shutdown(); } + for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { + connectionConsumer.shutdown(); + } + if (isConnected() && !isFailed()) { ProviderFuture request = new ProviderFuture(); requests.put(request, request); @@ -274,6 +281,10 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection session.shutdown(cause); } + for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { + connectionConsumer.shutdown(); + } + if (isConnected() && !isFailed() && !closing.get()) { destroyResource(connectionInfo); } @@ -345,9 +356,13 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection createJmsConnection(); if (started.compareAndSet(false, true)) { try { - for (JmsSession s : sessions.values()) { - s.start(); + for (JmsSession 
session : sessions.values()) { + session.start(); } + + for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { + connectionConsumer.start(); + } } catch (Exception e) { throw JmsExceptionSupport.create(e); } @@ -379,9 +394,15 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection } if (started.compareAndSet(true, false)) { - synchronized(sessions) { - for (JmsSession s : sessions.values()) { - s.stop(); + synchronized (sessions) { + for (JmsSession session : sessions.values()) { + session.stop(); + } + } + + synchronized (connectionConsumers) { + for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { + connectionConsumer.stop(); } } } @@ -391,48 +412,95 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkClosedOrFailed(); createJmsConnection(); - throw new JMSException("Not supported"); + + return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, subscriptionName, false, true); } @Override public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkClosedOrFailed(); createJmsConnection(); - throw new JMSException("Not supported"); + + return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, subscriptionName, true, true); } @Override - public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, - String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int 
maxMessages) throws JMSException { checkClosedOrFailed(); createJmsConnection(); - throw new JMSException("Not supported"); + + return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, subscriptionName, true, false); } @Override - public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, - ServerSessionPool sessionPool, int maxMessages) throws JMSException { + public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkClosedOrFailed(); createJmsConnection(); - throw new JMSException("Not supported"); + + return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, null, false, false); } @Override - public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, - ServerSessionPool sessionPool, int maxMessages) throws JMSException { + public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkClosedOrFailed(); createJmsConnection(); - throw new JMSException("Not supported"); + + return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, null, false, false); } @Override - public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, - ServerSessionPool sessionPool, int maxMessages) throws JMSException { + public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkClosedOrFailed(); createJmsConnection(); - throw new JMSException("Not supported"); + + return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, null, false, false); } + private ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, 
int maxMessages, String subscriptionName, boolean durable, boolean shared) throws JMSException { + JmsDestination jmsDestination = JmsMessageTransformation.transformDestination(this, destination); + + int configuredPrefetch = getPrefetchPolicy().getConfiguredPrefetch((JmsSession) null, jmsDestination, durable, false); + + final MessageQueue messageQueue; + + if (isLocalMessagePriority()) { + messageQueue = new PriorityMessageQueue(); + } else { + messageQueue = new FifoMessageQueue(configuredPrefetch); + } + + JmsConsumerInfo consumerInfo = new JmsConsumerInfo(getNextConnectionConsumerId(), messageQueue); + consumerInfo.setExplicitClientID(isExplicitClientID()); + consumerInfo.setSelector(messageSelector); + consumerInfo.setDurable(durable); + consumerInfo.setSubscriptionName(subscriptionName); + consumerInfo.setShared(shared); + consumerInfo.setDestination(jmsDestination); + consumerInfo.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE); + consumerInfo.setNoLocal(false); + consumerInfo.setBrowser(false); + consumerInfo.setPrefetchSize(configuredPrefetch); + consumerInfo.setRedeliveryPolicy(getRedeliveryPolicy().copy()); + consumerInfo.setLocalMessageExpiry(isLocalMessageExpiry()); + consumerInfo.setPresettle(false); + consumerInfo.setDeserializationPolicy(getDeserializationPolicy().copy()); + consumerInfo.setMaxMessages(maxMessages); + consumerInfo.setConnectionConsumer(true); + + JmsConnectionConsumer consumer = new JmsConnectionConsumer(this, consumerInfo, messageQueue, sessionPool); + + try { + consumer.init(); + if (started.get()) { + consumer.start(); + } + return consumer; + } catch (JMSException jmsEx) { + consumer.close(); + throw jmsEx; + } + } + @Override public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { checkClosedOrFailed(); @@ -497,6 +565,14 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection sessions.put(sessionInfo.getId(), session); } + protected void 
removeConnectionConsumer(JmsConsumerInfo consumerInfo) throws JMSException { + connectionConsumers.remove(consumerInfo.getId()); + } + + protected void addConnectionConsumer(JmsConsumerInfo consumerInfo, JmsConnectionConsumer consumer) { + connectionConsumers.put(consumerInfo.getId(), consumer); + } + private void createJmsConnection() throws JMSException { if (isConnected() || closed.get()) { return; @@ -589,6 +665,10 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection return new JmsTransactionId(connectionInfo.getId(), transactionIdGenerator.incrementAndGet()); } + protected JmsConsumerId getNextConnectionConsumerId() { + return new JmsConsumerId(connectionInfo.getId().toString(), -1, connectionConsumerIdGenerator.incrementAndGet()); + } + protected synchronized boolean isExplicitClientID() { return connectionInfo.isExplicitClientID(); } @@ -1106,6 +1186,11 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection JmsMessageDispatcher dispatcher = sessions.get(envelope.getConsumerId().getParentId()); if (dispatcher != null) { dispatcher.onInboundMessage(envelope); + } else { + dispatcher = connectionConsumers.get(envelope.getConsumerId()); + if (dispatcher != null) { + dispatcher.onInboundMessage(envelope); + } } // Run the application callbacks on the connection executor to allow the provider to @@ -1176,6 +1261,15 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection request.sync(); } + for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { + JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo(); + if (consumerInfo.isOpen()) { + request = new ProviderFuture(); + provider.create(consumerInfo, request); + request.sync(); + } + } + for (JmsSession session : sessions.values()) { session.onConnectionRecovery(provider); } @@ -1188,6 +1282,15 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection 
setMessageFactory(provider.getMessageFactory()); connectionInfo.setConnectedURI(provider.getRemoteURI()); + for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { + JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo(); + if (consumerInfo.isOpen()) { + ProviderFuture request = new ProviderFuture(); + provider.start(consumerInfo, request); + request.sync(); + } + } + for (JmsSession session : sessions.values()) { session.onConnectionRecovered(provider); } @@ -1317,14 +1420,22 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection } } } else if (resource instanceof JmsConsumerInfo) { - JmsSessionId parentId = ((JmsConsumerInfo) resource).getParentId(); - JmsSession session = sessions.get(parentId); - if (session != null) { - JmsMessageConsumer consumer = session.lookup((JmsConsumerId) resource.getId()); + JmsConsumerInfo consumerInfo = (JmsConsumerInfo) resource; + if (consumerInfo.isConnectionConsumer()) { + JmsConnectionConsumer consumer = connectionConsumers.get(consumerInfo.getId()); if (consumer != null) { consumer.setFailureCause(cause); } - } + } else { + JmsSessionId parentId = consumerInfo.getParentId(); + JmsSession session = sessions.get(parentId); + if (session != null) { + JmsMessageConsumer consumer = session.lookup((JmsConsumerId) resource.getId()); + if (consumer != null) { + consumer.setFailureCause(cause); + } + } + } } executor.execute(new Runnable() { @@ -1351,16 +1462,32 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection } } } else if (resource instanceof JmsConsumerInfo) { - JmsSessionId parentId = ((JmsConsumerInfo) resource).getParentId(); - JmsSession session = sessions.get(parentId); - if (session != null) { - JmsMessageConsumer consumer = session.consumerClosed((JmsConsumerInfo) resource, cause); - if (consumer != null) { - for (JmsConnectionListener listener : connectionListeners) { - listener.onConsumerClosed(consumer, cause); + 
JmsConsumerInfo consumerInfo = (JmsConsumerInfo) resource; + if (consumerInfo.isConnectionConsumer()) { + JmsConnectionConsumer consumer = connectionConsumers.get(consumerInfo.getId()); + if (consumer != null) { + try { + if (consumer != null) { + consumer.shutdown(cause); + } + } catch (Throwable error) { + LOG.trace("Ignoring exception thrown during cleanup of closed connection consumer", error); } + + onAsyncException(new JMSException("Connection Consumer remotely closed").initCause(cause)); } - } + } else { + JmsSessionId parentId = consumerInfo.getParentId(); + JmsSession session = sessions.get(parentId); + if (session != null) { + JmsMessageConsumer consumer = session.consumerClosed((JmsConsumerInfo) resource, cause); + if (consumer != null) { + for (JmsConnectionListener listener : connectionListeners) { + listener.onConsumerClosed(consumer, cause); + } + } + } + } } else { LOG.info("A JMS resource has been remotely closed: {}", resource); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java new file mode 100644 index 0000000..862b959 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.jms.ConnectionConsumer; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.ServerSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; + +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.meta.JmsConsumerInfo; +import org.apache.qpid.jms.meta.JmsResource.ResourceState; +import org.apache.qpid.jms.util.MessageQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JMS Connection Consumer implementation. 
+ */ +public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDispatcher { + + private static final Logger LOG = LoggerFactory.getLogger(JmsConnectionConsumer.class); + + private static final long DEFAULT_DISPATCH_RETRY_DELAY = 1000; + + private final JmsConnection connection; + private final JmsConsumerInfo consumerInfo; + private final ServerSessionPool sessionPool; + private final MessageQueue messageQueue; + + private final Lock stateLock = new ReentrantLock(); + private final Lock dispatchLock = new ReentrantLock(); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final AtomicReference<Throwable> failureCause = new AtomicReference<>(); + private final ScheduledThreadPoolExecutor dispatcher; + + public JmsConnectionConsumer(JmsConnection connection, JmsConsumerInfo consumerInfo, MessageQueue messageQueue, ServerSessionPool sessionPool) throws JMSException { + this.connection = connection; + this.consumerInfo = consumerInfo; + this.sessionPool = sessionPool; + this.messageQueue = messageQueue; + this.dispatcher = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { + + @Override + public Thread newThread(Runnable runner) { + Thread serial = new Thread(runner); + serial.setDaemon(true); + serial.setName(this.getClass().getSimpleName() + ":(" + consumerInfo.getId() + ")"); + return serial; + } + }); + + // Ensure a timely shutdown for consumer close. 
+ dispatcher.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + dispatcher.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + + connection.addConnectionConsumer(consumerInfo, this); + try { + connection.createResource(consumerInfo); + } catch (JMSException jmse) { + connection.removeConnectionConsumer(consumerInfo); + throw jmse; + } + } + + public JmsConnectionConsumer init() throws JMSException { + getConnection().startResource(consumerInfo); + return this; + } + + @Override + public void onInboundMessage(JmsInboundMessageDispatch envelope) { + envelope.setConsumerInfo(consumerInfo); + + stateLock.lock(); + try { + if (envelope.isEnqueueFirst()) { + this.messageQueue.enqueueFirst(envelope); + } else { + this.messageQueue.enqueue(envelope); + } + + if (messageQueue.isRunning()) { + try { + dispatcher.execute(() -> deliverNextPending()); + } catch (RejectedExecutionException rje) { + LOG.debug("Rejected on attempt to queue message dispatch", rje); + } + } + } finally { + stateLock.unlock(); + } + } + + @Override + public void close() throws JMSException { + if (!closed.get()) { + doClose(); + } + } + + /** + * Called to initiate shutdown of consumer resources and request that the remote + * peer remove the registered producer. + * + * @throws JMSException if an error occurs during the consumer close operation. 
+ */ + protected void doClose() throws JMSException { + shutdown(); + this.connection.destroyResource(consumerInfo); + } + + protected void shutdown() throws JMSException { + shutdown(null); + } + + protected void shutdown(Throwable cause) throws JMSException { + if (closed.compareAndSet(false, true)) { + dispatchLock.lock(); + try { + failureCause.set(cause); + consumerInfo.setState(ResourceState.CLOSED); + connection.removeConnectionConsumer(consumerInfo); + stop(true); + dispatcher.shutdown(); + try { + dispatcher.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.trace("ConnectionConsumer shutdown of dispatcher was interupted"); + } + } finally { + dispatchLock.unlock(); + } + } + } + + public void start() { + stateLock.lock(); + try { + if (!messageQueue.isRunning()) { + this.messageQueue.start(); + this.dispatcher.execute(new BoundedMessageDeliverTask(messageQueue.size())); + } + } finally { + stateLock.unlock(); + } + } + + public void stop() { + stop(false); + } + + private void stop(boolean closeMessageQueue) { + dispatchLock.lock(); + stateLock.lock(); + try { + if (closeMessageQueue) { + this.messageQueue.close(); + } else { + this.messageQueue.stop(); + } + } finally { + stateLock.unlock(); + dispatchLock.unlock(); + } + } + + @Override + public ServerSessionPool getServerSessionPool() throws JMSException { + checkClosed(); + return sessionPool; + } + + JmsConnection getConnection() { + return connection; + } + + JmsConsumerInfo getConsumerInfo() { + return consumerInfo; + } + + void setFailureCause(Throwable failureCause) { + this.failureCause.set(failureCause); + } + + Throwable getFailureCause() { + return failureCause.get(); + } + + @Override + public String toString() { + return "JmsConnectionConsumer { id=" + consumerInfo.getId() + " }"; + } + + protected void checkClosed() throws IllegalStateException { + if (closed.get()) { + IllegalStateException jmsEx = null; + + if 
(getFailureCause() == null) { + jmsEx = new IllegalStateException("The ConnectionConsumer is closed"); + } else { + jmsEx = new IllegalStateException("The ConnectionConsumer was closed due to an unrecoverable error."); + jmsEx.initCause(getFailureCause()); + } + + throw jmsEx; + } + } + + private boolean deliverNextPending() { + if (messageQueue.isRunning() && !messageQueue.isEmpty()) { + dispatchLock.lock(); + + try { + ServerSession serverSession = getServerSessionPool().getServerSession(); + if (serverSession == null) { + // There might not be an available session so queue a task to try again + // and hope that by then one is available in the pool. + dispatcher.schedule(new BoundedMessageDeliverTask(messageQueue.size()), DEFAULT_DISPATCH_RETRY_DELAY, TimeUnit.MILLISECONDS); + return false; + } + + Session session = serverSession.getSession(); + + JmsInboundMessageDispatch envelope = messageQueue.dequeueNoWait(); + + if (session instanceof JmsSession) { + ((JmsSession) session).enqueueInSession(envelope); + } else { + LOG.warn("ServerSession provided an onknown JMS Session type to this connection consumer: {}", session); + } + + serverSession.start(); + } catch (JMSException e) { + connection.onAsyncException(e); + stop(true); + } finally { + dispatchLock.unlock(); + } + } + + return !messageQueue.isEmpty(); + } + + private final class BoundedMessageDeliverTask implements Runnable { + + private final int deliveryCount; + + public BoundedMessageDeliverTask(int deliveryCount) { + this.deliveryCount = deliveryCount; + } + + @Override + public void run() { + int current = 0; + + while (messageQueue.isRunning() && current++ < deliveryCount) { + if (!deliverNextPending()) { + return; // Another task already drained the queue. 
+ } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index 9bafd02..7a21e73 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -328,7 +328,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe timeout = Math.max(deadline - System.currentTimeMillis(), 0); } performPullIfRequired(timeout, false); - } else if (redeliveryExceeded(envelope)) { + } else if (session.redeliveryExceeded(envelope)) { LOG.debug("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope); applyRedeliveryPolicyOutcome(envelope); if (timeout > 0) { @@ -356,15 +356,6 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe return false; } - protected boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) { - LOG.trace("checking envelope with {} redeliveries", envelope.getRedeliveryCount()); - - JmsRedeliveryPolicy redeliveryPolicy = consumerInfo.getRedeliveryPolicy(); - return redeliveryPolicy != null && - redeliveryPolicy.getMaxRedeliveries(getDestination()) >= 0 && - redeliveryPolicy.getMaxRedeliveries(getDestination()) < envelope.getRedeliveryCount(); - } - protected void checkClosed() throws IllegalStateException { if (closed.get()) { IllegalStateException jmsEx = null; @@ -470,6 +461,8 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe */ @Override public void onInboundMessage(final JmsInboundMessageDispatch envelope) { + envelope.setConsumerInfo(consumerInfo); + lock.lock(); try { if 
(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) { @@ -720,7 +713,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe if (consumeExpiredMessage(envelope)) { LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope); doAckExpired(envelope); - } else if (redeliveryExceeded(envelope)) { + } else if (session.redeliveryExceeded(envelope)) { LOG.trace("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope); applyRedeliveryPolicyOutcome(envelope); } else { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 192cfc8..6152991 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -16,6 +16,8 @@ */ package org.apache.qpid.jms; +import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition; + import java.io.Serializable; import java.lang.reflect.Method; import java.lang.reflect.Modifier; @@ -92,6 +94,8 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFuture; import org.apache.qpid.jms.selector.SelectorParser; import org.apache.qpid.jms.selector.filter.FilterException; +import org.apache.qpid.jms.util.FifoMessageQueue; +import org.apache.qpid.jms.util.MessageQueue; import org.apache.qpid.jms.util.NoOpExecutor; import org.apache.qpid.jms.util.QpidJMSThreadFactory; import org.slf4j.Logger; @@ -113,6 +117,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe private final Map<JmsProducerId, JmsMessageProducer> producers = new ConcurrentHashMap<JmsProducerId, JmsMessageProducer>(); private 
final Map<JmsConsumerId, JmsMessageConsumer> consumers = new ConcurrentHashMap<JmsConsumerId, JmsMessageConsumer>(); private MessageListener messageListener; + private final MessageQueue sessionQueue = new FifoMessageQueue(16); private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean started = new AtomicBoolean(); private final JmsSessionInfo sessionInfo; @@ -193,7 +198,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe @Override public void setMessageListener(MessageListener listener) throws JMSException { - checkClosed(); + if (listener != null) { + checkClosed(); + } + this.messageListener = listener; } @@ -253,17 +261,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } @Override - public void run() { - try { - checkClosed(); - } catch (IllegalStateException e) { - throw new RuntimeException(e); - } - - throw new UnsupportedOperationException(); - } - - @Override public void close() throws JMSException { checkIsDeliveryThread(); checkIsCompletionThread(); @@ -721,6 +718,55 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe return connection.createTemporaryTopic(); } + //----- Session dispatch support -----------------------------------------// + + @Override + public void run() { + try { + checkClosed(); + } catch (IllegalStateException ex) { + throw new RuntimeException(ex); + } + + JmsInboundMessageDispatch envelope = null; + while ((envelope = sessionQueue.dequeueNoWait()) != null) { + try { + JmsMessage copy = null; + + if (envelope.getMessage().isExpired()) { + LOG.trace("{} filtered expired message: {}", envelope.getConsumerId(), envelope); + acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE); + } else if (redeliveryExceeded(envelope)) { + LOG.trace("{} filtered message with excessive redelivery count: {}", envelope.getConsumerId(), envelope); + JmsRedeliveryPolicy redeliveryPolicy = 
envelope.getConsumerInfo().getRedeliveryPolicy(); + acknowledge(envelope, lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(envelope.getConsumerInfo().getDestination()))); + } else { + boolean deliveryFailed = false; + + copy = acknowledge(envelope, ACK_TYPE.DELIVERED).getMessage().copy(); + + clearSessionRecovered(); + + try { + messageListener.onMessage(copy); + } catch (RuntimeException rte) { + deliveryFailed = true; + } + + if (!isSessionRecovered()) { + if (!deliveryFailed) { + acknowledge(envelope, ACK_TYPE.ACCEPTED); + } else { + acknowledge(envelope, ACK_TYPE.RELEASED); + } + } + } + } catch (Exception e) { + getConnection().onException(e); + } + } + } + //----- Session Implementation methods -----------------------------------// protected void add(JmsMessageConsumer consumer) throws JMSException { @@ -921,8 +967,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } } - void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException { + JmsInboundMessageDispatch acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException { transactionContext.acknowledge(connection, envelope, ackType); + return envelope; } /** @@ -1061,6 +1108,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe for (JmsMessageConsumer consumer : consumers.values()) { consumer.start(); } + + sessionQueue.start(); } } @@ -1071,6 +1120,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe consumer.stop(); } + sessionQueue.stop(); + synchronized (sessionInfo) { if (deliveryExecutor != null) { deliveryExecutor.shutdown(); @@ -1280,6 +1331,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } } + boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) { + LOG.trace("checking envelope with {} redeliveries", envelope.getRedeliveryCount()); + + JmsConsumerInfo consumerInfo = envelope.getConsumerInfo(); 
+ + JmsRedeliveryPolicy redeliveryPolicy = consumerInfo.getRedeliveryPolicy(); + return redeliveryPolicy != null && + redeliveryPolicy.getMaxRedeliveries(consumerInfo.getDestination()) >= 0 && + redeliveryPolicy.getMaxRedeliveries(consumerInfo.getDestination()) < envelope.getRedeliveryCount(); + } + //----- Event handlers ---------------------------------------------------// @Override @@ -1354,16 +1416,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe if (id == null) { this.connection.onException(new JMSException("No ConsumerId set for " + envelope.getMessage())); } - if (messageListener != null) { - messageListener.onMessage(envelope.getMessage()); - } else { - JmsMessageConsumer consumer = consumers.get(id); - if (consumer != null) { - consumer.onInboundMessage(envelope); - } + + JmsMessageConsumer consumer = consumers.get(id); + if (consumer != null) { + consumer.onInboundMessage(envelope); } } + void enqueueInSession(JmsInboundMessageDispatch envelope) { + sessionQueue.enqueue(envelope); + } + //----- Asynchronous Send Helpers ----------------------------------------// private final class FailOrCompleteAsyncCompletionsTask implements Runnable { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java index fbb5a1d..e55426f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms.message; import org.apache.qpid.jms.meta.JmsAbstractResourceId; import org.apache.qpid.jms.meta.JmsConsumerId; 
+import org.apache.qpid.jms.meta.JmsConsumerInfo; /** * Envelope used to deliver incoming messages to their targeted consumer. @@ -31,6 +32,7 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId { private boolean enqueueFirst; private boolean delivered; + private transient JmsConsumerInfo consumerInfo; private transient String stringView; public JmsInboundMessageDispatch(long sequence) { @@ -83,6 +85,14 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId { return redeliveryCount; } + public JmsConsumerInfo getConsumerInfo() { + return consumerInfo; + } + + public void setConsumerInfo(JmsConsumerInfo consumerInfo) { + this.consumerInfo = consumerInfo; + } + @Override public String toString() { if (stringView == null) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java index 74256ff..b2c1d07 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java @@ -38,6 +38,8 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar private int acknowledgementMode; private boolean localMessageExpiry; private boolean presettle; + private boolean connectionConsumer; + private int maxMessages; private volatile boolean listener; private final MessageQueue messageQueue; @@ -76,6 +78,8 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar info.redeliveryPolicy = getRedeliveryPolicy().copy(); info.deserializationPolicy = getDeserializationPolicy().copy(); info.listener = listener; + info.connectionConsumer = connectionConsumer; + info.maxMessages = maxMessages; } 
public int getPrefetchedMessageCount() { @@ -225,6 +229,22 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar this.presettle = presettle; } + public boolean isConnectionConsumer() { + return connectionConsumer; + } + + public void setConnectionConsumer(boolean connectionConsumer) { + this.connectionConsumer = connectionConsumer; + } + + public int getMaxMessages() { + return maxMessages; + } + + public void setMaxMessages(int maxMessages) { + this.maxMessages = maxMessages; + } + @Override public String toString() { return "JmsConsumerInfo: { " + getId() + ", destination = " + getDestination() + " }"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java index efc49de..2834c60 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java @@ -36,7 +36,7 @@ public interface JmsPrefetchPolicy { * Returns the prefetch value to use when creating a MessageConsumer instance. * * @param session - * the Session that own the MessageConsumer being created. + * the Session that owns the MessageConsumer being created (null for a ConnectionConsumer). * @param destination * the Destination that the consumer will be subscribed to.
* @param durable http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index c93107b..0c43e43 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -177,6 +177,15 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn } /** + * Retrieves the AmqpConnectionSession owned by this AmqpConnection. + * + * @return the AmqpConnectionSession owned by this AmqpConnection. + */ + public AmqpConnectionSession getConnectionSession() { + return connectionSession; + } + + /** * @return true if anonymous producers should be cached or closed on send complete. 
*/ public boolean isAnonymousProducerCache() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java index 5c0b53f..beb5a0b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java @@ -85,11 +85,27 @@ public class AmqpConnectionSession extends AmqpSession { } @Override + public void addChildResource(AmqpResource resource) { + // When a Connection Consumer is created the Connection is doing so + // without a known session to associate it with, so we link up the consumer + // to this session by adding this session as the provider hint on the + // consumer's parent session ID.
+ if (resource instanceof AmqpConsumer) { + AmqpConsumer consumer = (AmqpConsumer) resource; + consumer.getConsumerId().getParentId().setProviderHint(this); + } + + super.addChildResource(resource); + } + + @Override public void handleResourceClosure(AmqpProvider provider, Throwable cause) { List<AsyncResult> pending = new ArrayList<>(pendingUnsubs.values()); for (AsyncResult unsubscribeRequest : pending) { unsubscribeRequest.onFailure(cause); } + + super.handleResourceClosure(provider, cause); } private static final class DurableSubscriptionReattach extends AmqpAbstractResource<JmsSessionInfo, Receiver> { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 916e87c..176a3a0 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -382,7 +382,14 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP @Override public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception { - AmqpSession session = connection.getSession(consumerInfo.getParentId()); + final AmqpSession session; + + if (consumerInfo.isConnectionConsumer()) { + session = connection.getConnectionSession(); + } else { + session = connection.getSession(consumerInfo.getParentId()); + } + session.createConsumer(consumerInfo, request); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java ---------------------------------------------------------------------- diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java index 9cf74c4..7577338 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java @@ -305,42 +305,4 @@ public class JmsConnectionTest { int minor = metaData.getProviderMinorVersion(); assertTrue("Expected non-zero provider major(" + major + ") / minor(" + minor +") version.", (major + minor) != 0); } - - //----- Currently these are unimplemented, these will fail after that ----// - - @Test(timeout=30000, expected=JMSException.class) - public void testCreateConnectionConsumer() throws Exception { - connection = new JmsConnection(connectionInfo, provider); - connection.createConnectionConsumer((JmsDestination) new JmsTopic(), "", null, 1); - } - - @Test(timeout=30000, expected=JMSException.class) - public void testCreateConnectionTopicConsumer() throws Exception { - connection = new JmsConnection(connectionInfo, provider); - connection.createConnectionConsumer(new JmsTopic(), "", null, 1); - } - - @Test(timeout=30000, expected=JMSException.class) - public void testCreateConnectionQueueConsumer() throws Exception { - connection = new JmsConnection(connectionInfo, provider); - connection.createConnectionConsumer(new JmsQueue(), "", null, 1); - } - - @Test(timeout=30000, expected=JMSException.class) - public void testCreateDurableConnectionQueueConsumer() throws Exception { - connection = new JmsConnection(connectionInfo, provider); - connection.createDurableConnectionConsumer(new JmsTopic(), "", "", null, 1); - } - - @Test(timeout=30000, expected=JMSException.class) - public void testCreateSharedConnectionConsumer() throws Exception { - connection = new JmsConnection(connectionInfo, provider); - connection.createSharedConnectionConsumer(new JmsTopic(), "id", "", null, 1); - } - - @Test(timeout=30000, 
expected=JMSException.class) - public void testCreateSharedDurableConnectionConsumer() throws Exception { - connection = new JmsConnection(connectionInfo, provider); - connection.createSharedDurableConnectionConsumer(new JmsTopic(), "id", "", null, 1); - } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java index 6dbb661..034ea59 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java @@ -364,11 +364,6 @@ public class JmsSessionTest extends JmsConnectionTestSupport { public void testSessionRunFailsWhenSessionIsClosed() throws Exception { JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - try { - session.run(); - fail("Not implemented"); - } catch (UnsupportedOperationException usoe) {} - session.close(); try { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java new file mode 100644 index 0000000..934b385 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.integration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionConsumer; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.ServerSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsDefaultConnectionListener; +import org.apache.qpid.jms.JmsQueue; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.Wait; +import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; +import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; +import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; +import org.apache.qpid.proton.amqp.DescribedType; +import org.junit.Test; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test for expected behaviors of JMS Connection Consumer implementation. + */ +public class ConnectionConsumerIntegrationTest extends QpidJmsTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(ConnectionConsumerIntegrationTest.class); + + private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); + + @Test(timeout = 20000) + public void testCreateConnectionConsumer() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsServerSessionPool sessionPool = new JmsServerSessionPool(); + Connection connection = testFixture.establishConnecton(testPeer); + + // No additional Begin calls as there's no Session created for a Connection Consumer + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlow(); + + Queue queue = new JmsQueue("myQueue"); + ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100); + + testPeer.expectDetach(true, true, true); + consumer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testConnectionConsumerDispatchesToSessionConnectionSratedBeforeCreate() throws Exception { + doTestConnectionConsumerDispatchesToSession(true); + } + + @Test(timeout = 20000) + public void testConnectionConsumerDispatchesToSessionConnectionSratedAfterCreate() throws Exception { + doTestConnectionConsumerDispatchesToSession(false); + } + + private void doTestConnectionConsumerDispatchesToSession(boolean startBeforeCreate) throws Exception { + final CountDownLatch messageArrived = new CountDownLatch(1); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + + if (startBeforeCreate) { + connection.start(); + } + + testPeer.expectBegin(); + + // Create a session for our ServerSessionPool to use + Session session = 
connection.createSession(); + session.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + messageArrived.countDown(); + } + }); + JmsServerSession serverSession = new JmsServerSession(session); + JmsServerSessionPool sessionPool = new JmsServerSessionPool(serverSession); + + // Now the Connection consumer arrives and we give it a message + // to be dispatched to the server session. + DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); + testPeer.expectDispositionThatIsAcceptedAndSettled(); + + Queue queue = new JmsQueue("myQueue"); + ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100); + + if (!startBeforeCreate) { + connection.start(); + } + + assertTrue("Message didn't arrive in time", messageArrived.await(10, TimeUnit.SECONDS)); + + testPeer.expectDetach(true, true, true); + consumer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testNonStartedConnectionConsumerDoesNotDispatch() throws Exception { + final CountDownLatch messageArrived = new CountDownLatch(1); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + + testPeer.expectBegin(); + + // Create a session for our ServerSessionPool to use + Session session = connection.createSession(); + session.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + messageArrived.countDown(); + } + }); + JmsServerSession serverSession = new JmsServerSession(session); + JmsServerSessionPool sessionPool = new JmsServerSessionPool(serverSession); + + // Now the Connection consumer arrives and we give it a message + // to be dispatched to the server session. 
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); + + Queue queue = new JmsQueue("myQueue"); + ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100); + + assertFalse("Message Arrived unexpectedly", messageArrived.await(500, TimeUnit.MILLISECONDS)); + + testPeer.expectDetach(true, true, true); + testPeer.expectDispositionThatIsReleasedAndSettled(); + consumer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testQueuedMessagesAreDrainedToServerSession() throws Exception { + final int MESSAGE_COUNT = 10; + final CountDownLatch messagesDispatched = new CountDownLatch(MESSAGE_COUNT); + final CountDownLatch messagesArrived = new CountDownLatch(MESSAGE_COUNT); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + + @Override + public void onInboundMessage(JmsInboundMessageDispatch envelope) { + messagesDispatched.countDown(); + } + }); + + testPeer.expectBegin(); + + // Create a session for our ServerSessionPool to use + Session session = connection.createSession(); + session.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + messagesArrived.countDown(); + } + }); + + JmsServerSession serverSession = new JmsServerSession(session); + JmsServerSessionPool sessionPool = new JmsServerSessionPool(serverSession); + + // Now the Connection consumer arrives and we give it a message + // to be dispatched to the server session. 
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, MESSAGE_COUNT); + + for (int i = 0; i < MESSAGE_COUNT; i++) { + testPeer.expectDispositionThatIsAcceptedAndSettled(); + } + + Queue queue = new JmsQueue("myQueue"); + ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100); + + assertTrue("Message didn't arrive in time", messagesDispatched.await(10, TimeUnit.SECONDS)); + assertEquals(MESSAGE_COUNT, messagesArrived.getCount()); + + connection.start(); + + assertTrue("Message didn't arrive in time", messagesArrived.await(10, TimeUnit.SECONDS)); + + testPeer.expectDetach(true, true, true); + consumer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testConsumerRecoversAfterSessionPoolReturnsNullSession() throws Exception { + final int MESSAGE_COUNT = 10; + final CountDownLatch messagesDispatched = new CountDownLatch(MESSAGE_COUNT); + final CountDownLatch messagesArrived = new CountDownLatch(MESSAGE_COUNT); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + + @Override + public void onInboundMessage(JmsInboundMessageDispatch envelope) { + messagesDispatched.countDown(); + } + }); + + testPeer.expectBegin(); + + // Create a session for our ServerSessionPool to use + Session session = connection.createSession(); + session.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + messagesArrived.countDown(); + } + }); + + JmsServerSession serverSession = new JmsServerSession(session); + JmsServerSessionPoolFirstAttemptGetsNull sessionPool = new 
JmsServerSessionPoolFirstAttemptGetsNull(serverSession); + + // Now the Connection consumer arrives and we give it a message + // to be dispatched to the server session. + DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, MESSAGE_COUNT); + + for (int i = 0; i < MESSAGE_COUNT; i++) { + testPeer.expectDispositionThatIsAcceptedAndSettled(); + } + + Queue queue = new JmsQueue("myQueue"); + ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100); + + assertTrue("Message didn't arrive in time", messagesDispatched.await(10, TimeUnit.SECONDS)); + assertEquals(MESSAGE_COUNT, messagesArrived.getCount()); + + connection.start(); + + assertTrue("Message didn't arrive in time", messagesArrived.await(10, TimeUnit.SECONDS)); + + testPeer.expectDetach(true, true, true); + consumer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testRemotelyCloseConnectionConsumer() throws Exception { + final String BREAD_CRUMB = "ErrorMessage"; + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final CountDownLatch connectionError = new CountDownLatch(1); + JmsServerSessionPool sessionPool = new JmsServerSessionPool(); + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + connectionError.countDown(); + } + }); + + // Create a consumer, then remotely end it afterwards. 
+ testPeer.expectReceiverAttach(); + testPeer.expectLinkFlow(); + testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_DELETED, BREAD_CRUMB); + + Queue queue = new JmsQueue("myQueue"); + ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100); + + // Verify the consumer gets marked closed + testPeer.waitForAllHandlersToComplete(1000); + assertTrue("consumer never closed.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + try { + consumer.getServerSessionPool(); + } catch (IllegalStateException jmsise) { + LOG.debug("Error reported from consumer.getServerSessionPool()", jmsise); + if (jmsise.getCause() != null) { + String message = jmsise.getCause().getMessage(); + return message.contains(AmqpError.RESOURCE_DELETED.toString()) && + message.contains(BREAD_CRUMB); + } else { + return false; + } + } + return false; + } + }, 10000, 10)); + + assertTrue("Consumer closed callback didn't trigger", connectionError.await(5, TimeUnit.SECONDS)); + + // Try closing it explicitly, should effectively no-op in client. + // The test peer will throw during close if it sends anything. + consumer.close(); + + testPeer.expectClose(); + connection.close(); + } + } + + @Test(timeout = 20000) + public void testOnExceptionFiredOnSessionPoolFailure() throws Exception { + final CountDownLatch exceptionFired = new CountDownLatch(1); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + exceptionFired.countDown(); + } + }); + + connection.start(); + + JmsFailingServerSessionPool sessionPool = new JmsFailingServerSessionPool(); + + // Now the Connection consumer arrives and we give it a message + // to be dispatched to the server session. 
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); + + Queue queue = new JmsQueue("myQueue"); + ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100); + + assertTrue("Exception should have been fired", exceptionFired.await(5, TimeUnit.SECONDS)); + + testPeer.expectDetach(true, true, true); + testPeer.expectDispositionThatIsReleasedAndSettled(); + consumer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testOnExceptionFiredOnServerSessionFailure() throws Exception { + final CountDownLatch exceptionFired = new CountDownLatch(1); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + exceptionFired.countDown(); + } + }); + + connection.start(); + + JmsServerSessionPool sessionPool = new JmsServerSessionPool(new JmsFailingServerSession()); + + // Now the Connection consumer arrives and we give it a message + // to be dispatched to the server session. 
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); + + Queue queue = new JmsQueue("myQueue"); + ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100); + + assertTrue("Exception should have been fired", exceptionFired.await(5, TimeUnit.SECONDS)); + + testPeer.expectDetach(true, true, true); + testPeer.expectDispositionThatIsReleasedAndSettled(); + consumer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + //----- Internal ServerSessionPool ---------------------------------------// + + private class JmsFailingServerSessionPool implements ServerSessionPool { + + public JmsFailingServerSessionPool() { + } + + @Override + public ServerSession getServerSession() throws JMSException { + throw new JMSException("Something is wrong with me"); + } + } + + private class JmsServerSessionPool implements ServerSessionPool { + + private JmsServerSession serverSession; + + public JmsServerSessionPool() { + this.serverSession = new JmsServerSession(); + } + + public JmsServerSessionPool(JmsServerSession serverSession) { + this.serverSession = serverSession; + } + + @Override + public ServerSession getServerSession() throws JMSException { + return serverSession; + } + } + + private class JmsServerSessionPoolFirstAttemptGetsNull implements ServerSessionPool { + + private volatile boolean firstAttempt = true; + private JmsServerSession serverSession; + + public JmsServerSessionPoolFirstAttemptGetsNull(JmsServerSession serverSession) { + this.serverSession = serverSession; + } + + @Override + public ServerSession getServerSession() throws JMSException { + if (firstAttempt) { + firstAttempt = false; + return null; + } else { + return serverSession; + } + } + } + + private class JmsServerSession implements ServerSession { + + 
private final Session session; + private final ExecutorService runner = Executors.newSingleThreadExecutor(); + + public JmsServerSession() { + this.session = null; + } + + public JmsServerSession(Session session) { + this.session = session; + } + + @Override + public Session getSession() throws JMSException { + return session; + } + + @Override + public void start() throws JMSException { + runner.execute(() -> { + session.run(); + }); + } + } + + private class JmsFailingServerSession extends JmsServerSession { + + public JmsFailingServerSession() { + } + + @Override + public Session getSession() throws JMSException { + throw new JMSException("Something is wrong with me"); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java index 7f9cec5..63756b6 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java @@ -47,6 +47,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueBrowser; +import javax.jms.ServerSessionPool; import javax.jms.Session; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; @@ -56,6 +57,7 @@ import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsDefaultConnectionListener; import org.apache.qpid.jms.JmsOperationTimedOutException; +import org.apache.qpid.jms.JmsQueue; import org.apache.qpid.jms.JmsResourceNotFoundException; import 
org.apache.qpid.jms.JmsSendTimedOutException; import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; @@ -76,6 +78,7 @@ import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1844,6 +1847,74 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { } } + @Test(timeout = 20000) + public void testConnectionConsumerRecreatedAfterReconnect() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + ServerSessionPool sessionPool = Mockito.mock(ServerSessionPool.class); + + final CountDownLatch originalConnected = new CountDownLatch(1); + final CountDownLatch finalConnected = new CountDownLatch(1); + + // Create a peer to connect to, then one to reconnect to + final String originalURI = createPeerURI(originalPeer); + final String finalURI = createPeerURI(finalPeer); + + LOG.info("Original peer is at: {}", originalURI); + LOG.info("Final peer is at: {}", finalURI); + + // Connect to the first peer + originalPeer.expectSaslAnonymous(); + originalPeer.expectOpen(); + originalPeer.expectBegin(); + originalPeer.expectReceiverAttach(); + originalPeer.expectLinkFlow(); + originalPeer.dropAfterLastHandler(); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection Established: {}", remoteURI); + if (originalURI.equals(remoteURI.toString())) { + originalConnected.countDown(); + } + } + + @Override + public void onConnectionRestored(URI remoteURI) { + LOG.info("Connection Restored: {}", remoteURI); + if (finalURI.equals(remoteURI.toString())) { + finalConnected.countDown(); + } + } + }); + connection.start(); 
+ + Queue queue = new JmsQueue("myQueue"); + connection.createConnectionConsumer(queue, null, sessionPool, 100); + + assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS)); + + // --- Post Failover Expectations of FinalPeer --- // + + finalPeer.expectSaslAnonymous(); + finalPeer.expectOpen(); + finalPeer.expectBegin(); + finalPeer.expectReceiverAttach(); + finalPeer.expectLinkFlow(); + finalPeer.expectClose(); + + assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS)); + + // Shut it down + connection.close(); + + finalPeer.waitForAllHandlersToComplete(1000); + } + } + private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException { return establishAnonymousConnecton(null, null, peers); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
