Hi Carl, This patch looks great; I will update the selector patch later. Chenta
On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff <[email protected]> wrote: > > I created a patch which seems to work well. It targets querying the queue, > count, and acquire, making the > queue access faster for large queues (best case O(1) {if no requeue or acquire}, > worst case binary search). In > most cases it is faster than binary search even if requeue or selector is > used. > > It does require that the re-queue order be corrected - which should be done > regardless. > > The remaining function that could use some similar dressing would be > Queue::seek() > > Any thoughts on the patch... This patch opens the way for reasonable > selector performance. > Carl. > > > Index: qpid/broker/Queue.cpp > =================================================================== > --- qpid/broker/Queue.cpp (revision 833135) > +++ qpid/broker/Queue.cpp (working copy) > @@ -243,18 +243,18 @@ > { > Mutex::ScopedLock locker(messageLock); > QPID_LOG(debug, "Attempting to acquire message at " << position); > - for (Messages::iterator i = messages.begin(); i != messages.end(); > i++) { > - if (i->position == position) { > - message = *i; > - if (lastValueQueue) { > - clearLVQIndex(*i); > - } > - QPID_LOG(debug, > - "Acquired message at " << i->position << " from " << > name); > - messages.erase(i); > - return true; > + > + Messages::iterator i = findAt(position); > + if (i != messages.end() ) { > + message = *i; > + if (lastValueQueue) { > + clearLVQIndex(*i); > } > - } > + QPID_LOG(debug, > + "Acquired message at " << i->position << " from " << > name); > + messages.erase(i); > + return true; > + } > QPID_LOG(debug, "Could not acquire message at " << position << " from " > << name << "; no message at that position"); > return false; > } > @@ -262,21 +262,21 @@ > bool Queue::acquire(const QueuedMessage& msg) { > Mutex::ScopedLock locker(messageLock); > QPID_LOG(debug, "attempting to acquire " << msg.position); > - for (Messages::iterator i = messages.begin(); i != messages.end(); > i++) { > - if 
((i->position == msg.position && !lastValueQueue) // note that > in some cases payload not be set > - || (lastValueQueue && (i->position == msg.position) && > - msg.payload.get() == checkLvqReplace(*i).payload.get()) ) > { > + Messages::iterator i = findAt(msg.position); > + if ((i != messages.end() && !lastValueQueue) // note that in some > cases payload not be set > + || (lastValueQueue && (i->position == msg.position) && > + msg.payload.get() == checkLvqReplace(*i).payload.get()) ) { > > - clearLVQIndex(msg); > - QPID_LOG(debug, > - "Match found, acquire succeeded: " << > - i->position << " == " << msg.position); > - messages.erase(i); > - return true; > - } else { > - QPID_LOG(debug, "No match: " << i->position << " != " << > msg.position); > - } > + clearLVQIndex(msg); > + QPID_LOG(debug, > + "Match found, acquire succeeded: " << > + i->position << " == " << msg.position); > + messages.erase(i); > + return true; > + } else { > + QPID_LOG(debug, "No match: " << i->position << " != " << > msg.position); > } > + > QPID_LOG(debug, "Acquire failed for " << msg.position); > return false; > } > @@ -445,19 +445,35 @@ > return false; > } > > -namespace { > -struct PositionEquals { > - SequenceNumber pos; > - PositionEquals(SequenceNumber p) : pos(p) {} > - bool operator()(const QueuedMessage& msg) const { return msg.position > == pos; } > -}; > -}// namespace > +Queue::Messages::iterator Queue::findAt(SequenceNumber pos) { > > + if(!messages.empty()){ > + QueuedMessage compM; > + compM.position = pos; > + unsigned long diff = pos.getValue() - > messages.front().position.getValue(); > + long maxEnd = diff < messages.size()? diff : messages.size(); > + > + Messages::iterator i = > lower_bound(messages.begin(),messages.begin()+maxEnd,compM); > + if (i->position == pos) > + return i; > + } > + return messages.end(); // no match found. 
> +} > + > + > QueuedMessage Queue::find(SequenceNumber pos) const { > + > Mutex::ScopedLock locker(messageLock); > - Messages::const_iterator i = std::find_if(messages.begin(), > messages.end(), PositionEquals(pos)); > - if (i != messages.end()) > - return *i; > + if(!messages.empty()){ > + QueuedMessage compM; > + compM.position = pos; > + unsigned long diff = pos.getValue() - > messages.front().position.getValue(); > + long maxEnd = diff < messages.size()? diff : messages.size(); > + > + Messages::const_iterator i = > lower_bound(messages.begin(),messages.begin()+maxEnd,compM); > + if (i != messages.end()) > + return *i; > + } > return QueuedMessage(); > } > > @@ -642,10 +658,9 @@ > } > > /** function only provided for unit tests, or code not in critical message > path */ > -uint32_t Queue::getMessageCount() const > +uint32_t Queue::getEnqueueCompleteMessageCount() const > { > Mutex::ScopedLock locker(messageLock); > - > uint32_t count = 0; > for ( Messages::const_iterator i = messages.begin(); i != > messages.end(); ++i ) { > //NOTE: don't need to use checkLvqReplace() here as it > @@ -657,6 +672,12 @@ > return count; > } > > +uint32_t Queue::getMessageCount() const > +{ > + Mutex::ScopedLock locker(messageLock); > + return messages.size(); > +} > + > uint32_t Queue::getConsumerCount() const > { > Mutex::ScopedLock locker(consumerLock); > Index: qpid/broker/QueuedMessage.h > =================================================================== > --- qpid/broker/QueuedMessage.h (revision 833135) > +++ qpid/broker/QueuedMessage.h (working copy) > @@ -38,7 +38,9 @@ > QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, > framing::SequenceNumber sn) : > payload(msg), position(sn), queue(q) {} > QueuedMessage(Queue* q) : queue(q) {} > + > }; > + inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) > { return a.position < b.position; } > > }} > > Index: qpid/broker/Queue.h > =================================================================== > 
--- qpid/broker/Queue.h (revision 833135) > +++ qpid/broker/Queue.h (working copy) > @@ -148,6 +148,8 @@ > } > } > } > + > + Messages::iterator findAt(framing::SequenceNumber pos); > > public: > > @@ -221,6 +223,7 @@ > uint32_t move(const Queue::shared_ptr destq, uint32_t qty); > > QPID_BROKER_EXTERN uint32_t getMessageCount() const; > + QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() > const; > QPID_BROKER_EXTERN uint32_t getConsumerCount() const; > inline const string& getName() const { return name; } > bool isExclusiveOwner(const OwnershipToken* const o) const; > Index: tests/QueueTest.cpp > =================================================================== > --- tests/QueueTest.cpp (revision 833135) > +++ tests/QueueTest.cpp (working copy) > @@ -120,9 +120,10 @@ > queue->process(msg1); > sleep(2); > uint32_t compval=0; > - BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); > + BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount()); > msg1->enqueueComplete(); > compval=1; > + BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount()); > BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); > } > > > > --------------------------------------------------------------------- > Apache Qpid - AMQP Messaging Implementation > Project: http://qpid.apache.org > Use/Interact: mailto:[email protected] >
