Repository: qpid-jms Updated Branches: refs/heads/master da0d4e50a -> a307df8bc
QPIDJMS-376: add related test (and verify exception listener isnt called for regular close) and restore timeout on an existing test Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/a307df8b Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/a307df8b Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/a307df8b Branch: refs/heads/master Commit: a307df8bcb24f29760cb68c51f92f420396c1c3a Parents: da0d4e5 Author: Robbie Gemmell <rob...@apache.org> Authored: Wed Apr 11 13:23:33 2018 +0100 Committer: Robbie Gemmell <rob...@apache.org> Committed: Wed Apr 11 13:23:33 2018 +0100 ---------------------------------------------------------------------- .../integration/ConsumerIntegrationTest.java | 63 +++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a307df8b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java index c573972..9885200 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java @@ -1048,7 +1048,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { } } - @Test // (timeout=20000) + @Test(timeout=20000) public void testMessageListenerCallsConnectionStopThrowsIllegalStateException() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null); @@ -1163,6 +1163,67 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { } } + @Test(timeout=20000) + public void testMessageListenerClosesItsConsumer() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch exceptionListenerFired = new CountDownLatch(1); + final AtomicReference<Throwable> error = new AtomicReference<>(); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + LOG.trace("JMS ExceptionListener: ", exception); + exceptionListenerFired.countDown(); + } + }); + + testPeer.expectBegin(); + + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(getTestName()); + connection.start(); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1); + + MessageConsumer consumer = session.createConsumer(destination); + + testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH -1))); + testPeer.expectDisposition(true, new AcceptedMatcher()); + testPeer.expectDetach(true, true, true); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + try { + consumer.close(); + } catch (Throwable t) { + error.set(t); + LOG.error("Unexpected error during close", t); + } + + latch.countDown(); + LOG.debug("Async consumer got Message: {}", m); + } + }); + + assertTrue("Process not completed within given timeout", latch.await(3000, TimeUnit.MILLISECONDS)); + assertNull("No error expected during close", error.get()); + + testPeer.waitForAllHandlersToComplete(2000); + + testPeer.expectClose(); + connection.close(); + + assertFalse("JMS Exception listener shouldn't have fired", exceptionListenerFired.await(20, TimeUnit.MILLISECONDS)); + testPeer.waitForAllHandlersToComplete(2000); + } + } + @Repeat(repetitions = 1) @Test(timeout=20000) public void testRecoverOrderingWithAsyncConsumer() throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org