Carl,

Have those patches been merged into the current source tree? And could you tell me which patches they are?

Chenta

On Wed, Nov 25, 2009 at 9:49 PM, Carl Trieloff <[email protected]> wrote:

>
> Chenta,
>
> Two things to note: Gordon has now put in a patch that corrects the order
> of the messages on requeue, i.e. they now stay in order by sequence number
> even after rollback. Thus, with his patch and my seek() / seekAt(), we should
> now be able to resolve the acquire case quite easily and update the
> position.
>
> Take a look and let me know if you need some help with that.
>
> (i.e. before these two patches it would not have been possible, but now I
> believe it is)
>
> Carl.
>
>
>
> chenta lee wrote:
>
> I did the consumer sequence number wrap-around because when we requeue
> a message, I cannot know which consumer consumed it. Therefore, I cannot
> update the sequence number of the consumer when the messages roll back.
> The consequence is that when a consumer rolls back (requeues) messages, it
> cannot acquire them anymore (because requeue_msg.position is always larger
> than consumer.position).
>
> However, my patch is not that dirty :). I didn't change the original
> algorithm. We do not update the consumer sequence in consumeNextMessage at
> the very beginning. From my point of view, the only concern is that users
> who decide to use a selector on their messages might suffer from
> performance issues; the other users, who do not use a selector, will be
> just fine.
>
> Chenta
>
> On Wed, Nov 25, 2009 at 10:54 AM, Carl Trieloff <[email protected]> wrote:
>
>>
>> Thanks. The one remaining issue I know of with the selector patch is that
>> I don't think consumer sequence number wrap-around works.
>>
>> We need a test there, and maybe a change to the comparison operators in
>> your patch. I was looking into that last week on the selector patch; I'm
>> itching to get the patch in.
>>
>> Carl.
>>
>>
>>
>> chenta lee wrote:
>>
>> Hi Carl,
>> This patch looks great, I will update the selector patch later.
>>
>> Chenta
>>
>> On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff <[email protected]> wrote:
>>
>>>
>>> I created a patch which seems to work well. It targets querying the
>>> queue, count, and acquire, making queue access faster for large queues
>>> (best case 1 {if no requeue or acquire}, worst case binary search). In
>>> most cases it is faster than binary search even if requeue or a selector
>>> is used.
>>>
>>> It does require that the requeue order be corrected, which should be
>>> done regardless.
>>>
>>> The remaining function that could use some similar dressing would be
>>> Queue::seek().
>>>
>>> Any thoughts on the patch? This patch opens the way for reasonable
>>> selector performance.
>>> Carl.
>>>
>>>
>>> Index: qpid/broker/Queue.cpp
>>> ===================================================================
>>> --- qpid/broker/Queue.cpp (revision 833135)
>>> +++ qpid/broker/Queue.cpp (working copy)
>>> @@ -243,18 +243,18 @@
>>>  {
>>>      Mutex::ScopedLock locker(messageLock);
>>>      QPID_LOG(debug, "Attempting to acquire message at " << position);
>>> -    for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
>>> -        if (i->position == position) {
>>> -            message = *i;
>>> -            if (lastValueQueue) {
>>> -                clearLVQIndex(*i);
>>> -            }
>>> -            QPID_LOG(debug,
>>> -                     "Acquired message at " << i->position << " from " << name);
>>> -            messages.erase(i);
>>> -            return true;
>>> +
>>> +    Messages::iterator i = findAt(position);
>>> +    if (i != messages.end() ) {
>>> +        message = *i;
>>> +        if (lastValueQueue) {
>>> +            clearLVQIndex(*i);
>>>          }
>>> -    }
>>> +        QPID_LOG(debug,
>>> +                 "Acquired message at " << i->position << " from " << name);
>>> +        messages.erase(i);
>>> +        return true;
>>> +    }
>>>      QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
>>>      return false;
>>>  }
>>> @@ -262,21 +262,21 @@
>>>  bool Queue::acquire(const QueuedMessage& msg) {
>>>      Mutex::ScopedLock locker(messageLock);
>>>      QPID_LOG(debug, "attempting to acquire " << msg.position);
>>> -    for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
>>> -        if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set
>>> -            || (lastValueQueue && (i->position == msg.position) &&
>>> -                msg.payload.get() == checkLvqReplace(*i).payload.get()) ) {
>>> +    Messages::iterator i = findAt(msg.position);
>>> +    if ((i != messages.end() && !lastValueQueue) // note that in some cases payload not be set
>>> +        || (lastValueQueue && (i->position == msg.position) &&
>>> +            msg.payload.get() == checkLvqReplace(*i).payload.get()) ) {
>>>
>>> -            clearLVQIndex(msg);
>>> -            QPID_LOG(debug,
>>> -                     "Match found, acquire succeeded: " <<
>>> -                     i->position << " == " << msg.position);
>>> -            messages.erase(i);
>>> -            return true;
>>> -        } else {
>>> -            QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
>>> -        }
>>> +        clearLVQIndex(msg);
>>> +        QPID_LOG(debug,
>>> +                 "Match found, acquire succeeded: " <<
>>> +                 i->position << " == " << msg.position);
>>> +        messages.erase(i);
>>> +        return true;
>>> +    } else {
>>> +        QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
>>>      }
>>> +
>>>      QPID_LOG(debug, "Acquire failed for " << msg.position);
>>>      return false;
>>>  }
>>> @@ -445,19 +445,35 @@
>>>      return false;
>>>  }
>>>
>>> -namespace {
>>> -struct PositionEquals {
>>> -    SequenceNumber pos;
>>> -    PositionEquals(SequenceNumber p) : pos(p) {}
>>> -    bool operator()(const QueuedMessage& msg) const { return msg.position == pos; }
>>> -};
>>> -}// namespace
>>> +Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
>>>
>>> +    if(!messages.empty()){
>>> +        QueuedMessage compM;
>>> +        compM.position = pos;
>>> +        unsigned long diff = pos.getValue() - messages.front().position.getValue();
>>> +        long maxEnd = diff < messages.size()? diff : messages.size();
>>> +
>>> +        Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>>> +        if (i->position == pos)
>>> +            return i;
>>> +    }
>>> +    return messages.end(); // no match found.
>>> +}
>>> +
>>> +
>>>  QueuedMessage Queue::find(SequenceNumber pos) const {
>>> +
>>>      Mutex::ScopedLock locker(messageLock);
>>> -    Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos));
>>> -    if (i != messages.end())
>>> -        return *i;
>>> +    if(!messages.empty()){
>>> +        QueuedMessage compM;
>>> +        compM.position = pos;
>>> +        unsigned long diff = pos.getValue() - messages.front().position.getValue();
>>> +        long maxEnd = diff < messages.size()? diff : messages.size();
>>> +
>>> +        Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>>> +        if (i != messages.end())
>>> +            return *i;
>>> +    }
>>>      return QueuedMessage();
>>>  }
>>>
>>> @@ -642,10 +658,9 @@
>>>  }
>>>
>>>  /** function only provided for unit tests, or code not in critical message path */
>>> -uint32_t Queue::getMessageCount() const
>>> +uint32_t Queue::getEnqueueCompleteMessageCount() const
>>>  {
>>>      Mutex::ScopedLock locker(messageLock);
>>> -
>>>      uint32_t count = 0;
>>>      for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
>>>          //NOTE: don't need to use checkLvqReplace() here as it
>>> @@ -657,6 +672,12 @@
>>>      return count;
>>>  }
>>>
>>> +uint32_t Queue::getMessageCount() const
>>> +{
>>> +    Mutex::ScopedLock locker(messageLock);
>>> +    return messages.size();
>>> +}
>>> +
>>>  uint32_t Queue::getConsumerCount() const
>>>  {
>>>      Mutex::ScopedLock locker(consumerLock);
>>> Index: qpid/broker/QueuedMessage.h
>>> ===================================================================
>>> --- qpid/broker/QueuedMessage.h (revision 833135)
>>> +++ qpid/broker/QueuedMessage.h (working copy)
>>> @@ -38,7 +38,9 @@
>>>      QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
>>>          payload(msg), position(sn), queue(q) {}
>>>      QueuedMessage(Queue* q) : queue(q) {}
>>> +
>>>  };
>>> +    inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
>>>
>>>  }}
>>>
>>> Index: qpid/broker/Queue.h
>>> ===================================================================
>>> --- qpid/broker/Queue.h (revision 833135)
>>> +++ qpid/broker/Queue.h (working copy)
>>> @@ -148,6 +148,8 @@
>>>                  }
>>>              }
>>>          }
>>> +
>>> +        Messages::iterator findAt(framing::SequenceNumber pos);
>>>
>>>      public:
>>>
>>> @@ -221,6 +223,7 @@
>>>          uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
>>>
>>>          QPID_BROKER_EXTERN uint32_t getMessageCount() const;
>>> +        QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
>>>          QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
>>>          inline const string& getName() const { return name; }
>>>          bool isExclusiveOwner(const OwnershipToken* const o) const;
>>> Index: tests/QueueTest.cpp
>>> ===================================================================
>>> --- tests/QueueTest.cpp (revision 833135)
>>> +++ tests/QueueTest.cpp (working copy)
>>> @@ -120,9 +120,10 @@
>>>      queue->process(msg1);
>>>      sleep(2);
>>>      uint32_t compval=0;
>>> -    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>> +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
>>>      msg1->enqueueComplete();
>>>      compval=1;
>>> +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
>>>      BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>>  }
>>>
>>>
>>>
>>> ---------------------------------------------------------------------
>>> Apache Qpid - AMQP Messaging Implementation
>>> Project: http://qpid.apache.org
>>> Use/Interact: mailto:[email protected]
>>>
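
The bounded lookup described for findAt() in the quoted patch can be sketched in isolation. This is only a minimal illustration, not the broker code: `Msg`, `Messages` and the free function `findAt` are hypothetical stand-ins for QueuedMessage, Queue::Messages and Queue::findAt, positions are plain 32-bit integers, and wrap-around is ignored. It assumes the container is kept sorted by strictly increasing position, which is the requeue-order requirement mentioned above. Unlike the quoted patch, it includes the slot at offset `diff` in the search range and checks the result against the end of that range before dereferencing it.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for QueuedMessage: only the position matters here.
struct Msg {
    std::uint32_t position;
};

inline bool operator<(const Msg& a, const Msg& b) { return a.position < b.position; }

typedef std::deque<Msg> Messages;

// Returns an iterator to the message at `pos`, or messages.end() if there is none.
// Assumes `messages` is sorted by strictly increasing position (no wrap-around).
Messages::iterator findAt(Messages& messages, std::uint32_t pos) {
    if (messages.empty() || pos < messages.front().position)
        return messages.end();

    // With strictly increasing positions, the message at `pos` can sit at most
    // (pos - front) slots from the front, so the search range is capped there.
    std::size_t diff = pos - messages.front().position;
    std::size_t span = diff < messages.size() ? diff + 1 : messages.size();
    Messages::iterator last =
        messages.begin() + static_cast<Messages::difference_type>(span);

    Msg key = { pos };
    Messages::iterator i = std::lower_bound(messages.begin(), last, key);

    // Only dereference when lower_bound stayed inside the searched range.
    if (i != last && i->position == pos)
        return i;
    return messages.end();
}
```

If nothing has been acquired or requeued ahead of `pos`, the message is exactly `diff` slots from the front, so the search range is no larger than it needs to be. Otherwise the lookup is an ordinary binary search over a prefix of the queue.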

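
On the wrap-around concern raised in the thread: one common way to compare sequence numbers that may wrap is serial-number arithmetic, where "a is before b" is decided from the distance between the two values modulo 2^32. The sketch below is a generic illustration under that assumption. `seqLess` is a hypothetical helper and is not claimed to match how the broker's SequenceNumber comparison operators are defined.

```cpp
#include <cstdint>

// Hypothetical helper: true when `a` comes before `b` in a 32-bit sequence
// space that wraps around. The answer is only meaningful while the two
// values are less than 2^31 apart.
inline bool seqLess(std::uint32_t a, std::uint32_t b) {
    // Forward distance from a to b, taken modulo 2^32.
    std::uint32_t forward = static_cast<std::uint32_t>(b - a);
    return a != b && forward < 0x80000000u;
}
```

With this ordering a position just past the wrap point (for example 0) still compares as later than one just before it (0xFFFFFFFF), which a plain `<` on the raw values would get wrong.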