Author: gsim
Date: Mon Nov 9 11:50:39 2009
New Revision: 834026
URL: http://svn.apache.org/viewvc?rev=834026&view=rev
Log:
QPID-2191: Fix browsing behaviour where messages may have been released out of order
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
qpid/trunk/qpid/python/tests_0-10/message.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=834026&r1=834025&r2=834026&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Nov 9 11:50:39 2009
@@ -217,7 +217,11 @@
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.push_front(msg);
+ //put message back in correct position:
+ Messages::reverse_iterator i = messages.rbegin();
+ while (i != messages.rend() && msg.position < i->position) { ++i; }
+ messages.insert(i.base(), msg);
+
listeners.populate(copy);
// for persistLastNode - don't force a message twice to disk, but force it if no force before
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=834026&r1=834025&r2=834026&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Nov 9 11:50:39 2009
@@ -378,6 +378,7 @@
if (acquired) { // Message is on the update queue
m = getUpdateMessage();
m.queue = queue.get();
+ m.position = position;
if (enqueued) queue->enqueued(m); //inform queue of the message
} else { // Message at original position in original queue
m = queue->find(position);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=834026&r1=834025&r2=834026&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Nov 9 11:50:39 2009
@@ -252,6 +252,7 @@
MessageUpdater updater(q->getName(), s, expiry);
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1));
+ ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition());
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
Modified: qpid/trunk/qpid/python/tests_0-10/message.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/tests_0-10/message.py?rev=834026&r1=834025&r2=834026&view=diff
==============================================================================
--- qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ qpid/trunk/qpid/python/tests_0-10/message.py Mon Nov 9 11:50:39 2009
@@ -815,6 +815,41 @@
#ensure there are no other messages
self.assertEmpty(queueC)
+ def test_release_order(self):
+ session = self.session
+
+ #create queue
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ #send messages
+ for i in range(1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
+
+ #subscribe:
+ session.message_subscribe(queue="q", destination="a")
+ a = session.incoming("a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+
+ for i in range(1, 11):
+ msg = a.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ if (i % 2):
+ #accept all odd messages
+ session.message_accept(RangedSet(msg.id))
+ else:
+ #release all even messages
+ session.message_release(RangedSet(msg.id))
+
+ #browse:
+ session.message_subscribe(queue="q", destination="b", acquire_mode=1)
+ b = session.incoming("b")
+ b.start()
+ for i in [2, 4, 6, 8, 10]:
+ msg = b.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+
+
def test_empty_body(self):
session = self.session
session.queue_declare(queue="xyz", exclusive=True, auto_delete=True)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]