Repository: qpid-jms Updated Branches: refs/heads/master 4b97438f5 -> 6c3734267
QPIDJMS-380 Fix race between session delivery and consumer close Prevent connection consumer from closing while a Session is still in the process of delivering and acknowledging a message to avoid error when the session finally acknowledges it. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6c373426 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6c373426 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6c373426 Branch: refs/heads/master Commit: 6c3734267cc24f51f39a8283cf56ee559bcaa90a Parents: 4b97438 Author: Timothy Bish <[email protected]> Authored: Tue Apr 24 14:05:06 2018 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Apr 24 14:05:06 2018 -0400 ---------------------------------------------------------------------- .../apache/qpid/jms/JmsConnectionConsumer.java | 77 +++++++++++++++++++- .../java/org/apache/qpid/jms/JmsSession.java | 57 +++------------ .../ConnectionConsumerIntegrationTest.java | 60 ++++++++++++++- 3 files changed, 140 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6c373426/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java index 030a0b6..f336676 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java @@ -16,6 +16,8 @@ */ package org.apache.qpid.jms; +import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition; + import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -23,7 +25,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; import javax.jms.ConnectionConsumer; import javax.jms.IllegalStateException; @@ -33,8 +38,11 @@ import javax.jms.ServerSessionPool; import javax.jms.Session; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.message.JmsMessage; import org.apache.qpid.jms.meta.JmsConsumerInfo; import org.apache.qpid.jms.meta.JmsResource.ResourceState; +import org.apache.qpid.jms.policy.JmsRedeliveryPolicy; +import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.util.MessageQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +63,7 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp private final Lock stateLock = new ReentrantLock(); private final Lock dispatchLock = new ReentrantLock(); + private final ReadWriteLock deliveringLock = new ReentrantReadWriteLock(true); private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicReference<Throwable> failureCause = new AtomicReference<>(); private final ScheduledThreadPoolExecutor dispatcher; @@ -131,8 +140,13 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp * @throws JMSException if an error occurs during the consumer close operation. */ protected void doClose() throws JMSException { - shutdown(); - this.connection.destroyResource(consumerInfo); + deliveringLock.writeLock().lock(); + try { + shutdown(); + this.connection.destroyResource(consumerInfo); + } finally { + deliveringLock.writeLock().unlock(); + } } protected void shutdown() throws JMSException { @@ -250,7 +264,7 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp JmsInboundMessageDispatch envelope = messageQueue.dequeueNoWait(); if (session instanceof JmsSession) { - ((JmsSession) session).enqueueInSession(envelope); + ((JmsSession) session).enqueueInSession(new DeliveryTask(envelope)); } else { LOG.warn("ServerSession provided an unknown JMS Session type to this ConnectionConsumer: {}", session); } @@ -286,4 +300,59 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp } } } -} \ No newline at end of file + + private final class DeliveryTask implements Consumer<JmsSession> { + + private final JmsInboundMessageDispatch envelope; + + public DeliveryTask(JmsInboundMessageDispatch envelope) { + this.envelope = envelope; + } + + @Override + public void accept(JmsSession session) { + deliveringLock.readLock().lock(); + + try { + if (closed.get()) { + return; // Message has been released. + } + + JmsMessage copy = null; + + if (envelope.getMessage().isExpired()) { + LOG.trace("{} filtered expired message: {}", envelope.getConsumerId(), envelope); + session.acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE); + } else if (session.redeliveryExceeded(envelope)) { + LOG.trace("{} filtered message with excessive redelivery count: {}", envelope.getConsumerId(), envelope); + JmsRedeliveryPolicy redeliveryPolicy = envelope.getConsumerInfo().getRedeliveryPolicy(); + session.acknowledge(envelope, lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(envelope.getConsumerInfo().getDestination()))); + } else { + boolean deliveryFailed = false; + + copy = session.acknowledge(envelope, ACK_TYPE.DELIVERED).getMessage().copy(); + + session.clearSessionRecovered(); + + try { + session.getMessageListener().onMessage(copy); + } catch (RuntimeException rte) { + deliveryFailed = true; + } + + if (!session.isSessionRecovered()) { + if (!deliveryFailed) { + session.acknowledge(envelope, ACK_TYPE.ACCEPTED); + } else { + session.acknowledge(envelope, ACK_TYPE.RELEASED); + } + } + } + } catch (Exception e) { + getConnection().onAsyncException(e); + } finally { + deliveringLock.readLock().unlock(); + } + } + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6c373426/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 6152991..77df75c 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -16,11 +16,10 @@ */ package org.apache.qpid.jms; -import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition; - import java.io.Serializable; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; import java.util.Iterator; @@ -38,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import javax.jms.BytesMessage; import javax.jms.CompletionListener; @@ -94,8 +94,6 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFuture; import org.apache.qpid.jms.selector.SelectorParser; import org.apache.qpid.jms.selector.filter.FilterException; -import org.apache.qpid.jms.util.FifoMessageQueue; -import org.apache.qpid.jms.util.MessageQueue; import org.apache.qpid.jms.util.NoOpExecutor; import org.apache.qpid.jms.util.QpidJMSThreadFactory; import org.slf4j.Logger; @@ -117,7 +115,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe private final Map<JmsProducerId, JmsMessageProducer> producers = new ConcurrentHashMap<JmsProducerId, JmsMessageProducer>(); private final Map<JmsConsumerId, JmsMessageConsumer> consumers = new ConcurrentHashMap<JmsConsumerId, JmsMessageConsumer>(); private MessageListener messageListener; - private final MessageQueue sessionQueue = new FifoMessageQueue(16); + + private final java.util.Queue<Consumer<JmsSession>> sessionQueue = new ArrayDeque<>(); + private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean started = new AtomicBoolean(); private final JmsSessionInfo sessionInfo; @@ -728,42 +728,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe throw new RuntimeException(ex); } - JmsInboundMessageDispatch envelope = null; - while ((envelope = sessionQueue.dequeueNoWait()) != null) { - try { - JmsMessage copy = null; - - if (envelope.getMessage().isExpired()) { - LOG.trace("{} filtered expired message: {}", envelope.getConsumerId(), envelope); - acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE); - } else if (redeliveryExceeded(envelope)) { - LOG.trace("{} filtered message with excessive redelivery count: {}", envelope.getConsumerId(), envelope); - JmsRedeliveryPolicy redeliveryPolicy = envelope.getConsumerInfo().getRedeliveryPolicy(); - acknowledge(envelope, lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(envelope.getConsumerInfo().getDestination()))); - } else { - boolean deliveryFailed = false; - - copy = acknowledge(envelope, ACK_TYPE.DELIVERED).getMessage().copy(); - - clearSessionRecovered(); - - try { - messageListener.onMessage(copy); - } catch (RuntimeException rte) { - deliveryFailed = true; - } - - if (!isSessionRecovered()) { - if (!deliveryFailed) { - acknowledge(envelope, ACK_TYPE.ACCEPTED); - } else { - acknowledge(envelope, ACK_TYPE.RELEASED); - } - } - } - } catch (Exception e) { - getConnection().onException(e); - } + Consumer<JmsSession> dispatcher = null; + while ((dispatcher = sessionQueue.poll()) != null) { + dispatcher.accept(this); } } @@ -1108,8 +1075,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe for (JmsMessageConsumer consumer : consumers.values()) { consumer.start(); } - - sessionQueue.start(); } } @@ -1120,8 +1085,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe consumer.stop(); } - sessionQueue.stop(); - synchronized (sessionInfo) { if (deliveryExecutor != null) { deliveryExecutor.shutdown(); @@ -1423,8 +1386,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } } - void enqueueInSession(JmsInboundMessageDispatch envelope) { - sessionQueue.enqueue(envelope); + void enqueueInSession(Consumer<JmsSession> dispatcher) { + sessionQueue.add(dispatcher); } //----- Asynchronous Send Helpers ----------------------------------------// http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6c373426/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java index f92357d..30885b8 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java @@ -133,7 +133,63 @@ public class ConnectionConsumerIntegrationTest extends QpidJmsTestCase { } assertTrue("Message didn't arrive in time", messageArrived.await(10, TimeUnit.SECONDS)); - testPeer.waitForAllHandlersToComplete(2000); + + testPeer.expectDetach(true, true, true); + consumer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testPauseInOnMessageAndConsumerClosed() throws Exception { + final CountDownLatch messageArrived = new CountDownLatch(1); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + + testPeer.expectBegin(); + + // Create a session for our ServerSessionPool to use + Session session = connection.createSession(); + session.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + messageArrived.countDown(); + + LOG.trace("Pausing onMessage to check for race on connection consumer close"); + + // Pause a bit to see if we race consumer close and our own + // message accept attempt by the delivering Session. + try { + TimeUnit.MILLISECONDS.sleep(10); + } catch (InterruptedException e) { + } + + LOG.trace("Paused onMessage to check for race on connection consumer close"); + } + }); + JmsServerSession serverSession = new JmsServerSession(session); + JmsServerSessionPool sessionPool = new JmsServerSessionPool(serverSession); + + // Now the Connection consumer arrives and we give it a message + // to be dispatched to the server session. + DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); + testPeer.expectDispositionThatIsAcceptedAndSettled(); + + Queue queue = new JmsQueue("myQueue"); + ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100); + + connection.start(); + + assertTrue("Message didn't arrive in time", messageArrived.await(10, TimeUnit.SECONDS)); testPeer.expectDetach(true, true, true); consumer.close(); @@ -240,7 +296,6 @@ public class ConnectionConsumerIntegrationTest extends QpidJmsTestCase { connection.start(); assertTrue("Message didn't arrive in time", messagesArrived.await(10, TimeUnit.SECONDS)); - testPeer.waitForAllHandlersToComplete(2000); testPeer.expectDetach(true, true, true); consumer.close(); @@ -303,7 +358,6 @@ public class ConnectionConsumerIntegrationTest extends QpidJmsTestCase { connection.start(); assertTrue("Message didn't arrive in time", messagesArrived.await(10, TimeUnit.SECONDS)); - testPeer.waitForAllHandlersToComplete(2000); testPeer.expectDetach(true, true, true); consumer.close(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
