Author: aconway
Date: Fri Oct  9 19:37:14 2009
New Revision: 823669

URL: http://svn.apache.org/viewvc?rev=823669&view=rev
Log:
Fix race condition in enqueue/dequeue callbacks.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=823669&r1=823668&r2=823669&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Oct  9 19:37:14 2009
@@ -389,28 +389,36 @@
 }
 
 void Message::allEnqueuesComplete() {
-    MessageCallback* cb = 0;
-    {
-        sys::Mutex::ScopedLock l(lock);
-        std::swap(cb, enqueueCallback);
-    }
+    sys::Mutex::ScopedLock l(callbackLock);
+    MessageCallback* cb = enqueueCallback;
     if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
 }
 
 void Message::allDequeuesComplete() {
-    MessageCallback* cb = 0;
-    {
-        sys::Mutex::ScopedLock l(lock);
-        std::swap(cb, dequeueCallback);
-    }
+    sys::Mutex::ScopedLock l(callbackLock);
+    MessageCallback* cb = dequeueCallback;
     if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
 }
 
-void Message::setEnqueueCompleteCallback(MessageCallback& cb) { enqueueCallback = &cb; }
-void Message::resetEnqueueCompleteCallback() { enqueueCallback = 0; }
+void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
+    sys::Mutex::ScopedLock l(callbackLock);
+    enqueueCallback = &cb;
+}
+
+void Message::resetEnqueueCompleteCallback() {
+    sys::Mutex::ScopedLock l(callbackLock);
+    enqueueCallback = 0;
+}
 
-void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; }
-void Message::resetDequeueCompleteCallback() { dequeueCallback = 0; }
+void Message::setDequeueCompleteCallback(MessageCallback& cb) {
+    sys::Mutex::ScopedLock l(callbackLock);
+    dequeueCallback = &cb;
+}
+
+void Message::resetDequeueCompleteCallback() {
+    sys::Mutex::ScopedLock l(callbackLock);
+    dequeueCallback = 0;
+}
 
 framing::FieldTable& Message::getOrInsertHeaders()
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=823669&r1=823668&r2=823669&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Oct  9 19:37:14 2009
@@ -184,8 +184,11 @@
 
     mutable Replacement replacement;
     mutable boost::intrusive_ptr<Message> empty;
+
+    sys::Mutex callbackLock;
     MessageCallback* enqueueCallback;
     MessageCallback* dequeueCallback;
+
     uint32_t requiredCredit;
     static std::string updateDestination;
 };



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to