Author: dejanb
Date: Fri Jan 30 14:31:08 2009
New Revision: 739292
URL: http://svn.apache.org/viewvc?rev=739292&view=rev
Log:
additional fix for http://issues.apache.org/activemq/browse/AMQ-2016
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=739292&r1=739291&r2=739292&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jan 30 14:31:08 2009
@@ -233,12 +233,13 @@
// set a flag if this is a first consumer
if (consumers.size() == 0) {
firstConsumer = true;
+ if (consumersBeforeDispatchStarts != 0) {
+ consumersBeforeStartsLatch = new
CountDownLatch(consumersBeforeDispatchStarts - 1);
+ }
} else {
- firstConsumer = false;
- }
-
- if (consumersBeforeStartsLatch != null) {
- consumersBeforeStartsLatch.countDown();
+ if (consumersBeforeStartsLatch != null) {
+ consumersBeforeStartsLatch.countDown();
+ }
}
addToConsumerList(sub);
@@ -647,7 +648,6 @@
public void setConsumersBeforeDispatchStarts(int
consumersBeforeDispatchStarts) {
this.consumersBeforeDispatchStarts =
consumersBeforeDispatchStarts;
- consumersBeforeStartsLatch = new
CountDownLatch(consumersBeforeDispatchStarts);
}
// Implementation methods
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java?rev=739292&r1=739291&r2=739292&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java Fri Jan 30 14:31:08 2009
@@ -85,6 +85,7 @@
// Setup a destination policy where it takes only 1 message at a time.
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
+ log.info("testing with consumersBeforeDispatchStarts=" +
consumersBeforeDispatchStarts + " and timeBeforeDispatchStarts=" +
timeBeforeDispatchStarts);
policy.setConsumersBeforeDispatchStarts(consumersBeforeDispatchStarts);
policy.setTimeBeforeDispatchStarts(timeBeforeDispatchStarts);
policyMap.setDefaultEntry(policy);
@@ -109,7 +110,7 @@
public void testDelayedDirectConnectionListener() throws Exception {
- for(int i = 0; i < 10; i++) {
+ for(int i = 0; i < 10; i++) {
Message msga = session.createTextMessage("hello a");
msga.setStringProperty("JMSXGroupID", "A");
producer.send(msga);
@@ -153,8 +154,10 @@
for (String worker: messageCount.keySet()) {
log.info("worker " + worker + " received " + messageCount.get(worker) +
" messages from groups " + messageGroups.get(worker));
- assertEquals(10, messageCount.get(worker).intValue());
- assertEquals(1, messageGroups.get(worker).size());
+ assertEquals("worker " + worker + " received " +
messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)
+ , 10, messageCount.get(worker).intValue());
+ assertEquals("worker " + worker + " received " +
messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)
+ , 1, messageGroups.get(worker).size());
}
}