David - ignore me - I missed the taskRunner.wakeup() call ... :) rajdavies wrote: > > I don't understand why you replaced an 'atomic' get and set with a heavy > synchronized(this)? > > On 10 Dec 2008, at 07:16, [EMAIL PROTECTED] wrote: > >> Author: djencks >> Date: Tue Dec 9 23:16:39 2008 >> New Revision: 725020 >> >> URL: http://svn.apache.org/viewvc?rev=725020&view=rev >> Log: >> AMQ-2028 fix thread safety problem in ActiveMQSessionExecutor >> >> Modified: >> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ >> ActiveMQSessionExecutor.java >> >> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/ >> activemq/ActiveMQSessionExecutor.java >> URL: >> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=725020&r1=725019&r2=725020&view=diff >> ============================================================================== >> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ >> ActiveMQSessionExecutor.java (original) >> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ >> ActiveMQSessionExecutor.java Tue Dec 9 23:16:39 2008 >> @@ -17,9 +17,7 @@ >> >> package org.apache.activemq; >> >> -import java.util.Iterator; >> import java.util.List; >> -import java.util.concurrent.atomic.AtomicBoolean; >> >> import javax.jms.JMSException; >> >> @@ -44,9 +42,8 @@ >> private ActiveMQSession session; >> private MessageDispatchChannel messageQueue = new >> MessageDispatchChannel(); >> private boolean dispatchedBySessionPool; >> - private TaskRunner taskRunner; >> + private volatile TaskRunner taskRunner; >> private boolean startedOrWarnedThatNotStarted; >> - private AtomicBoolean taskRunnerCreated = new AtomicBoolean(); >> >> ActiveMQSessionExecutor(ActiveMQSession session) { >> this.session = session; >> @@ -90,10 +87,14 @@ >> if (!dispatchedBySessionPool) { >> if (session.isSessionAsyncDispatch()) { >> try { >> - if 
(taskRunnerCreated.compareAndSet(false, >> true)) { >> - if (taskRunner == null) { >> - taskRunner = >> session.connection.getSessionTaskRunner().createTaskRunner(this, >> - "ActiveMQ Session: " + >> session.getSessionId()); >> + TaskRunner taskRunner = this.taskRunner; >> + if (taskRunner == null) { >> + synchronized (this) { >> + if (this.taskRunner == null) { >> + this.taskRunner = >> session.connection.getSessionTaskRunner().createTaskRunner(this, >> + "ActiveMQ Session: " + >> session.getSessionId()); >> + } >> + taskRunner = this.taskRunner; >> } >> } >> taskRunner.wakeup(); >> @@ -120,8 +121,7 @@ >> >> // TODO - we should use a Map for this indexed by consumerId >> >> - for (Iterator i = this.session.consumers.iterator(); >> i.hasNext();) { >> - ActiveMQMessageConsumer consumer = >> (ActiveMQMessageConsumer)i.next(); >> + for (ActiveMQMessageConsumer consumer : >> this.session.consumers) { >> ConsumerId consumerId = message.getConsumerId(); >> if (consumerId.equals(consumer.getConsumerId())) { >> consumer.dispatch(message); >> @@ -143,10 +143,10 @@ >> try { >> if (messageQueue.isRunning()) { >> messageQueue.stop(); >> + TaskRunner taskRunner = this.taskRunner; >> if (taskRunner != null) { >> + this.taskRunner = null; >> taskRunner.shutdown(); >> - taskRunner = null; >> - taskRunnerCreated.set(false); >> } >> } >> } catch (InterruptedException e) { >> @@ -168,7 +168,7 @@ >> } >> >> MessageDispatch dequeueNoWait() { >> - return (MessageDispatch)messageQueue.dequeueNoWait(); >> + return messageQueue.dequeueNoWait(); >> } >> >> protected void clearMessagesInProgress() { >> @@ -182,8 +182,7 @@ >> public boolean iterate() { >> >> // Deliver any messages queued on the consumer to their >> listeners. 
>> - for (Iterator i = this.session.consumers.iterator(); >> i.hasNext();) { >> - ActiveMQMessageConsumer consumer = >> (ActiveMQMessageConsumer)i.next(); >> + for (ActiveMQMessageConsumer consumer : >> this.session.consumers) { >> if (consumer.iterate()) { >> return true; >> } >> >> > > >
-- View this message in context: http://www.nabble.com/Re%3A-svn-commit%3A-r725020----activemq-trunk-activemq-core-src-main-java-org-apache-activemq-ActiveMQSessionExecutor.java-tp20935620p20936633.html Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
