Updated Branches: refs/heads/trunk 257710ba1 -> a6d05daba
Make sure there are Topic consumers online before starting to send otherwise they can miss a message and the test fails when it shouldn't Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a6d05dab Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a6d05dab Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a6d05dab Branch: refs/heads/trunk Commit: a6d05daba6135dc3096b906643451950cd577d2f Parents: 257710b Author: Timothy Bish <[email protected]> Authored: Mon Dec 16 14:37:06 2013 -0500 Committer: Timothy Bish <[email protected]> Committed: Mon Dec 16 14:37:06 2013 -0500 ---------------------------------------------------------------------- .../activemq/transport/amqp/AMQ4920Test.java | 56 ++++++++++++-------- 1 file changed, 33 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a6d05dab/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java index 5d6d473..72f8b11 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java @@ -16,11 +16,13 @@ */ package org.apache.activemq.transport.amqp; -import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -31,21 +33,22 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; +import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AMQ4920Test extends AmqpTestSupport { private static final Logger LOG = LoggerFactory.getLogger(AMQ4920Test.class); private static final Integer ITERATIONS = 1 * 1000; private static final Integer CONSUMER_COUNT = 4; // At least 2 consumers are required to reproduce the original issue public static final String TEXT_MESSAGE = "TextMessage: "; - private CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS); + private final CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS); + private final CountDownLatch initLatch = new CountDownLatch(CONSUMER_COUNT); + @Override @Before public void setUp() throws Exception { super.setUp(); @@ -54,7 +57,7 @@ public class AMQ4920Test extends AmqpTestSupport { @Test(timeout = 5 * 60 * 1000) public void testSendWithMultipleConsumers() throws Exception { - ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin"); + ConnectionFactory connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String destinationName = "topic://AMQ4920Test" + System.currentTimeMillis(); @@ -63,11 +66,15 @@ public class AMQ4920Test extends AmqpTestSupport { ExecutorService executor = Executors.newCachedThreadPool(); for (int i=0; i < CONSUMER_COUNT; i++) { - AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(destinationName, port, "Consumer-" + i, latch, ITERATIONS); + AMQ4930ConsumerTask consumerTask = + new AMQ4930ConsumerTask(initLatch, destinationName, port, "Consumer-" + i, latch, ITERATIONS); executor.submit(consumerTask); } connection.start(); + // Make sure at least Topic consumers are subscribed before the first send. + initLatch.await(); + LOG.debug("At start latch is " + latch.getCount()); sendMessages(connection, destination, ITERATIONS, 10); LOG.debug("After send latch is " + latch.getCount()); @@ -97,16 +104,17 @@ public class AMQ4920Test extends AmqpTestSupport { } } - class AMQ4930ConsumerTask implements Callable<Boolean> { protected static final Logger LOG = LoggerFactory.getLogger(AMQ4930ConsumerTask.class); - private String destinationName; - private String consumerName; - private CountDownLatch messagesReceived; - private int port; - private int expectedMessageCount; - - public AMQ4930ConsumerTask (String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) { + private final String destinationName; + private final String consumerName; + private final CountDownLatch messagesReceived; + private final int port; + private final int expectedMessageCount; + private final CountDownLatch started; + + public AMQ4930ConsumerTask (CountDownLatch started, String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) { + this.started = started; this.destinationName = destinationName; this.port = port; this.consumerName = consumerName; @@ -119,13 +127,15 @@ class AMQ4930ConsumerTask implements Callable<Boolean> { LOG.debug(consumerName + " starting"); Connection connection=null; try { - ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin"); + ConnectionFactory connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin"); connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic(destinationName); MessageConsumer consumer = session.createConsumer(destination); connection.start(); + started.countDown(); + int receivedCount = 0; while(receivedCount < expectedMessageCount) { Message message = consumer.receive(5 * 1000);
