Author: tabish
Date: Mon May 19 07:16:13 2008
New Revision: 657834
URL: http://svn.apache.org/viewvc?rev=657834&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-174
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp?rev=657834&r1=657833&r2=657834&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp
Mon May 19 07:16:13 2008
@@ -138,21 +138,18 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::stop() {
- synchronized( &mutex ) {
+ // We lock here to make sure that we wait until the thread
+ // is done with an internal dispatch operation, otherwise
+ // we might return before that and cause the caller to be
+ // in an inconsistent state.
+ synchronized( &dispatchMutex ) {
if( closed || !started ) {
return;
}
- // Set the state to stopped.
- started = false;
-
- // Wakeup the thread so that it can acknowledge the stop request.
- mutex.notifyAll();
- // Wait for the thread to notify us that it has acknowledged
- // the stop request.
- mutex.wait();
+ synchronized( &mutex ) { started = false; }
}
}
@@ -218,13 +215,8 @@
return;
}
- // When told to stop, the calling thread will wait for a
- // responding notification, indicating that we have acknowledged
- // the stop command.
- if( !started ) {
- mutex.notifyAll();
- }
-
+ // When stopped we hit this case and wait; otherwise, if there
+ // are messages queued, we skip the wait and go on to process them.
if( messageQueue.empty() || !started ) {
// Wait for more data or to be woken up.
@@ -248,29 +240,31 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::dispatchAll() {
- // Take out all of the dispatch data currently in the array.
- list<DispatchData> dataList;
- synchronized( &mutex ) {
+ // Dispatch all currently available messages. This lock allows the
+ // main thread to wait while we finish with a dispatch cycle, the
+ // stop method for instance should try and lock this mutex so that
+ // it knows that we've had a chance to read the started flag and
+ // detect that we are stopped, otherwise stop might return while
+ // we are still dispatching messages.
+ synchronized( &dispatchMutex ) {
+
+ // Take out all of the dispatch data currently in the array.
+ list<DispatchData> dataList;
+ synchronized( &mutex ) {
+
+ // If stopped or closed we don't want to start dispatching.
+ if( !started || closed ) {
+ return;
+ }
- // When told to stop, the calling thread will wait for a
- // responding notification, indicating that we have acknowledged
- // the stop command.
- if( !started ) {
- mutex.notifyAll();
+ dataList = messageQueue;
+ messageQueue.clear();
}
- if( !started || closed ) {
- return;
+ list<DispatchData>::iterator iter = dataList.begin();
+ while( iter != dataList.end() ) {
+ DispatchData& data = *iter++;
+ dispatch( data );
}
-
- dataList = messageQueue;
- messageQueue.clear();
- }
-
- // Dispatch all currently available messages.
- list<DispatchData>::iterator iter = dataList.begin();
- while( iter != dataList.end() ) {
- DispatchData& data = *iter++;
- dispatch( data );
}
}
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h?rev=657834&r1=657833&r2=657834&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h
Mon May 19 07:16:13 2008
@@ -45,6 +45,7 @@
std::list<DispatchData> messageQueue;
decaf::lang::Thread* thread;
decaf::util::concurrent::Mutex mutex;
+ decaf::util::concurrent::Mutex dispatchMutex;
bool started;
bool closed;