Author: aconway
Date: Mon May 28 18:24:06 2012
New Revision: 1343347

URL: http://svn.apache.org/viewvc?rev=1343347&view=rev
Log:
QPID-3603: Allow Queue::setPosition() to truncate the queue.

In the new HA code a backup may sometimes be ahead of the new primary after a
fail-over. In that case the backup truncates it's queues to the same position
as the primary so it can continue replicating.

(Note the assertions added to verify setPosition showed up a minor bug in the
old cluster code, which was leaving messages on the cluster update queue after
an update.  This patch fixes the issue.)

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h
    qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
    qpid/trunk/qpid/tools/src/py/.gitignore

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp?rev=1343347&r1=1343346&r2=1343347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp Mon May 28 18:24:06 
2012
@@ -173,6 +173,26 @@ void MessageDeque::updateAcquired(const 
     }
 }
 
+namespace {
+bool isNotDeleted(const QueuedMessage& qm) { return qm.status != 
QueuedMessage::DELETED; }
+} // namespace
+
+void MessageDeque::setPosition(const framing::SequenceNumber& n) {
+    size_t i = index(n+1);
+    if (i >= messages.size()) return; // Nothing to do.
+
+    // Assertion to verify the precondition: no messaages after n.
+    assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) ==
+           messages.end());
+    messages.erase(messages.begin()+i, messages.end());
+    if (head >= messages.size()) head = messages.size() - 1;
+    // Re-count the available messages
+    available = 0;
+    for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
+        if (i->status == QueuedMessage::AVAILABLE) ++available;
+    }
+}
+
 void MessageDeque::clean()
 {
     while (messages.size() && messages.front().status == 
QueuedMessage::DELETED) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h?rev=1343347&r1=1343346&r2=1343347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h Mon May 28 18:24:06 2012
@@ -44,7 +44,7 @@ class MessageDeque : public Messages
     bool consume(QueuedMessage&);
     bool push(const QueuedMessage& added, QueuedMessage& removed);
     void updateAcquired(const QueuedMessage& acquired);
-
+    void setPosition(const framing::SequenceNumber&);
     void foreach(Functor);
     void removeIf(Predicate);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp?rev=1343347&r1=1343346&r2=1343347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp Mon May 28 18:24:06 2012
@@ -21,6 +21,7 @@
 #include "qpid/broker/MessageMap.h"
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/log/Statement.h"
+#include <algorithm>
 
 namespace qpid {
 namespace broker {
@@ -130,18 +131,24 @@ bool MessageMap::push(const QueuedMessag
         QueuedMessage& a = messages[added.position];
         a = added;
         a.status = QueuedMessage::AVAILABLE;
-        QPID_LOG(debug, "Added message at " << a.position);
+        QPID_LOG(debug, "Added message " << a);
         return false;
     } else {
         //there is already a message with that key which needs to be replaced
         removed = result.first->second;
         result.first->second = replace(result.first->second, added);
         result.first->second.status = QueuedMessage::AVAILABLE;
-        QPID_LOG(debug, "Displaced message at " << removed.position << " with 
" << result.first->second.position << ": " << result.first->first);
+        QPID_LOG(debug, "Displaced message " << removed << " with " << 
result.first->second << ": " << result.first->first);
         return true;
     }
 }
 
+void MessageMap::setPosition(const framing::SequenceNumber& seq) {
+    // Nothing to do, just assert that the precondition is respected and there
+    // are no undeleted messages after seq.
+    assert(messages.empty() || (--messages.end())->first <= seq);
+}
+
 void MessageMap::foreach(Functor f)
 {
     for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h?rev=1343347&r1=1343346&r2=1343347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h Mon May 28 18:24:06 2012
@@ -6,7 +6,7 @@
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+o * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
@@ -50,6 +50,7 @@ class MessageMap : public Messages
     virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
     bool consume(QueuedMessage&);
     virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
+    void setPosition(const framing::SequenceNumber&);
 
     void foreach(Functor);
     virtual void removeIf(Predicate);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h?rev=1343347&r1=1343346&r2=1343347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h Mon May 28 18:24:06 2012
@@ -21,6 +21,7 @@
  * under the License.
  *
  */
+#include "qpid/framing/SequenceNumber.h"
 #include <boost/function.hpp>
 
 namespace qpid {
@@ -101,14 +102,22 @@ class Messages
     virtual void updateAcquired(const QueuedMessage&) { }
 
     /**
+     * Set the position of the back of the queue. Next message enqueued will 
be n+1.
+     *@pre Any messages with seq > n must already be dequeued.
+     */
+    virtual void setPosition(const framing::SequenceNumber& /*n*/) = 0;
+
+    /**
      * Apply, the functor to each message held
      */
+
     virtual void foreach(Functor) = 0;
     /**
      * Remove every message held that for which the specified
      * predicate returns true
      */
     virtual void removeIf(Predicate) = 0;
+
   private:
 };
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp?rev=1343347&r1=1343346&r2=1343347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp Mon May 28 18:24:06 
2012
@@ -121,6 +121,10 @@ void PriorityQueue::updateAcquired(const
     fifo.updateAcquired(acquired);
 }
 
+void PriorityQueue::setPosition(const framing::SequenceNumber& n) {
+    fifo.setPosition(n);
+}
+
 void PriorityQueue::foreach(Functor f)
 {
     fifo.foreach(f);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h?rev=1343347&r1=1343346&r2=1343347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h Mon May 28 18:24:06 2012
@@ -52,6 +52,7 @@ class PriorityQueue : public Messages
     bool consume(QueuedMessage&);
     bool push(const QueuedMessage& added, QueuedMessage& removed);
     void updateAcquired(const QueuedMessage& acquired);
+    void setPosition(const framing::SequenceNumber&);
     void foreach(Functor);
     void removeIf(Predicate);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1343347&r1=1343346&r2=1343347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon May 28 18:24:06 2012
@@ -588,21 +588,51 @@ QueuedMessage Queue::get(){
     return msg;
 }
 
-bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& 
message)
+namespace {
+bool collectIf(QueuedMessage& qm, Messages::Predicate predicate,
+               std::deque<QueuedMessage>& collection)
 {
-    if (message.payload->hasExpired()) {
-        expired.push_back(message);
+    if (predicate(qm)) {
+        collection.push_back(qm);
         return true;
     } else {
         return false;
     }
 }
 
+bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); }
+} // namespace
+
+void Queue::dequeueIf(Messages::Predicate predicate,
+                      std::deque<QueuedMessage>& dequeued)
+{
+    {
+        Mutex::ScopedLock locker(messageLock);
+        messages->removeIf(boost::bind(&collectIf, _1, predicate, 
boost::ref(dequeued)));
+    }
+    if (!dequeued.empty()) {
+        if (mgmtObject) {
+            mgmtObject->inc_acquires(dequeued.size());
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_acquires(dequeued.size());
+        }
+        for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin();
+             i != dequeued.end(); ++i) {
+            {
+                // KAG: should be safe to retake lock after the removeIf, since
+                // no other thread can touch these messages after the 
removeIf() call
+                Mutex::ScopedLock locker(messageLock);
+                observeAcquire(*i, locker);
+            }
+            dequeue( 0, *i );
+        }
+    }
+}
+
 /**
  *@param lapse: time since the last purgeExpired
  */
-void Queue::purgeExpired(qpid::sys::Duration lapse)
-{
+void Queue::purgeExpired(sys::Duration lapse) {
     //As expired messages are discarded during dequeue also, only
     //bother explicitly expiring if the rate of dequeues since last
     //attempt is less than one per second.
@@ -610,37 +640,18 @@ void Queue::purgeExpired(qpid::sys::Dura
     dequeueSincePurge -= count;
     int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
     if (seconds == 0 || count / seconds < 1) {
-        std::deque<QueuedMessage> expired;
-        {
-            Mutex::ScopedLock locker(messageLock);
-            messages->removeIf(boost::bind(&collect_if_expired, 
boost::ref(expired), _1));
-        }
-
-        if (!expired.empty()) {
+        std::deque<QueuedMessage> dequeued;
+        dequeueIf(boost::bind(&isExpired, _1), dequeued);
+        if (dequeued.size()) {
             if (mgmtObject) {
-                mgmtObject->inc_acquires(expired.size());
-                mgmtObject->inc_discardsTtl(expired.size());
-                if (brokerMgmtObject) {
-                    brokerMgmtObject->inc_acquires(expired.size());
-                    brokerMgmtObject->inc_discardsTtl(expired.size());
-                }
-            }
-
-            for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
-                 i != expired.end(); ++i) {
-                {
-                    // KAG: should be safe to retake lock after the removeIf, 
since
-                    // no other thread can touch these messages after the 
removeIf() call
-                    Mutex::ScopedLock locker(messageLock);
-                    observeAcquire(*i, locker);
-                }
-                dequeue( 0, *i );
+                mgmtObject->inc_discardsTtl(dequeued.size());
+                if (brokerMgmtObject)
+                    brokerMgmtObject->inc_discardsTtl(dequeued.size());
             }
         }
     }
 }
 
-
 namespace {
     // for use with purge/move below - collect messages that match a given 
filter
     //
@@ -1661,8 +1672,22 @@ void Queue::query(qpid::types::Variant::
     if (allocator) allocator->query(results);
 }
 
+namespace {
+struct After {
+    framing::SequenceNumber seq;
+    After(framing::SequenceNumber s) : seq(s) {}
+    bool operator()(const QueuedMessage& qm) { return qm.position > seq; }
+};
+} // namespace
+
+
 void Queue::setPosition(SequenceNumber n) {
     Mutex::ScopedLock locker(messageLock);
+    if (n < sequence) {
+        std::deque<QueuedMessage> dequeued;
+        dequeueIf(After(n), dequeued);
+        messages->setPosition(n);
+    }
     sequence = n;
     QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1343347&r1=1343346&r2=1343347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon May 28 18:24:06 2012
@@ -175,6 +175,7 @@ class Queue : public boost::enable_share
     void configureImpl(const qpid::framing::FieldTable& settings);
     void checkNotDeleted(const Consumer::shared_ptr& c);
     void notifyDeleted();
+    void dequeueIf(Messages::Predicate predicate, std::deque<QueuedMessage>& 
dequeued);
 
   public:
 
@@ -375,12 +376,21 @@ class Queue : public boost::enable_share
         std::for_each<Observers::iterator, F>(observers.begin(), 
observers.end(), f);
     }
 
-    /** Set the position sequence number  for the next message on the queue.
-     * Must be >= the current sequence number.
-     * Used by cluster to replicate queues.
+    /**
+     * Set the sequence number for the back of the queue, the
+     * next message enqueued will be pos+1.
+     * If pos > getPosition() this creates a gap in the sequence numbers.
+     * if pos < getPosition() the back of the queue is reset to pos,
+     *
+     * The _caller_ must ensure that any messages after pos have been dequeued.
+     *
+     * Used by HA/cluster code for queue replication.
      */
     QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos);
-    /** return current position sequence number for the next message on the 
queue.
+
+    /**
+     *@return sequence number for the back of the queue. The next message 
pushed
+     * will be at getPosition+1
      */
     QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
     QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1343347&r1=1343346&r2=1343347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon May 28 18:24:06 2012
@@ -524,6 +524,7 @@ broker::QueuedMessage Connection::getUpd
     boost::shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
     assert(!updateq->isDurable());
     broker::QueuedMessage m = updateq->get();
+    updateq->dequeue(0, m);
     if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update 
queue"));
     return m;
 }

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=1343347&r1=1343346&r2=1343347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Mon May 28 18:24:06 2012
@@ -67,7 +67,7 @@ tmodule_LTLIBRARIES=
 
 TESTS+=unit_test
 check_PROGRAMS+=unit_test
-unit_test_LDADD=-lboost_unit_test_framework \
+unit_test_LDADD=-lboost_unit_test_framework -lpthread \
        $(lib_messaging) $(lib_broker) $(lib_console) $(lib_qmf2)
 
 unit_test_SOURCES= unit_test.cpp unit_test.h \

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1343347&r1=1343346&r2=1343347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Mon May 28 18:24:06 2012
@@ -31,6 +31,8 @@
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/FieldTable.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/client/QueueOptions.h"
 #include "qpid/framing/AMQFrame.h"
@@ -40,10 +42,11 @@
 #include "qpid/broker/QueueFlowLimit.h"
 
 #include <iostream>
-#include "boost/format.hpp"
-
-using std::string;
+#include <vector>
+#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
 
+using namespace std;
 using boost::intrusive_ptr;
 using namespace qpid;
 using namespace qpid::broker;
@@ -85,7 +88,7 @@ public:
     Message& getMessage() { return *(msg.get()); }
 };
 
-intrusive_ptr<Message> create_message(std::string exchange, std::string 
routingKey, uint64_t ttl = 0) {
+intrusive_ptr<Message> createMessage(std::string exchange, std::string 
routingKey, uint64_t ttl = 0) {
     intrusive_ptr<Message> msg(new Message());
     AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
     AMQFrame header((AMQHeaderBody()));
@@ -96,6 +99,16 @@ intrusive_ptr<Message> create_message(st
     return msg;
 }
 
+intrusive_ptr<Message> contentMessage(string content) {
+    intrusive_ptr<Message> m(MessageUtils::createMessage());
+    MessageUtils::addContent(m, content);
+    return m;
+}
+
+string getContent(intrusive_ptr<Message> m) {
+    return m->getFrames().getContent();
+}
+
 QPID_AUTO_TEST_SUITE(QueueTestSuite)
 
 QPID_AUTO_TEST_CASE(testAsyncMessage) {
@@ -107,7 +120,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) {
 
 
     //Test basic delivery:
-    intrusive_ptr<Message> msg1 = create_message("e", "A");
+    intrusive_ptr<Message> msg1 = createMessage("e", "A");
     msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called 
from process
     queue->process(msg1);
     sleep(2);
@@ -122,7 +135,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) {
 
 QPID_AUTO_TEST_CASE(testAsyncMessageCount){
     Queue::shared_ptr queue(new Queue("my_test_queue", true));
-    intrusive_ptr<Message> msg1 = create_message("e", "A");
+    intrusive_ptr<Message> msg1 = createMessage("e", "A");
     msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called 
from process
 
     queue->process(msg1);
@@ -147,9 +160,9 @@ QPID_AUTO_TEST_CASE(testConsumers){
     BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount());
 
     //Test basic delivery:
-    intrusive_ptr<Message> msg1 = create_message("e", "A");
-    intrusive_ptr<Message> msg2 = create_message("e", "B");
-    intrusive_ptr<Message> msg3 = create_message("e", "C");
+    intrusive_ptr<Message> msg1 = createMessage("e", "A");
+    intrusive_ptr<Message> msg2 = createMessage("e", "B");
+    intrusive_ptr<Message> msg3 = createMessage("e", "C");
 
     queue->deliver(msg1);
     BOOST_CHECK(queue->dispatch(c1));
@@ -193,9 +206,9 @@ QPID_AUTO_TEST_CASE(testRegistry){
 
 QPID_AUTO_TEST_CASE(testDequeue){
     Queue::shared_ptr queue(new Queue("my_queue", true));
-    intrusive_ptr<Message> msg1 = create_message("e", "A");
-    intrusive_ptr<Message> msg2 = create_message("e", "B");
-    intrusive_ptr<Message> msg3 = create_message("e", "C");
+    intrusive_ptr<Message> msg1 = createMessage("e", "A");
+    intrusive_ptr<Message> msg2 = createMessage("e", "B");
+    intrusive_ptr<Message> msg3 = createMessage("e", "C");
     intrusive_ptr<Message> received;
 
     queue->deliver(msg1);
@@ -267,9 +280,9 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeS
     Queue::shared_ptr queue(new Queue("my-queue", true));
     queue->configure(args);
 
-    intrusive_ptr<Message> msg1 = create_message("e", "A");
-    intrusive_ptr<Message> msg2 = create_message("e", "B");
-    intrusive_ptr<Message> msg3 = create_message("e", "C");
+    intrusive_ptr<Message> msg1 = createMessage("e", "A");
+    intrusive_ptr<Message> msg2 = createMessage("e", "B");
+    intrusive_ptr<Message> msg3 = createMessage("e", "C");
 
     //enqueue 2 messages
     queue->deliver(msg1);
@@ -293,9 +306,9 @@ QPID_AUTO_TEST_CASE(testSeek){
 
     Queue::shared_ptr queue(new Queue("my-queue", true));
 
-    intrusive_ptr<Message> msg1 = create_message("e", "A");
-    intrusive_ptr<Message> msg2 = create_message("e", "B");
-    intrusive_ptr<Message> msg3 = create_message("e", "C");
+    intrusive_ptr<Message> msg1 = createMessage("e", "A");
+    intrusive_ptr<Message> msg2 = createMessage("e", "B");
+    intrusive_ptr<Message> msg3 = createMessage("e", "C");
 
     //enqueue 2 messages
     queue->deliver(msg1);
@@ -319,9 +332,9 @@ QPID_AUTO_TEST_CASE(testSearch){
 
     Queue::shared_ptr queue(new Queue("my-queue", true));
 
-    intrusive_ptr<Message> msg1 = create_message("e", "A");
-    intrusive_ptr<Message> msg2 = create_message("e", "B");
-    intrusive_ptr<Message> msg3 = create_message("e", "C");
+    intrusive_ptr<Message> msg1 = createMessage("e", "A");
+    intrusive_ptr<Message> msg2 = createMessage("e", "B");
+    intrusive_ptr<Message> msg3 = createMessage("e", "C");
 
     //enqueue 2 messages
     queue->deliver(msg1);
@@ -433,10 +446,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
     Queue::shared_ptr queue(new Queue("my-queue", true ));
     queue->configure(args);
 
-    intrusive_ptr<Message> msg1 = create_message("e", "A");
-    intrusive_ptr<Message> msg2 = create_message("e", "B");
-    intrusive_ptr<Message> msg3 = create_message("e", "C");
-    intrusive_ptr<Message> msg4 = create_message("e", "D");
+    intrusive_ptr<Message> msg1 = createMessage("e", "A");
+    intrusive_ptr<Message> msg2 = createMessage("e", "B");
+    intrusive_ptr<Message> msg3 = createMessage("e", "C");
+    intrusive_ptr<Message> msg4 = createMessage("e", "D");
     intrusive_ptr<Message> received;
 
     //set deliever match for LVQ a,b,c,a
@@ -468,9 +481,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
     received = queue->get().payload;
     BOOST_CHECK_EQUAL(msg3.get(), received.get());
 
-    intrusive_ptr<Message> msg5 = create_message("e", "A");
-    intrusive_ptr<Message> msg6 = create_message("e", "B");
-    intrusive_ptr<Message> msg7 = create_message("e", "C");
+    intrusive_ptr<Message> msg5 = createMessage("e", "A");
+    intrusive_ptr<Message> msg6 = createMessage("e", "B");
+    intrusive_ptr<Message> msg7 = createMessage("e", "C");
     msg5->insertCustomProperty(key,"a");
     msg6->insertCustomProperty(key,"b");
     msg7->insertCustomProperty(key,"c");
@@ -500,8 +513,8 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){
     Queue::shared_ptr queue(new Queue("my-queue", true ));
     queue->configure(args);
 
-    intrusive_ptr<Message> msg1 = create_message("e", "A");
-    intrusive_ptr<Message> msg2 = create_message("e", "B");
+    intrusive_ptr<Message> msg1 = createMessage("e", "A");
+    intrusive_ptr<Message> msg2 = createMessage("e", "B");
 
     string key;
     args.getLVQKey(key);
@@ -526,12 +539,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
     Queue::shared_ptr queue(new Queue("my-queue", true ));
     queue->configure(args);
 
-    intrusive_ptr<Message> msg1 = create_message("e", "A");
-    intrusive_ptr<Message> msg2 = create_message("e", "B");
-    intrusive_ptr<Message> msg3 = create_message("e", "C");
-    intrusive_ptr<Message> msg4 = create_message("e", "D");
-    intrusive_ptr<Message> msg5 = create_message("e", "F");
-    intrusive_ptr<Message> msg6 = create_message("e", "G");
+    intrusive_ptr<Message> msg1 = createMessage("e", "A");
+    intrusive_ptr<Message> msg2 = createMessage("e", "B");
+    intrusive_ptr<Message> msg3 = createMessage("e", "C");
+    intrusive_ptr<Message> msg4 = createMessage("e", "D");
+    intrusive_ptr<Message> msg5 = createMessage("e", "F");
+    intrusive_ptr<Message> msg6 = createMessage("e", "G");
 
     //set deliever match for LVQ a,b,c,a
 
@@ -603,8 +616,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
     queue1->configure(args);
     queue2->configure(args);
 
-    intrusive_ptr<Message> msg1 = create_message("e", "A");
-    intrusive_ptr<Message> msg2 = create_message("e", "A");
+    intrusive_ptr<Message> msg1 = createMessage("e", "A");
+    intrusive_ptr<Message> msg2 = createMessage("e", "A");
 
     string key;
     args.getLVQKey(key);
@@ -647,8 +660,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
     intrusive_ptr<Message> received;
     queue1->create(args);
 
-    intrusive_ptr<Message> msg1 = create_message("e", "A");
-    intrusive_ptr<Message> msg2 = create_message("e", "A");
+    intrusive_ptr<Message> msg1 = createMessage("e", "A");
+    intrusive_ptr<Message> msg2 = createMessage("e", "A");
     // 2
     string key;
     args.getLVQKey(key);
@@ -675,7 +688,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
 void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint 
evenTtl = 0)
 {
     for (uint i = 0; i < count; i++) {
-        intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? 
oddTtl : evenTtl);
+        intrusive_ptr<Message> m = createMessage("exchange", "key", i % 2 ? 
oddTtl : evenTtl);
         m->computeExpiration(new broker::ExpiryPolicy);
         queue.deliver(m);
     }
@@ -738,7 +751,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
                              std::string("b"), std::string("b"), 
std::string("b"),
                              std::string("c"), std::string("c"), 
std::string("c") };
     for (int i = 0; i < 9; ++i) {
-        intrusive_ptr<Message> msg = create_message("e", "A");
+        intrusive_ptr<Message> msg = createMessage("e", "A");
         msg->insertCustomProperty("GROUP-ID", groups[i]);
         msg->insertCustomProperty("MY-ID", i);
         queue->deliver(msg);
@@ -885,7 +898,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     // Queue = a-2,
     // Owners= ^C3,
 
-    intrusive_ptr<Message> msg = create_message("e", "A");
+    intrusive_ptr<Message> msg = createMessage("e", "A");
     msg->insertCustomProperty("GROUP-ID", "a");
     msg->insertCustomProperty("MY-ID", 9);
     queue->deliver(msg);
@@ -896,7 +909,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     gotOne = queue->dispatch(c2);
     BOOST_CHECK( !gotOne );
 
-    msg = create_message("e", "A");
+    msg = createMessage("e", "A");
     msg->insertCustomProperty("GROUP-ID", "b");
     msg->insertCustomProperty("MY-ID", 10);
     queue->deliver(msg);
@@ -927,7 +940,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     queue->configure(args);
 
     for (int i = 0; i < 3; ++i) {
-        intrusive_ptr<Message> msg = create_message("e", "A");
+        intrusive_ptr<Message> msg = createMessage("e", "A");
         // no "GROUP-ID" header
         msg->insertCustomProperty("MY-ID", i);
         queue->deliver(msg);
@@ -990,7 +1003,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNo
     Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
     queue2->create(args);
 
-    intrusive_ptr<Message> msg1 = create_message("e", "A");
+    intrusive_ptr<Message> msg1 = createMessage("e", "A");
 
     queue1->deliver(msg1);
     queue2->deliver(msg1);
@@ -1006,7 +1019,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNo
     queue2->setLastNodeFailure();
     BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
 
-    intrusive_ptr<Message> msg2 = create_message("e", "B");
+    intrusive_ptr<Message> msg2 = createMessage("e", "B");
     queue1->deliver(msg2);
     queue2->deliver(msg2);
 
@@ -1021,7 +1034,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNo
     queue1->clearLastNodeFailure();
     queue2->clearLastNodeFailure();
 
-    intrusive_ptr<Message> msg3 = create_message("e", "B");
+    intrusive_ptr<Message> msg3 = createMessage("e", "B");
     queue1->deliver(msg3);
     queue2->deliver(msg3);
     BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
@@ -1035,8 +1048,8 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNo
      * internal details not part of the queue abstraction.
 
     // check requeue 1
-    intrusive_ptr<Message> msg4 = create_message("e", "C");
-    intrusive_ptr<Message> msg5 = create_message("e", "D");
+    intrusive_ptr<Message> msg4 = createMessage("e", "C");
+    intrusive_ptr<Message> msg5 = createMessage("e", "D");
 
     framing::SequenceNumber sequence(1);
     QueuedMessage qmsg1(queue1.get(), msg4, sequence);
@@ -1083,8 +1096,8 @@ not requeued to the store.
     queue1->create(args);
 
     // check requeue 1
-    intrusive_ptr<Message> msg1 = create_message("e", "C");
-    intrusive_ptr<Message> msg2 = create_message("e", "D");
+    intrusive_ptr<Message> msg1 = createMessage("e", "C");
+    intrusive_ptr<Message> msg2 = createMessage("e", "D");
 
     queue1->recover(msg1);
 
@@ -1116,7 +1129,7 @@ simulate store exception going into last
     queue1->configure(args);
 
     // check requeue 1
-    intrusive_ptr<Message> msg1 = create_message("e", "C");
+    intrusive_ptr<Message> msg1 = createMessage("e", "C");
 
     queue1->deliver(msg1);
     testStore.createError();
@@ -1403,6 +1416,133 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
     BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
 }
 
+QPID_AUTO_TEST_CASE(testSetPositionFifo) {
+    Queue::shared_ptr q(new Queue("my-queue", true));
+    BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0));
+    for (int i = 0; i < 10; ++i)
+        q->deliver(contentMessage(boost::lexical_cast<string>(i+1)));
+
+    // Verify the front of the queue
+    TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't 
acquire
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(1, c->last.position); // Numbered from 1
+    BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+    // Verify the back of the queue
+    QueuedMessage qm;
+    BOOST_CHECK_EQUAL(10, q->getPosition());
+    BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
+    BOOST_CHECK_EQUAL("10", getContent(qm.payload));
+    BOOST_CHECK_EQUAL(10, q->getMessageCount());
+
+    // Using setPosition to introduce a gap in sequence numbers.
+    q->setPosition(15);
+    BOOST_CHECK_EQUAL(10, q->getMessageCount());
+    BOOST_CHECK_EQUAL(15, q->getPosition());
+    BOOST_CHECK(q->find(10, qm)); // Back of the queue
+    BOOST_CHECK_EQUAL("10", getContent(qm.payload));
+    q->deliver(contentMessage("16"));
+    c->setPosition(9);
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(10, c->last.position);
+    BOOST_CHECK_EQUAL("10", getContent(c->last.payload));
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(16, c->last.position);
+    BOOST_CHECK_EQUAL("16", getContent(c->last.payload));
+
+    // Using setPosition to trunkcate the queue
+    q->setPosition(5);
+    BOOST_CHECK_EQUAL(5, q->getMessageCount());
+    q->deliver(contentMessage("6a"));
+    c->setPosition(4);
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(5, c->last.position);
+    BOOST_CHECK_EQUAL("5", getContent(c->last.payload));
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(6, c->last.position);
+    BOOST_CHECK_EQUAL("6a", getContent(c->last.payload));
+    BOOST_CHECK(!q->dispatch(c)); // No more messages.
+}
+
+QPID_AUTO_TEST_CASE(testSetPositionLvq) {
+    Queue::shared_ptr q(new Queue("my-queue", true));
+    string key="key";
+    framing::FieldTable args;
+    args.setString("qpid.last_value_queue_key", "key");
+    q->configure(args);
+
+    const char* values[] = { "a", "b", "c", "a", "b", "c" };
+    for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
+        intrusive_ptr<Message> m = 
contentMessage(boost::lexical_cast<string>(i+1));
+        m->insertCustomProperty(key, values[i]);
+        q->deliver(m);
+    }
+    BOOST_CHECK_EQUAL(3, q->getMessageCount());
+    // Verify the front of the queue
+    TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't 
acquire
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(4, c->last.position); // Numbered from 1
+    BOOST_CHECK_EQUAL("4", getContent(c->last.payload));
+    // Verify the back of the queue
+    QueuedMessage qm;
+    BOOST_CHECK_EQUAL(6, q->getPosition());
+    BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
+    BOOST_CHECK_EQUAL("6", getContent(qm.payload));
+
+    q->setPosition(5);
+    c->setPosition(4);
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(5, c->last.position); // Numbered from 1
+    BOOST_CHECK(!q->dispatch(c));
+}
+
+QPID_AUTO_TEST_CASE(testSetPositionPriority) {
+    Queue::shared_ptr q(new Queue("my-queue", true));
+    framing::FieldTable args;
+    args.setInt("qpid.priorities", 10);
+    q->configure(args);
+
+    const int priorities[] = { 1, 2, 3, 2, 1, 3 };
+    for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) {
+        intrusive_ptr<Message> m = 
contentMessage(boost::lexical_cast<string>(i+1));
+        m->getFrames().getHeaders()->get<DeliveryProperties>(true)
+            ->setPriority(priorities[i]);
+        q->deliver(m);
+    }
+
+    // Truncation removes messages in fifo order, not priority order.
+    q->setPosition(3);
+    TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in 
FIFO order
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(1, c->last.position);
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(2, c->last.position);
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(3, c->last.position);
+    BOOST_CHECK(!q->dispatch(c));
+
+    intrusive_ptr<Message> m = contentMessage("4a");
+    m->getFrames().getHeaders()->get<DeliveryProperties>(true)
+        ->setPriority(4);
+    q->deliver(m);
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(4, c->last.position);
+    BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+
+    // But consumers see priority order
+    c.reset(new TestConsumer("test", true));
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(4, c->last.position);
+    BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(3, c->last.position);
+    BOOST_CHECK_EQUAL("3", getContent(c->last.payload));
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(2, c->last.position);
+    BOOST_CHECK_EQUAL("2", getContent(c->last.payload));
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(1, c->last.position);
+    BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+}
 
 QPID_AUTO_TEST_SUITE_END()
 

Modified: qpid/trunk/qpid/tools/src/py/.gitignore
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/.gitignore?rev=1343347&r1=1343346&r2=1343347&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/.gitignore (original)
+++ qpid/trunk/qpid/tools/src/py/.gitignore Mon May 28 18:24:06 2012
@@ -19,4 +19,5 @@
 # with the License.  You may obtain a copy of the License at
 /qpid-clusterc
 /qpid-configc
+/qpid-hac
 /qpid-routec



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to