Author: cctrieloff
Date: Mon Nov  9 18:41:36 2009
New Revision: 834172

URL: http://svn.apache.org/viewvc?rev=834172&view=rev
Log:
Remove the linear loops used for position search and replace them with STL
algorithms (binary search via lower_bound) for better performance.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=834172&r1=834171&r2=834172&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Nov  9 18:41:36 2009
@@ -247,18 +247,18 @@
 {
     Mutex::ScopedLock locker(messageLock);
     QPID_LOG(debug, "Attempting to acquire message at " << position);
-    for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
-        if (i->position == position) {
-            message = *i;
-            if (lastValueQueue) {
-                clearLVQIndex(*i);
-            }
-            QPID_LOG(debug,
-                     "Acquired message at " << i->position << " from " << 
name);
-            messages.erase(i);
-            return true;
-        }
-    }
+    
+    Messages::iterator i = findAt(position); 
+    if (i != messages.end() ) {
+        message = *i;
+        if (lastValueQueue) {
+            clearLVQIndex(*i);
+        }
+        QPID_LOG(debug,
+                 "Acquired message at " << i->position << " from " << name);
+        messages.erase(i);
+        return true;
+    } 
     QPID_LOG(debug, "Could not acquire message at " << position << " from " << 
name << "; no message at that position");
     return false;
 }
@@ -266,21 +266,21 @@
 bool Queue::acquire(const QueuedMessage& msg) {
     Mutex::ScopedLock locker(messageLock);
     QPID_LOG(debug, "attempting to acquire " << msg.position);
-    for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
-        if ((i->position == msg.position && !lastValueQueue) // note that in 
some cases payload not be set
-            || (lastValueQueue && (i->position == msg.position) && 
-                msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
-
-            clearLVQIndex(msg);
-            QPID_LOG(debug,
-                     "Match found, acquire succeeded: " <<
-                     i->position << " == " << msg.position);
-            messages.erase(i);
-            return true;
-        } else {
-            QPID_LOG(debug, "No match: " << i->position << " != " << 
msg.position);
-        }
+    Messages::iterator i = findAt(msg.position); 
+    if ((i != messages.end() && !lastValueQueue) // note that in some cases 
payload not be set
+        || (lastValueQueue && (i->position == msg.position) && 
+            msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
+
+        clearLVQIndex(msg);
+        QPID_LOG(debug,
+                 "Match found, acquire succeeded: " <<
+                 i->position << " == " << msg.position);
+        messages.erase(i);
+        return true;
+    } else {
+        QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
     }
+    
     QPID_LOG(debug, "Acquire failed for " << msg.position);
     return false;
 }
@@ -449,19 +449,35 @@
     return false;
 }
 
-namespace {
-struct PositionEquals {
-    SequenceNumber pos;
-    PositionEquals(SequenceNumber p) : pos(p) {}
-    bool operator()(const QueuedMessage& msg) const { return msg.position == 
pos; }
-};
-}// namespace
+Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
+
+    if(!messages.empty()){
+        QueuedMessage compM;
+        compM.position = pos;
+        unsigned long diff = pos.getValue() - 
messages.front().position.getValue();
+        long maxEnd = diff < messages.size()? diff : messages.size();
+
+        Messages::iterator i = 
lower_bound(messages.begin(),messages.begin()+maxEnd,compM); 
+        if (i!= messages.end() && i->position == pos)
+            return i;
+    }    
+    return messages.end(); // no match found.
+}
+
 
 QueuedMessage Queue::find(SequenceNumber pos) const {
+
     Mutex::ScopedLock locker(messageLock);
-    Messages::const_iterator i = std::find_if(messages.begin(), 
messages.end(), PositionEquals(pos));
-    if (i != messages.end())
-        return *i;
+    if(!messages.empty()){
+        QueuedMessage compM;
+        compM.position = pos;
+        unsigned long diff = pos.getValue() - 
messages.front().position.getValue();
+        long maxEnd = diff < messages.size()? diff : messages.size();
+
+        Messages::const_iterator i = 
lower_bound(messages.begin(),messages.begin()+maxEnd,compM); 
+        if (i != messages.end())
+            return *i;
+    }
     return QueuedMessage();
 }
 
@@ -646,10 +662,9 @@
 }
 
 /** function only provided for unit tests, or code not in critical message 
path */
-uint32_t Queue::getMessageCount() const
+uint32_t Queue::getEnqueueCompleteMessageCount() const
 {
     Mutex::ScopedLock locker(messageLock);
-  
     uint32_t count = 0;
     for ( Messages::const_iterator i = messages.begin(); i != messages.end(); 
++i ) {
         //NOTE: don't need to use checkLvqReplace() here as it
@@ -661,6 +676,12 @@
     return count;
 }
 
+uint32_t Queue::getMessageCount() const
+{
+    Mutex::ScopedLock locker(messageLock);
+    return messages.size();
+}
+
 uint32_t Queue::getConsumerCount() const
 {
     Mutex::ScopedLock locker(consumerLock);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=834172&r1=834171&r2=834172&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Nov  9 18:41:36 2009
@@ -148,6 +148,8 @@
                     }
                 }
             }
+            
+            Messages::iterator findAt(framing::SequenceNumber pos);
 
         public:
 
@@ -221,6 +223,7 @@
             uint32_t move(const Queue::shared_ptr destq, uint32_t qty); 
 
             QPID_BROKER_EXTERN uint32_t getMessageCount() const;
+            QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
             QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
             inline const string& getName() const { return name; }
             bool isExclusiveOwner(const OwnershipToken* const o) const;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h?rev=834172&r1=834171&r2=834172&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h Mon Nov  9 18:41:36 2009
@@ -38,7 +38,9 @@
     QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, 
framing::SequenceNumber sn) : 
         payload(msg), position(sn), queue(q) {}
     QueuedMessage(Queue* q) : queue(q) {}
+    
 };
+    inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { 
return a.position < b.position; } 
 
 }}
 

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=834172&r1=834171&r2=834172&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Mon Nov  9 18:41:36 2009
@@ -120,9 +120,10 @@
     queue->process(msg1);
     sleep(2);
     uint32_t compval=0;
-    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
+    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
     msg1->enqueueComplete();
     compval=1;
+    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
     BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to