Chenta,

Two things to note. Gordon has now put in a patch that corrects the order of the messages on requeue, i.e. they now stay in order by sequence number even after a rollback. So with his patch and my seek()/seekAt() changes, we should now be able to resolve the acquire case quite easily and update the position.

Take a look and let me know if you need some help with that.

(i.e. before these two patches it would not have been possible, but now I believe it is)
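
To illustrate why the requeue order matters, here is a minimal sketch. It assumes a std::deque kept sorted by sequence number and ignores wrap-around. Msg, requeueInOrder and seekAfter are hypothetical names for illustration only, not the broker's actual Queue API.

```cpp
#include <algorithm>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for the broker's queued message; only the position matters here.
struct Msg {
    std::uint32_t position;
};

inline bool operator<(const Msg& a, const Msg& b) { return a.position < b.position; }

// Put a rolled-back message back at the slot its sequence number dictates,
// so the deque stays sorted and position-based searches remain valid.
void requeueInOrder(std::deque<Msg>& messages, const Msg& m) {
    std::deque<Msg>::iterator slot =
        std::upper_bound(messages.begin(), messages.end(), m);
    messages.insert(slot, m);
}

// Return the first message positioned after consumerPos, or end() if there is none.
// This only works because requeueInOrder keeps the deque sorted.
std::deque<Msg>::iterator seekAfter(std::deque<Msg>& messages, std::uint32_t consumerPos) {
    Msg probe = { consumerPos };
    return std::upper_bound(messages.begin(), messages.end(), probe);
}
```

If requeued messages were simply pushed to the front or back, the binary search in seekAfter could skip them, which is roughly the situation before the two patches.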

Carl.


chenta lee wrote:
I added the consumer sequence number wrap-around because, when we requeue a message, I cannot know which consumer consumed it. Therefore I cannot update the consumer's sequence number when the messages roll back. The consequence is that when a consumer rolls back (requeues) messages, it can no longer acquire them (because requeue_msg.position is always larger than consumer.position).
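
For reference, one common way to compare positions so that wrap-around is handled is serial-number style comparison. This is only a sketch under assumptions: it uses plain 32-bit integers rather than the broker's SequenceNumber class, and precedes and eligible are hypothetical helper names.

```cpp
#include <cstdint>

// Hypothetical helper, not the broker's SequenceNumber class.
// True when a comes before b in circular (serial-number) order: the unsigned
// difference is reinterpreted as signed, so a value just past the wrap point
// still counts as "later". Assumes two's complement conversion.
inline bool precedes(std::uint32_t a, std::uint32_t b) {
    std::uint32_t diff = a - b;  // modular subtraction, wraps as intended
    return static_cast<std::int32_t>(diff) < 0;
}

// Assumption for illustration: a message is eligible for a consumer when it
// sits after the consumer's current position in that circular order.
inline bool eligible(std::uint32_t consumerPos, std::uint32_t msgPos) {
    return precedes(consumerPos, msgPos);
}
```

With this kind of comparison, a consumer at position 0xFFFFFFF0 still sees a message at position 3 as being ahead of it, which is the case a wrap-around test would need to cover.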

However, my patch is not that dirty :). I didn't change the original algorithm: we do not update the consumer sequence in consumeNextMessage at the very beginning. From my point of view, the only concern is that users who decide to use selectors on their messages might suffer from performance issues; other users who do not use selectors will be just fine.

Chenta

On Wed, Nov 25, 2009 at 10:54 AM, Carl Trieloff wrote:


    Thanks, the one remaining issue I know of with the selector patch
    is that I don't think the consumer
    sequence number wrap-around works.

    We need a test there, and may need to change the comparison
    operators in your patch. I was looking into
    that last week on the selector patch; I'm itching to get the patch in.

    Carl.



    chenta lee wrote:
    Hi Carl,
    This patch looks great, I will update the selector patch later.

    Chenta

    On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff wrote:


        I created a patch which seems to work well. It targets
        querying the queue, count, and acquire, making
        queue access faster for large queues (best case O(1) {if no
        requeue or acquire}, worst case binary search). In
        most cases it is faster than binary search even if requeue or
        a selector is used.
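
The lookup described above can be sketched as follows. This is a standalone illustration under assumptions, not the patch itself: it uses a plain std::deque sorted by position, ignores sequence number wrap-around, and Msg and findAtSketch are hypothetical names.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for QueuedMessage; only the position matters here.
struct Msg {
    std::uint32_t position;
};

inline bool operator<(const Msg& a, const Msg& b) { return a.position < b.position; }

// Find the message at pos in a deque sorted by strictly increasing position.
// Returns messages.end() when there is no message at that position.
std::deque<Msg>::iterator findAtSketch(std::deque<Msg>& messages, std::uint32_t pos) {
    if (messages.empty()) return messages.end();
    std::uint32_t front = messages.front().position;
    if (pos < front) return messages.end();

    // With no gaps, the message sits exactly (pos - front) slots from the front.
    std::size_t offset = pos - front;
    if (offset < messages.size() && messages[offset].position == pos)
        return messages.begin() + offset;  // best case: a single probe

    // Gaps left by acquired messages can only move the target closer to the
    // front, so a binary search over the first min(offset, size) slots suffices.
    std::size_t limit = std::min(offset, messages.size());
    std::deque<Msg>::iterator last = messages.begin() + limit;
    Msg probe = { pos };
    std::deque<Msg>::iterator it = std::lower_bound(messages.begin(), last, probe);
    if (it != last && it->position == pos) return it;
    return messages.end();
}
```

Note that the result of lower_bound is checked against the end of the searched range before it is dereferenced, and the position is compared before the iterator is returned.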

        It does require that the re-queue order be corrected - which
        should be done regardless.

        The remaining function that could use some similar dressing
        would be Queue::seek()

        Any thoughts on the patch... This patch opens the way for
        reasonable selector performance.
        Carl.


        Index: qpid/broker/Queue.cpp
        ===================================================================
        --- qpid/broker/Queue.cpp       (revision 833135)
        +++ qpid/broker/Queue.cpp       (working copy)
        @@ -243,18 +243,18 @@
         {
            Mutex::ScopedLock locker(messageLock);
            QPID_LOG(debug, "Attempting to acquire message at " << position);
        -    for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
        -        if (i->position == position) {
        -            message = *i;
        -            if (lastValueQueue) {
        -                clearLVQIndex(*i);
        -            }
        -            QPID_LOG(debug,
        -                     "Acquired message at " << i->position << " from " << name);
        -            messages.erase(i);
        -            return true;
        +
        +    Messages::iterator i = findAt(position);
        +    if (i != messages.end() ) {
        +        message = *i;
        +        if (lastValueQueue) {
        +            clearLVQIndex(*i);
                }
        -    }
        +        QPID_LOG(debug,
        +                 "Acquired message at " << i->position << " from " << name);
        +        messages.erase(i);
        +        return true;
        +    }
            QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
            return false;
         }
        @@ -262,21 +262,21 @@
         bool Queue::acquire(const QueuedMessage& msg) {
            Mutex::ScopedLock locker(messageLock);
            QPID_LOG(debug, "attempting to acquire " << msg.position);
        -    for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
        -        if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set
        -            || (lastValueQueue && (i->position == msg.position) &&
        -                msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
        +    Messages::iterator i = findAt(msg.position);
        +    if ((i != messages.end() && !lastValueQueue) // note that in some cases payload not be set
        +        || (lastValueQueue && (i->position == msg.position) &&
        +            msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {

        -            clearLVQIndex(msg);
        -            QPID_LOG(debug,
        -                     "Match found, acquire succeeded: " <<
        -                     i->position << " == " << msg.position);
        -            messages.erase(i);
        -            return true;
        -        } else {
        -            QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
        -        }
        +        clearLVQIndex(msg);
        +        QPID_LOG(debug,
        +                 "Match found, acquire succeeded: " <<
        +                 i->position << " == " << msg.position);
        +        messages.erase(i);
        +        return true;
        +    } else {
        +        QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
            }
        +
            QPID_LOG(debug, "Acquire failed for " << msg.position);
            return false;
         }
        @@ -445,19 +445,35 @@
            return false;
         }

        -namespace {
        -struct PositionEquals {
        -    SequenceNumber pos;
        -    PositionEquals(SequenceNumber p) : pos(p) {}
        -    bool operator()(const QueuedMessage& msg) const { return msg.position == pos; }
        -};
        -}// namespace
        +Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {

        +    if(!messages.empty()){
        +        QueuedMessage compM;
        +        compM.position = pos;
        +        unsigned long diff = pos.getValue() - messages.front().position.getValue();
        +        long maxEnd = diff < messages.size()? diff : messages.size();
        +
        +        Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
        +        if (i->position == pos)
        +            return i;
        +    }
        +    return messages.end(); // no match found.
        +}
        +
        +
         QueuedMessage Queue::find(SequenceNumber pos) const {
        +
            Mutex::ScopedLock locker(messageLock);
        -    Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos));
        -    if (i != messages.end())
        -        return *i;
        +    if(!messages.empty()){
        +        QueuedMessage compM;
        +        compM.position = pos;
        +        unsigned long diff = pos.getValue() - messages.front().position.getValue();
        +        long maxEnd = diff < messages.size()? diff : messages.size();
        +
        +        Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
        +        if (i != messages.end())
        +            return *i;
        +    }
            return QueuedMessage();
         }

        @@ -642,10 +658,9 @@
         }

         /** function only provided for unit tests, or code not in critical message path */
        -uint32_t Queue::getMessageCount() const
        +uint32_t Queue::getEnqueueCompleteMessageCount() const
         {
            Mutex::ScopedLock locker(messageLock);
        -
            uint32_t count = 0;
            for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
                //NOTE: don't need to use checkLvqReplace() here as it
        @@ -657,6 +672,12 @@
            return count;
         }

        +uint32_t Queue::getMessageCount() const
        +{
        +    Mutex::ScopedLock locker(messageLock);
        +    return messages.size();
        +}
        +
         uint32_t Queue::getConsumerCount() const
         {
            Mutex::ScopedLock locker(consumerLock);
        Index: qpid/broker/QueuedMessage.h
        ===================================================================
        --- qpid/broker/QueuedMessage.h (revision 833135)
        +++ qpid/broker/QueuedMessage.h (working copy)
        @@ -38,7 +38,9 @@
            QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
                payload(msg), position(sn), queue(q) {}
            QueuedMessage(Queue* q) : queue(q) {}
        +
         };
        +    inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }

         }}

        Index: qpid/broker/Queue.h
        ===================================================================
        --- qpid/broker/Queue.h (revision 833135)
        +++ qpid/broker/Queue.h (working copy)
        @@ -148,6 +148,8 @@
                            }
                        }
                    }
        +
        +            Messages::iterator findAt(framing::SequenceNumber pos);

                public:

        @@ -221,6 +223,7 @@
                    uint32_t move(const Queue::shared_ptr destq, uint32_t qty);

                    QPID_BROKER_EXTERN uint32_t getMessageCount() const;
        +            QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
                    QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
                    inline const string& getName() const { return name; }
                    bool isExclusiveOwner(const OwnershipToken* const o) const;
        Index: tests/QueueTest.cpp
        ===================================================================
        --- tests/QueueTest.cpp (revision 833135)
        +++ tests/QueueTest.cpp (working copy)
        @@ -120,9 +120,10 @@
            queue->process(msg1);
            sleep(2);
            uint32_t compval=0;
        -    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
        +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
            msg1->enqueueComplete();
            compval=1;
        +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
            BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
         }



        ---------------------------------------------------------------------
        Apache Qpid - AMQP Messaging Implementation
        Project:      http://qpid.apache.org
        Use/Interact: mailto:[email protected]




