Author: tabish
Date: Tue Mar 19 21:02:24 2013
New Revision: 1458514
URL: http://svn.apache.org/r1458514
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4389
Modified:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java
Modified:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java?rev=1458514&r1=1458513&r2=1458514&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java Tue Mar 19 21:02:24 2013
@@ -130,9 +130,14 @@ public class AMQ4351Test extends BrokerT
final AtomicLong size = new AtomicLong();
final AtomicBoolean done = new AtomicBoolean();
CountDownLatch doneLatch = new CountDownLatch(1);
+ CountDownLatch started;
+ CountDownLatch finished;
- public ConsumingClient(String name) {
+
+ public ConsumingClient(String name, CountDownLatch started,
CountDownLatch finished) {
this.name = name;
+ this.started = started;
+ this.finished = finished;
}
public void start() {
@@ -141,6 +146,7 @@ public class AMQ4351Test extends BrokerT
}
public void stopAsync() {
+ finished.countDown();
done.set(true);
}
@@ -158,6 +164,7 @@ public class AMQ4351Test extends BrokerT
try {
Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
MessageConsumer consumer =
session.createDurableSubscriber(destination, name, null, false);
+ started.countDown();
while( !done.get() ) {
Message msg = consumer.receive(100);
if(msg!=null ) {
@@ -181,24 +188,28 @@ public class AMQ4351Test extends BrokerT
public void testAMQ4351() throws InterruptedException, JMSException {
LOG.info("Start test.");
+ int subs = 100;
+ CountDownLatch startedLatch = new CountDownLatch(subs - 1);
+ CountDownLatch shutdownLatch = new CountDownLatch(subs - 4);
+
ProducingClient producer = new ProducingClient();
- ConsumingClient listener1 = new ConsumingClient("subscriber-1");
- ConsumingClient listener2 = new ConsumingClient("subscriber-2");
- ConsumingClient listener3 = new ConsumingClient("subscriber-3");
+ ConsumingClient listener1 = new ConsumingClient("subscriber-1",
startedLatch, shutdownLatch);
+ ConsumingClient listener2 = new ConsumingClient("subscriber-2",
startedLatch, shutdownLatch);
+ ConsumingClient listener3 = new ConsumingClient("subscriber-3",
startedLatch, shutdownLatch);
try {
listener1.start();
listener2.start();
listener3.start();
- int subs = 100;
List<ConsumingClient> subscribers = new
ArrayList<ConsumingClient>(subs);
for (int i = 4; i < subs; i++) {
- ConsumingClient client = new ConsumingClient("subscriber-" +
i);
+ ConsumingClient client = new ConsumingClient("subscriber-" +
i, startedLatch, shutdownLatch);
subscribers.add(client);
client.start();
}
+ startedLatch.await(10, TimeUnit.SECONDS);
LOG.info("All subscribers started.");
producer.sendMessage();
@@ -207,12 +218,12 @@ public class AMQ4351Test extends BrokerT
for (ConsumingClient client : subscribers) {
client.stopAsync();
}
+ shutdownLatch.await(10, TimeUnit.SECONDS);
// Start producing messages for 10 minutes, at high rate
LOG.info("Starting mass message producer...");
producer.start();
-
long lastSize = listener1.size.get();
for( int i=0 ; i < 10; i++ ) {
Thread.sleep(1000);