Author: kgiusti
Date: Tue Oct 11 20:54:24 2011
New Revision: 1182084
URL: http://svn.apache.org/viewvc?rev=1182084&view=rev
Log:
QPID-3543: correctly dequeue messages that are auto-acknowledged.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1182084&r1=1182083&r2=1182084&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Oct 11 20:54:24
2011
@@ -348,7 +348,8 @@ bool SemanticState::ConsumerImpl::delive
parent->record(record);
}
if (acquire && !ackExpected) { // auto acquire && auto accept
- record.accept( 0 /*no ctxt*/ );
+ queue->dequeue(0 /*ctxt*/, msg);
+ record.setEnded();
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py?rev=1182084&r1=1182083&r2=1182084&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py Tue Oct 11
20:54:24 2011
@@ -954,6 +954,39 @@ class MessageTests(TestBase010):
self.assertEmpty(messages)
+ def test_auto_ack(self):
+ """
+ Test implicit accept function
+ """
+ self.startQmf()
+ session = self.session
+ session.queue_declare(queue = "auto-ack", exclusive=True,
auto_delete=True)
+
session.message_transfer(message=Message(session.delivery_properties(routing_key="auto-ack"),
"ackackack"))
+
+ # verify one enqueued message, use both QMF and session query to
verify consistency
+ self.assertEqual(1,
session.queue_query(queue="auto-ack").message_count)
+ for queue in self.qmf.getObjects(_class="queue"):
+ if queue.name == "auto-ack":
+ break;
+ self.assertEquals("auto-ack", queue.name)
+ self.assertEquals(queue.msgDepth, 1)
+ self.assertEquals(queue.msgTotalEnqueues, 1)
+ self.assertEquals(queue.msgTotalDequeues, 0)
+
+ # implicit acquire and acknowledge
+ session.message_subscribe(queue = "auto-ack", destination = "a",
acquire_mode=0, accept_mode=1)
+ session.message_flow(destination="a",
unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="a", unit=session.credit_unit.byte,
value=0xFFFFFFFFL)
+ msg = session.incoming("a").get(timeout = 1)
+ self.assertEquals("ackackack", msg.body)
+
+ #message should not be on the queue:
+ self.assertEqual(0,
session.queue_query(queue="auto-ack").message_count)
+ queue.update()
+ self.assertEquals(queue.msgDepth, 0)
+ self.assertEquals(queue.msgTotalEnqueues, 1)
+ self.assertEquals(queue.msgTotalDequeues, 1)
+
def assertDataEquals(self, session, msg, expected):
self.assertEquals(expected, msg.body)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]