Fixing a few intermittent failures. These tests were not as broken as on 2.3.x from the old hornetq branch where this fix originated. However, I will play it safe here, as I believe the race could occur after the exception is raised and before the counter is incremented (on ActiveMQMessageHandlerTest).
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/4ebacc9d Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/4ebacc9d Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/4ebacc9d Branch: refs/heads/master Commit: 4ebacc9d87e7bc99655dc6475861c2a2c5ce0750 Parents: 0eb1e33 Author: Clebert Suconic <[email protected]> Authored: Wed Jan 21 18:38:27 2015 -0500 Committer: Clebert Suconic <[email protected]> Committed: Wed Jan 21 18:40:19 2015 -0500 ---------------------------------------------------------------------- .../integration/client/ConsumerStuckTest.java | 20 +++++----- .../ra/ActiveMQMessageHandlerTest.java | 40 ++++++++++++++------ 2 files changed, 39 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/4ebacc9d/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/ConsumerStuckTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/ConsumerStuckTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/ConsumerStuckTest.java index 5587b27..6ee2924 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/ConsumerStuckTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/ConsumerStuckTest.java @@ -134,16 +134,14 @@ public class ConsumerStuckTest extends ServiceTestBase long timeout = System.currentTimeMillis() + 20000; - while (System.currentTimeMillis() < timeout && server.getSessions().size() != 0) + long timeStart = System.currentTimeMillis(); + + while (timeout > System.currentTimeMillis() && server.getSessions().size() != 0 && server.getConnectionCount() != 0) { Thread.sleep(10); } - 
System.out.println("Size = " + server.getConnectionCount()); - - System.out.println("sessions = " + server.getSessions().size()); - - + System.out.println("Time = " + System.currentTimeMillis() + " time diff = " + (System.currentTimeMillis() - timeStart) + ", connections Size = " + server.getConnectionCount() + " sessions = " + server.getSessions().size()); if (server.getSessions().size() != 0) { @@ -151,14 +149,16 @@ public class ConsumerStuckTest extends ServiceTestBase fail("The cleanup wasn't able to finish cleaning the session. It's probably stuck, look at the thread dump generated by the test for more information"); } + System.out.println("Size = " + server.getConnectionCount()); - timeout = System.currentTimeMillis() + 20000; + System.out.println("sessions = " + server.getSessions().size()); - while (System.currentTimeMillis() < timeout && server.getConnectionCount() != 0) + + if (server.getSessions().size() != 0) { - Thread.sleep(10); + System.out.println(threadDump("Thread dump")); + fail("The cleanup wasn't able to finish cleaning the session. 
It's probably stuck, look at the thread dump generated by the test for more information"); } - assertEquals(0, server.getConnectionCount()); } finally http://git-wip-us.apache.org/repos/asf/activemq-6/blob/4ebacc9d/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ActiveMQMessageHandlerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ActiveMQMessageHandlerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ActiveMQMessageHandlerTest.java index cdf90f7..3902470 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ActiveMQMessageHandlerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ActiveMQMessageHandlerTest.java @@ -99,7 +99,7 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase spec.setDestination(MDBQUEUE); qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY); CountDownLatch latch = new CountDownLatch(15); - MultipleEndpoints endpoint = new MultipleEndpoints(latch, false); + MultipleEndpoints endpoint = new MultipleEndpoints(latch, null, false); DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false); qResourceAdapter.endpointActivation(endpointFactory, spec); ClientSession session = locator.createSessionFactory().createSession(); @@ -132,7 +132,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase spec.setDestination(MDBQUEUE); qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY); CountDownLatch latch = new CountDownLatch(SIZE); - MultipleEndpoints endpoint = new MultipleEndpoints(latch, true); + CountDownLatch latchDone = new CountDownLatch(SIZE); + MultipleEndpoints endpoint = new MultipleEndpoints(latch, latchDone, true); DummyMessageEndpointFactory endpointFactory = new 
DummyMessageEndpointFactory(endpoint, false); qResourceAdapter.endpointActivation(endpointFactory, spec); ClientSession session = locator.createSessionFactory().createSession(); @@ -148,6 +149,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase qResourceAdapter.endpointDeactivation(endpointFactory, spec); + latchDone.await(5, TimeUnit.SECONDS); + assertEquals(SIZE, endpoint.messages.intValue()); assertEquals(0, endpoint.interrupted.intValue()); @@ -169,7 +172,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase spec.setDestination(MDBQUEUE); qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY); CountDownLatch latch = new CountDownLatch(SIZE); - MultipleEndpoints endpoint = new MultipleEndpoints(latch, true); + CountDownLatch latchDone = new CountDownLatch(SIZE); + MultipleEndpoints endpoint = new MultipleEndpoints(latch, latchDone, true); DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false); qResourceAdapter.endpointActivation(endpointFactory, spec); ClientSession session = locator.createSessionFactory().createSession(); @@ -185,6 +189,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase qResourceAdapter.endpointDeactivation(endpointFactory, spec); + latchDone.await(5, TimeUnit.SECONDS); + assertEquals(SIZE, endpoint.messages.intValue()); //half onmessage interrupted assertEquals(SIZE / 2, endpoint.interrupted.intValue()); @@ -856,14 +862,16 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase class MultipleEndpoints extends DummyMessageEndpoint { private final CountDownLatch latch; + private final CountDownLatch latchDone; private final boolean pause; AtomicInteger messages = new AtomicInteger(0); AtomicInteger interrupted = new AtomicInteger(0); - public MultipleEndpoints(CountDownLatch latch, boolean pause) + public MultipleEndpoints(CountDownLatch latch, CountDownLatch latchDone, boolean pause) { super(latch); this.latch = latch; + 
this.latchDone = latchDone; this.pause = pause; } @@ -888,17 +896,27 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase @Override public void onMessage(Message message) { - latch.countDown(); - if (pause && messages.getAndIncrement() % 2 == 0) + try { - try + latch.countDown(); + if (pause && messages.getAndIncrement() % 2 == 0) { - IntegrationTestLogger.LOGGER.info("pausing for 2 secs"); - Thread.sleep(2000); + try + { + System.out.println("pausing for 2 secs"); + Thread.sleep(2000); + } + catch (InterruptedException e) + { + interrupted.incrementAndGet(); + } } - catch (InterruptedException e) + } + finally + { + if (latchDone != null) { - interrupted.getAndIncrement(); + latchDone.countDown(); } } }
