I don't understand why you replaced an 'atomic' get and set with a heavy
synchronized(this)?
On 10 Dec 2008, at 07:16, [EMAIL PROTECTED] wrote:
Author: djencks
Date: Tue Dec 9 23:16:39 2008
New Revision: 725020
URL: http://svn.apache.org/viewvc?rev=725020&view=rev
Log:
AMQ-2028 fix thread safety problem in ActiveMQSessionExecutor
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=725020&r1=725019&r2=725020&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java Tue Dec 9 23:16:39 2008
@@ -17,9 +17,7 @@
package org.apache.activemq;
-import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
@@ -44,9 +42,8 @@
private ActiveMQSession session;
private MessageDispatchChannel messageQueue = new
MessageDispatchChannel();
private boolean dispatchedBySessionPool;
- private TaskRunner taskRunner;
+ private volatile TaskRunner taskRunner;
private boolean startedOrWarnedThatNotStarted;
- private AtomicBoolean taskRunnerCreated = new AtomicBoolean();
ActiveMQSessionExecutor(ActiveMQSession session) {
this.session = session;
@@ -90,10 +87,14 @@
if (!dispatchedBySessionPool) {
if (session.isSessionAsyncDispatch()) {
try {
- if (taskRunnerCreated.compareAndSet(false,
true)) {
- if (taskRunner == null) {
- taskRunner =
session.connection.getSessionTaskRunner().createTaskRunner(this,
- "ActiveMQ Session: " +
session.getSessionId());
+ TaskRunner taskRunner = this.taskRunner;
+ if (taskRunner == null) {
+ synchronized (this) {
+ if (this.taskRunner == null) {
+ this.taskRunner =
session.connection.getSessionTaskRunner().createTaskRunner(this,
+ "ActiveMQ Session: " +
session.getSessionId());
+ }
+ taskRunner = this.taskRunner;
}
}
taskRunner.wakeup();
@@ -120,8 +121,7 @@
// TODO - we should use a Map for this indexed by consumerId
- for (Iterator i = this.session.consumers.iterator();
i.hasNext();) {
- ActiveMQMessageConsumer consumer =
(ActiveMQMessageConsumer)i.next();
+ for (ActiveMQMessageConsumer consumer :
this.session.consumers) {
ConsumerId consumerId = message.getConsumerId();
if (consumerId.equals(consumer.getConsumerId())) {
consumer.dispatch(message);
@@ -143,10 +143,10 @@
try {
if (messageQueue.isRunning()) {
messageQueue.stop();
+ TaskRunner taskRunner = this.taskRunner;
if (taskRunner != null) {
+ this.taskRunner = null;
taskRunner.shutdown();
- taskRunner = null;
- taskRunnerCreated.set(false);
}
}
} catch (InterruptedException e) {
@@ -168,7 +168,7 @@
}
MessageDispatch dequeueNoWait() {
- return (MessageDispatch)messageQueue.dequeueNoWait();
+ return messageQueue.dequeueNoWait();
}
protected void clearMessagesInProgress() {
@@ -182,8 +182,7 @@
public boolean iterate() {
// Deliver any messages queued on the consumer to their
listeners.
- for (Iterator i = this.session.consumers.iterator();
i.hasNext();) {
- ActiveMQMessageConsumer consumer =
(ActiveMQMessageConsumer)i.next();
+ for (ActiveMQMessageConsumer consumer :
this.session.consumers) {
if (consumer.iterate()) {
return true;
}