Hi Carl,
This patch looks great; I will update the selector patch later.

Chenta

On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff <[email protected]> wrote:

>
> I created a patch which seems to work well. It targets querying the queue,
> count, and acquire, making the
> queue access faster for large queues (best case 1 {if no requeue or acquire},
> worst case binary search). In
> most cases it is faster than binary search even if requeue or selector is
> used.
>
> It does require that the re-queue order be corrected - which should be done
> regardless.
>
> The remaining function that could use some similar dressing would be
> Queue::seek()
>
> Any thoughts on the patch? This patch opens the way for reasonable
> selector performance.
> Carl.
>
>
> Index: qpid/broker/Queue.cpp
> ===================================================================
> --- qpid/broker/Queue.cpp       (revision 833135)
> +++ qpid/broker/Queue.cpp       (working copy)
> @@ -243,18 +243,18 @@
>  {
>     Mutex::ScopedLock locker(messageLock);
>     QPID_LOG(debug, "Attempting to acquire message at " << position);
> -    for (Messages::iterator i = messages.begin(); i != messages.end();
> i++) {
> -        if (i->position == position) {
> -            message = *i;
> -            if (lastValueQueue) {
> -                clearLVQIndex(*i);
> -            }
> -            QPID_LOG(debug,
> -                     "Acquired message at " << i->position << " from " <<
> name);
> -            messages.erase(i);
> -            return true;
> +
> +    Messages::iterator i = findAt(position);
> +    if (i != messages.end() ) {
> +        message = *i;
> +        if (lastValueQueue) {
> +            clearLVQIndex(*i);
>         }
> -    }
> +        QPID_LOG(debug,
> +                 "Acquired message at " << i->position << " from " <<
> name);
> +        messages.erase(i);
> +        return true;
> +    }
>     QPID_LOG(debug, "Could not acquire message at " << position << " from "
> << name << "; no message at that position");
>     return false;
>  }
> @@ -262,21 +262,21 @@
>  bool Queue::acquire(const QueuedMessage& msg) {
>     Mutex::ScopedLock locker(messageLock);
>     QPID_LOG(debug, "attempting to acquire " << msg.position);
> -    for (Messages::iterator i = messages.begin(); i != messages.end();
> i++) {
> -        if ((i->position == msg.position && !lastValueQueue) // note that
> in some cases payload not be set
> -            || (lastValueQueue && (i->position == msg.position) &&
> -                msg.payload.get() == checkLvqReplace(*i).payload.get()) )
>  {
> +    Messages::iterator i = findAt(msg.position);
> +    if ((i != messages.end() && !lastValueQueue) // note that in some
> cases payload not be set
> +        || (lastValueQueue && (i->position == msg.position) &&
> +            msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
>
> -            clearLVQIndex(msg);
> -            QPID_LOG(debug,
> -                     "Match found, acquire succeeded: " <<
> -                     i->position << " == " << msg.position);
> -            messages.erase(i);
> -            return true;
> -        } else {
> -            QPID_LOG(debug, "No match: " << i->position << " != " <<
> msg.position);
> -        }
> +        clearLVQIndex(msg);
> +        QPID_LOG(debug,
> +                 "Match found, acquire succeeded: " <<
> +                 i->position << " == " << msg.position);
> +        messages.erase(i);
> +        return true;
> +    } else {
> +        QPID_LOG(debug, "No match: " << i->position << " != " <<
> msg.position);
>     }
> +
>     QPID_LOG(debug, "Acquire failed for " << msg.position);
>     return false;
>  }
> @@ -445,19 +445,35 @@
>     return false;
>  }
>
> -namespace {
> -struct PositionEquals {
> -    SequenceNumber pos;
> -    PositionEquals(SequenceNumber p) : pos(p) {}
> -    bool operator()(const QueuedMessage& msg) const { return msg.position
> == pos; }
> -};
> -}// namespace
> +Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
>
> +    if(!messages.empty()){
> +        QueuedMessage compM;
> +        compM.position = pos;
> +        unsigned long diff = pos.getValue() -
> messages.front().position.getValue();
> +        long maxEnd = diff < messages.size()? diff : messages.size();
> +
> +        Messages::iterator i =
> lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
> +        if (i->position == pos)
> +            return i;
> +    }
> +    return messages.end(); // no match found.
> +}
> +
> +
>  QueuedMessage Queue::find(SequenceNumber pos) const {
> +
>     Mutex::ScopedLock locker(messageLock);
> -    Messages::const_iterator i = std::find_if(messages.begin(),
> messages.end(), PositionEquals(pos));
> -    if (i != messages.end())
> -        return *i;
> +    if(!messages.empty()){
> +        QueuedMessage compM;
> +        compM.position = pos;
> +        unsigned long diff = pos.getValue() -
> messages.front().position.getValue();
> +        long maxEnd = diff < messages.size()? diff : messages.size();
> +
> +        Messages::const_iterator i =
> lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
> +        if (i != messages.end())
> +            return *i;
> +    }
>     return QueuedMessage();
>  }
>
> @@ -642,10 +658,9 @@
>  }
>
>  /** function only provided for unit tests, or code not in critical message
> path */
> -uint32_t Queue::getMessageCount() const
> +uint32_t Queue::getEnqueueCompleteMessageCount() const
>  {
>     Mutex::ScopedLock locker(messageLock);
> -
>     uint32_t count = 0;
>     for ( Messages::const_iterator i = messages.begin(); i !=
> messages.end(); ++i ) {
>         //NOTE: don't need to use checkLvqReplace() here as it
> @@ -657,6 +672,12 @@
>     return count;
>  }
>
> +uint32_t Queue::getMessageCount() const
> +{
> +    Mutex::ScopedLock locker(messageLock);
> +    return messages.size();
> +}
> +
>  uint32_t Queue::getConsumerCount() const
>  {
>     Mutex::ScopedLock locker(consumerLock);
> Index: qpid/broker/QueuedMessage.h
> ===================================================================
> --- qpid/broker/QueuedMessage.h (revision 833135)
> +++ qpid/broker/QueuedMessage.h (working copy)
> @@ -38,7 +38,9 @@
>     QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg,
> framing::SequenceNumber sn) :
>         payload(msg), position(sn), queue(q) {}
>     QueuedMessage(Queue* q) : queue(q) {}
> +
>  };
> +    inline bool operator<(const QueuedMessage& a, const QueuedMessage& b)
> { return a.position < b.position; }
>
>  }}
>
> Index: qpid/broker/Queue.h
> ===================================================================
> --- qpid/broker/Queue.h (revision 833135)
> +++ qpid/broker/Queue.h (working copy)
> @@ -148,6 +148,8 @@
>                     }
>                 }
>             }
> +
> +            Messages::iterator findAt(framing::SequenceNumber pos);
>
>         public:
>
> @@ -221,6 +223,7 @@
>             uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
>
>             QPID_BROKER_EXTERN uint32_t getMessageCount() const;
> +            QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount()
> const;
>             QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
>             inline const string& getName() const { return name; }
>             bool isExclusiveOwner(const OwnershipToken* const o) const;
> Index: tests/QueueTest.cpp
> ===================================================================
> --- tests/QueueTest.cpp (revision 833135)
> +++ tests/QueueTest.cpp (working copy)
> @@ -120,9 +120,10 @@
>     queue->process(msg1);
>     sleep(2);
>     uint32_t compval=0;
> -    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
> +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
>     msg1->enqueueComplete();
>     compval=1;
> +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
>     BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>  }
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:[email protected]
>

Reply via email to