Author: chirino
Date: Thu Mar 27 19:22:39 2008
New Revision: 642067
URL: http://svn.apache.org/viewvc?rev=642067&view=rev
Log:
- Fixing out of order dispatch.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=642067&r1=642066&r2=642067&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Thu Mar 27 19:22:39 2008
@@ -25,6 +25,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
@@ -107,7 +108,7 @@
private long redeliveryDelay;
private int ackCounter;
private int dispatchedCount;
- private MessageListener messageListener;
+ private final AtomicReference<MessageListener> messageListener = new
AtomicReference<MessageListener>();
private JMSConsumerStatsImpl stats;
private final String selector;
@@ -330,7 +331,7 @@
*/
public MessageListener getMessageListener() throws JMSException {
checkClosed();
- return this.messageListener;
+ return this.messageListener.get();
}
/**
@@ -354,19 +355,20 @@
throw new JMSException(
"Illegal prefetch size of zero. This
setting is not supported for asynchronous consumers please set a value of at
least 1");
}
- this.messageListener = listener;
if (listener != null) {
boolean wasRunning = session.isRunning();
if (wasRunning) {
session.stop();
}
+ this.messageListener.set(listener);
session.redispatch(this, unconsumedMessages);
if (wasRunning) {
session.start();
}
-
+ } else {
+ this.messageListener.set(null);
}
}
@@ -934,7 +936,7 @@
}
public void dispatch(MessageDispatch md) {
- MessageListener listener = this.messageListener;
+ MessageListener listener = this.messageListener.get();
try {
synchronized (unconsumedMessages.getMutex()) {
if (clearDispatchList) {
@@ -1024,7 +1026,7 @@
* @throws JMSException
*/
public boolean iterate() {
- MessageListener listener = this.messageListener;
+ MessageListener listener = this.messageListener.get();
if (listener != null) {
MessageDispatch md = unconsumedMessages.dequeueNoWait();
if (md != null) {