Repository: qpid-jms Updated Branches: refs/heads/master fe4aa6ed9 -> 816773722
QPIDJMS-389 Signal the correct producer completion only Ensure that if more than one producer exists on a session that the remote close of one producer does not trigger completions for the other producer that is waiting for its send to complete. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/81677372 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/81677372 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/81677372 Branch: refs/heads/master Commit: 816773722df3abb23d1bca81aa64f353e57c0a02 Parents: fe4aa6e Author: Timothy Bish <[email protected]> Authored: Thu Jun 7 18:47:40 2018 -0400 Committer: Timothy Bish <[email protected]> Committed: Thu Jun 7 18:47:40 2018 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsSession.java | 22 ++- .../integration/ProducerIntegrationTest.java | 176 ++++++++++++++++++- 2 files changed, 187 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/81677372/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 782050e..b8c88f6 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -1414,20 +1414,24 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe while (pending.hasNext()) { SendCompletion completion = pending.next(); - if (!completion.hasCompleted()) { - if (producerId == null || producerId.equals(completion.envelope.getProducerId())) { + if (producerId == null || producerId.equals(completion.envelope.getProducerId())) { + if (!completion.hasCompleted()) { completion.markAsFailed(failureCause); } - } - try { - completion.signalCompletion(); - } catch (Throwable error) { - LOG.trace("Signaled completion of send: {}", completion.envelope); + try { + completion.signalCompletion(); + } catch (Throwable error) { + } finally { + LOG.trace("Signaled completion of send: {}", completion.envelope); + } } } - asyncSendQueue.clear(); + // Only clear on non-discriminating variant to avoid losing track of completions. + if (producerId == null) { + asyncSendQueue.clear(); + } } } @@ -1530,6 +1534,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } public void signalCompletion() { + envelope.getMessage().onSendComplete(); // Ensure message is returned as readable. + if (failureCause == null) { listener.onCompletion(envelope.getMessage()); } else { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/81677372/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index 7cda887..8a8a96c 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -2397,7 +2397,6 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { @Override public void onCompletion(Message message) { - try { session.close(); } catch (JMSException jmsEx) { @@ -2761,7 +2760,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { MessageProducer producer = session.createProducer(queue); int delay = 0; - if(deliveryDelay) { + if (deliveryDelay) { delay = 123456; producer.setDeliveryDelay(delay); } @@ -2777,7 +2776,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); - if(deliveryDelay) { + if (deliveryDelay) { msgAnnotationsMatcher.withEntry(DELIVERY_TIME, inRange); } messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); @@ -2803,6 +2802,177 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { } } + @Test(timeout = 20000) + public void testRemotelyCloseOneProducerDoesNotCompleteAsyncSendFromAnotherProducer() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + MessageProducer producer = session.createProducer(queue); + + // Create a second producer which allows for a safe wait for credit for the + // first producer without the need for a sleep. Otherwise the first producer + // might not do an actual async send due to not having received credit yet. + session.createProducer(queue); + + Message message = session.createTextMessage("content"); + message.setIntProperty("test", 1); + + testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher()); + + // This closes link for the second producer we created, not the one that we + // will use to send a message. + testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true); + + assertNull("Should not yet have a JMSDestination", message.getJMSDestination()); + + TestJmsCompletionListener listener = new TestJmsCompletionListener(); + try { + producer.send(message, listener); + } catch (JMSException e) { + LOG.warn("Caught unexpected error: {}", e.getMessage()); + fail("No expected exception for this send."); + } + + testPeer.waitForAllHandlersToComplete(100); + + assertFalse("Should not get async callback", listener.awaitCompletion(10, TimeUnit.MILLISECONDS)); + + // Closing the session should complete the send with an exception + testPeer.expectEnd(); + session.close(); + + assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS)); + assertNotNull(listener.exception); + assertNotNull(listener.message); + assertTrue(listener.message instanceof TextMessage); + + // Message should be readable + assertNotNull("Should have a readable JMSDestination", message.getJMSDestination()); + assertEquals("Message body not as expected", "content", ((TextMessage) message).getText()); + assertEquals("Message property not as expected", 1, message.getIntProperty("test")); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(2000); + } + } + + @Test(timeout = 20000) + public void testRemotelyCloseProducerAndAttemptAsyncCompletionSendThrowsAndLeavesMessageReadable() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + + final CountDownLatch producerClosed = new CountDownLatch(1); + + connection.addConnectionListener(new JmsDefaultConnectionListener() { + + @Override + public void onProducerClosed(MessageProducer producer, Throwable cause) { + producerClosed.countDown(); + } + }); + + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + MessageProducer producer = session.createProducer(queue); + Message message = session.createTextMessage("content"); + message.setIntProperty("test", 1); + + assertNull("Should not yet have a JMSDestination", message.getJMSDestination()); + + testPeer.waitForAllHandlersToComplete(100); + + assertTrue("Producer should have been closed", producerClosed.await(2, TimeUnit.SECONDS)); + + TestJmsCompletionListener listener = new TestJmsCompletionListener(); + try { + producer.send(message, listener); + fail("No expected exception for this send."); + } catch (JMSException e) { + LOG.warn("Caught unexpected error: {}", e.getMessage()); + } + + assertFalse("Should not get async callback", listener.awaitCompletion(5, TimeUnit.MILLISECONDS)); + + // Message should be readable but not carry a destination as it wasn't actually sent anywhere + assertNull("Should not have a readable JMSDestination", message.getJMSDestination()); + assertEquals("Message body not as expected", "content", ((TextMessage) message).getText()); + assertEquals("Message property not as expected", 1, message.getIntProperty("test")); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(2000); + } + } + + @Test(timeout = 20000) + public void testRemotelyCloseSessionAndAttemptAsyncCompletionSendThrowsAndLeavesMessageReadable() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + + final CountDownLatch sessionClosed = new CountDownLatch(1); + + connection.addConnectionListener(new JmsDefaultConnectionListener() { + + @Override + public void onSessionClosed(Session session, Throwable cause) { + sessionClosed.countDown(); + } + }); + + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + testPeer.remotelyEndLastOpenedSession(true); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + MessageProducer producer = session.createProducer(queue); + Message message = session.createTextMessage("content"); + message.setIntProperty("test", 1); + + assertNull("Should not yet have a JMSDestination", message.getJMSDestination()); + + testPeer.waitForAllHandlersToComplete(100); + + assertTrue("Session should have been closed", sessionClosed.await(2, TimeUnit.SECONDS)); + + TestJmsCompletionListener listener = new TestJmsCompletionListener(); + try { + producer.send(message, listener); + fail("No expected exception for this send."); + } catch (JMSException e) { + LOG.warn("Caught unexpected error: {}", e.getMessage()); + } + + assertFalse("Should not get async callback", listener.awaitCompletion(5, TimeUnit.MILLISECONDS)); + + // Message should be readable but not carry a destination as it wasn't actually sent anywhere + assertNull("Should not have a readable JMSDestination", message.getJMSDestination()); + assertEquals("Message body not as expected", "content", ((TextMessage) message).getText()); + assertEquals("Message property not as expected", 1, message.getIntProperty("test")); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(2000); + } + } + private class TestJmsCompletionListener implements CompletionListener { private final CountDownLatch completed; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
