Author: rajdavies
Date: Wed Sep 16 14:04:20 2009
New Revision: 815788
URL: http://svn.apache.org/viewvc?rev=815788&view=rev
Log:
fix some synchonization issues
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java?rev=815788&r1=815787&r2=815788&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java
Wed Sep 16 14:04:20 2009
@@ -28,32 +28,25 @@
public class ConsumerBean extends Assert implements MessageListener {
private static final Log LOG = LogFactory.getLog(ConsumerBean.class);
- private List<Message> messages = new ArrayList<Message>();
- private Object semaphore;
+ private final List<Message> messages = new ArrayList<Message>();
private boolean verbose;
/**
* Constructor.
*/
public ConsumerBean() {
- this(new Object());
- }
-
- /**
- * Constructor, initialized semaphore object.
- *
- * @param semaphore
- */
- public ConsumerBean(Object semaphore) {
- this.semaphore = semaphore;
}
+
/**
* @return all the messages on the list so far, clearing the buffer
*/
- public synchronized List<Message> flushMessages() {
- List<Message> answer = new ArrayList<Message>(messages);
+ public List<Message> flushMessages() {
+ List<Message> answer = null;
+ synchronized(messages) {
+ answer = new ArrayList<Message>(messages);
messages.clear();
+ }
return answer;
}
@@ -62,13 +55,13 @@
*
* @param message
*/
- public synchronized void onMessage(Message message) {
- messages.add(message);
- if (verbose) {
- LOG.info("Received: " + message);
- }
- synchronized (semaphore) {
- semaphore.notifyAll();
+ public void onMessage(Message message) {
+ synchronized (messages) {
+ messages.add(message);
+ if (verbose) {
+ LOG.info("Received: " + message);
+ }
+ messages.notifyAll();
}
}
@@ -82,8 +75,8 @@
try {
if (hasReceivedMessage()) {
- synchronized (semaphore) {
- semaphore.wait(4000);
+ synchronized (messages) {
+ messages.wait(4000);
}
}
} catch (InterruptedException e) {
@@ -100,29 +93,30 @@
* @param messageCount
*/
public void waitForMessagesToArrive(int messageCount) {
- final long maxRemainingMessageCount = Math.max(0, messageCount -
messages.size());
- LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to
arrive");
+ long maxRemainingMessageCount = Math.max(0, messageCount -
messages.size());
+ LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to
arrive");
long start = System.currentTimeMillis();
- for (int i = 0; i < maxRemainingMessageCount; i++) {
+ long maxWaitTime = start + 120 * 1000;
+ while (maxRemainingMessageCount > 0) {
try {
- synchronized (semaphore) {
- semaphore.wait(1000);
+ synchronized (messages) {
+ messages.wait(1000);
}
- if (hasReceivedMessages(messageCount)) {
+ if (hasReceivedMessages(messageCount) ||
System.currentTimeMillis() > maxWaitTime) {
break;
}
} catch (InterruptedException e) {
LOG.info("Caught: " + e);
}
+ maxRemainingMessageCount = Math.max(0, messageCount -
messages.size());
}
long end = System.currentTimeMillis() - start;
-
LOG.info("End of wait for " + end + " millis");
}
public void assertMessagesArrived(int total) {
waitForMessagesToArrive(total);
- synchronized (this) {
+ synchronized (messages) {
int count = messages.size();
assertEquals("Messages received", total, count);
@@ -152,7 +146,9 @@
* @param messageCount
* @return
*/
- protected synchronized boolean hasReceivedMessages(int messageCount) {
- return messages.size() >= messageCount;
+ protected boolean hasReceivedMessages(int messageCount) {
+ synchronized (messages) {
+ return messages.size() >= messageCount;
+ }
}
}