Author: gsim
Date: Mon Nov  9 11:50:39 2009
New Revision: 834026

URL: http://svn.apache.org/viewvc?rev=834026&view=rev
Log:
QPID-2191: Fix browsing behaviour where messages may have been released out of 
order

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/python/tests_0-10/message.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=834026&r1=834025&r2=834026&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Nov  9 11:50:39 2009
@@ -217,7 +217,11 @@
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return;
         msg.payload->enqueueComplete(); // mark the message as enqueued
-        messages.push_front(msg);
+        //put message back in correct position:
+        Messages::reverse_iterator i = messages.rbegin();
+        while (i != messages.rend() && msg.position < i->position) { ++i; }
+        messages.insert(i.base(), msg);
+
         listeners.populate(copy);
 
         // for persistLastNode - don't force a message twice to disk, but 
force it if no force before 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=834026&r1=834025&r2=834026&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Nov  9 11:50:39 2009
@@ -378,6 +378,7 @@
         if (acquired) {         // Message is on the update queue
             m = getUpdateMessage();
             m.queue = queue.get();
+            m.position = position;
             if (enqueued) queue->enqueued(m); //inform queue of the message 
         } else {                // Message at original position in original 
queue
             m = queue->find(position);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=834026&r1=834025&r2=834026&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Nov  9 11:50:39 
2009
@@ -252,6 +252,7 @@
     MessageUpdater updater(q->getName(), s, expiry);
     q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, 
_1));
     q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, 
q->getName(), _1));
+    ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition());
 }
 
 void UpdateClient::updateExclusiveQueue(const 
boost::shared_ptr<broker::Queue>& q) {

Modified: qpid/trunk/qpid/python/tests_0-10/message.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/tests_0-10/message.py?rev=834026&r1=834025&r2=834026&view=diff
==============================================================================
--- qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ qpid/trunk/qpid/python/tests_0-10/message.py Mon Nov  9 11:50:39 2009
@@ -815,6 +815,41 @@
         #ensure there are no other messages
         self.assertEmpty(queueC)
 
+    def test_release_order(self):
+        session = self.session
+
+        #create queue
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+        #send messages
+        for i in range(1, 11):
+            
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
 "message-%d" % (i)))
+
+        #subscribe:
+        session.message_subscribe(queue="q", destination="a")
+        a = session.incoming("a")
+        session.message_flow(unit = session.credit_unit.byte, value = 
0xFFFFFFFFL, destination = "a")
+        session.message_flow(unit = session.credit_unit.message, value = 10, 
destination = "a")
+
+        for i in range(1, 11):
+            msg = a.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.body)
+            if (i % 2):
+                #accept all odd messages
+                session.message_accept(RangedSet(msg.id))
+            else:
+                #release all even messages
+                session.message_release(RangedSet(msg.id))
+
+        #browse:
+        session.message_subscribe(queue="q", destination="b", acquire_mode=1)
+        b = session.incoming("b")
+        b.start()
+        for i in [2, 4, 6, 8, 10]:
+            msg = b.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.body)
+
+
     def test_empty_body(self):
         session = self.session
         session.queue_declare(queue="xyz", exclusive=True, auto_delete=True)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to