Author: aconway
Date: Mon May 25 15:51:52 2009
New Revision: 778443
URL: http://svn.apache.org/viewvc?rev=778443&view=rev
Log:
ConsumerImpl optimization - only dispatch on queue if we were notified of
messages.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=778443&r1=778442&r2=778443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon May 25 15:51:52 2009
@@ -257,6 +257,7 @@
msgCredit(0),
byteCredit(0),
notifyEnabled(true),
+ queueHasMessages(true),
syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
deliveryCount(0) {}
@@ -524,7 +525,7 @@
bool SemanticState::ConsumerImpl::haveCredit()
{
- if (msgCredit) {
+ if (msgCredit && byteCredit) {
return true;
} else {
blocked = true;
@@ -592,7 +593,18 @@
bool SemanticState::ConsumerImpl::doOutput()
{
- return haveCredit() && queue->dispatch(shared_from_this());
+ {
+ Mutex::ScopedLock l(lock);
+ if (!haveCredit() || !queueHasMessages) return false;
+ queueHasMessages = false;
+ }
+ bool moreMessages = queue->dispatch(shared_from_this());
+ {
+ Mutex::ScopedLock l(lock);
+ // queueHasMessages may have been set by a notify() during dispatch()
+ queueHasMessages = queueHasMessages || moreMessages;
+ }
+ return queueHasMessages;
}
void SemanticState::ConsumerImpl::enableNotify()
@@ -614,6 +626,10 @@
void SemanticState::ConsumerImpl::notify()
{
+ {
+ Mutex::ScopedLock l(lock);
+ queueHasMessages = true;
+ }
//TODO: alter this, don't want to hold locks across external
//calls; for now its is required to protect the notify() from
//having part of the object chain of the invocation being
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=778443&r1=778442&r2=778443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Mon May 25 15:51:52 2009
@@ -36,6 +36,7 @@
#include "qpid/framing/Uuid.h"
#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/AtomicValue.h"
#include "AclModule.h"
#include <list>
@@ -76,6 +77,8 @@
uint32_t msgCredit;
uint32_t byteCredit;
bool notifyEnabled;
+ // sys::AtomicValue<bool> queueHasMessages;
+ bool queueHasMessages;
const int syncFrequency;
int deliveryCount;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org