Author: aconway
Date: Mon May 25 15:51:52 2009
New Revision: 778443

URL: http://svn.apache.org/viewvc?rev=778443&view=rev
Log:
ConsumerImpl optimization - only dispatch on queue if we were notified of 
messages.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=778443&r1=778442&r2=778443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon May 25 15:51:52 2009
@@ -257,6 +257,7 @@
     msgCredit(0), 
     byteCredit(0),
     notifyEnabled(true),
+    queueHasMessages(true),
     syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
     deliveryCount(0) {}
 
@@ -524,7 +525,7 @@
 
 bool SemanticState::ConsumerImpl::haveCredit()
 {
-    if (msgCredit) {
+    if (msgCredit && byteCredit) {
         return true;
     } else {
         blocked = true;
@@ -592,7 +593,18 @@
 
 bool SemanticState::ConsumerImpl::doOutput()
 {
-    return haveCredit() && queue->dispatch(shared_from_this());
+    {
+        Mutex::ScopedLock l(lock);
+        if (!haveCredit() || !queueHasMessages) return false;
+        queueHasMessages = false;
+    }
+    bool moreMessages = queue->dispatch(shared_from_this());
+    {
+        Mutex::ScopedLock l(lock);
+        // queueHasMessages may have been set by a notify() during dispatch()
+        queueHasMessages = queueHasMessages || moreMessages;
+    }
+    return queueHasMessages;
 }
 
 void SemanticState::ConsumerImpl::enableNotify()
@@ -614,6 +626,10 @@
 
 void SemanticState::ConsumerImpl::notify()
 {
+    {
+        Mutex::ScopedLock l(lock);
+        queueHasMessages = true;
+    }
     //TODO: alter this, don't want to hold locks across external
     //calls; for now its is required to protect the notify() from
     //having part of the object chain of the invocation being

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=778443&r1=778442&r2=778443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Mon May 25 15:51:52 2009
@@ -36,6 +36,7 @@
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/AggregateOutput.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/AtomicValue.h"
 #include "AclModule.h"
 
 #include <list>
@@ -76,6 +77,8 @@
         uint32_t msgCredit;
         uint32_t byteCredit;
         bool notifyEnabled;
+        //        sys::AtomicValue<bool> queueHasMessages;
+        bool queueHasMessages;
         const int syncFrequency;
         int deliveryCount;
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org

Reply via email to