Author: kgiusti
Date: Thu Oct 13 14:26:06 2011
New Revision: 1182874

URL: http://svn.apache.org/viewvc?rev=1182874&view=rev
Log:
QPID-3543: augment existing test to check for implicit accept error instead of 
introducing a new test

Modified:
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py?rev=1182874&r1=1182873&r2=1182874&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py Thu Oct 13 
14:26:06 2011
@@ -262,19 +262,29 @@ class MessageTests(TestBase010):
 
     def test_ack(self):
         """
-        Test basic ack/recover behaviour
+        Test basic ack/recover behaviour using a combination of implicit and
+        explicit accept subscriptions.
         """
-        session = self.conn.session("alternate-session", timeout=10)
-        session.queue_declare(queue="test-ack-queue", auto_delete=True)
-
-        session.message_subscribe(queue = "test-ack-queue", destination = 
"consumer")
-        session.message_flow(destination="consumer", 
unit=session.credit_unit.message, value=0xFFFFFFFFL)
-        session.message_flow(destination="consumer", 
unit=session.credit_unit.byte, value=0xFFFFFFFFL)
-        queue = session.incoming("consumer")
+        self.startQmf()
+        session1 = self.conn.session("alternate-session", timeout=10)
+        session1.queue_declare(queue="test-ack-queue", auto_delete=True)
 
-        delivery_properties = 
session.delivery_properties(routing_key="test-ack-queue")
+        delivery_properties = 
session1.delivery_properties(routing_key="test-ack-queue")
         for i in ["One", "Two", "Three", "Four", "Five"]:
-            session.message_transfer(message=Message(delivery_properties, i))
+            session1.message_transfer(message=Message(delivery_properties, i))
+
+        # verify enqueued message count, use both QMF and session query to 
verify consistency
+        self.assertEqual(5, 
session1.queue_query(queue="test-ack-queue").message_count)
+        queueObj = self.qmf.getObjects(_class="queue", 
name="test-ack-queue")[0]
+        self.assertEquals(queueObj.msgDepth, 5)
+        self.assertEquals(queueObj.msgTotalEnqueues, 5)
+        self.assertEquals(queueObj.msgTotalDequeues, 0)
+
+        # subscribe with implied acquire, explicit accept:
+        session1.message_subscribe(queue = "test-ack-queue", destination = 
"consumer")
+        session1.message_flow(destination="consumer", 
unit=session1.credit_unit.message, value=0xFFFFFFFFL)
+        session1.message_flow(destination="consumer", 
unit=session1.credit_unit.byte, value=0xFFFFFFFFL)
+        queue = session1.incoming("consumer")
 
         msg1 = queue.get(timeout=1)
         msg2 = queue.get(timeout=1)
@@ -288,20 +298,46 @@ class MessageTests(TestBase010):
         self.assertEqual("Four", msg4.body)
         self.assertEqual("Five", msg5.body)
 
-        session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two 
and Four
-
-        #subscribe from second session here to ensure queue is not
-        #auto-deleted when alternate session closes (no need to ack on these):
-        self.session.message_subscribe(queue = "test-ack-queue", destination = 
"checker", accept_mode=1)
+        # messages should not be on the queue:
+        self.assertEqual(0, 
session1.queue_query(queue="test-ack-queue").message_count)
+        # QMF shows the dequeues as not having happened yet, since they are 
have
+        # not been accepted
+        queueObj.update()
+        self.assertEquals(queueObj.msgDepth, 5)
+        self.assertEquals(queueObj.msgTotalEnqueues, 5)
+        self.assertEquals(queueObj.msgTotalDequeues, 0)
+
+        session1.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two 
and Four
+
+        # QMF should now reflect the accepted messages as being dequeued
+        self.assertEqual(0, 
session1.queue_query(queue="test-ack-queue").message_count)
+        queueObj.update()
+        self.assertEquals(queueObj.msgDepth, 2)
+        self.assertEquals(queueObj.msgTotalEnqueues, 5)
+        self.assertEquals(queueObj.msgTotalDequeues, 3)
+
+        #subscribe from second session here to ensure queue is not auto-deleted
+        #when alternate session closes.  Use implicit accept mode to test that
+        #we don't need to explicitly accept
+        session2 = self.conn.session("alternate-session-2", timeout=10)
+        session2.message_subscribe(queue = "test-ack-queue", destination = 
"checker", accept_mode=1)
 
-        #now close the session, and see that the unacked messages are
+        #now close the first session, and see that the unaccepted messages are
         #then redelivered to another subscriber:
-        session.close(timeout=10)
+        session1.close(timeout=10)
 
-        session = self.session
-        session.message_flow(destination="checker", 
unit=session.credit_unit.message, value=0xFFFFFFFFL)
-        session.message_flow(destination="checker", 
unit=session.credit_unit.byte, value=0xFFFFFFFFL)
-        queue = session.incoming("checker")
+        # check the statistics - the queue_query will show the non-accepted
+        # messages have been released. QMF never considered them dequeued, so
+        # those counts won't change
+        self.assertEqual(2, 
session2.queue_query(queue="test-ack-queue").message_count)
+        queueObj.update()
+        self.assertEquals(queueObj.msgDepth, 2)
+        self.assertEquals(queueObj.msgTotalEnqueues, 5)
+        self.assertEquals(queueObj.msgTotalDequeues, 3)
+
+        session2.message_flow(destination="checker", 
unit=session2.credit_unit.message, value=0xFFFFFFFFL)
+        session2.message_flow(destination="checker", 
unit=session2.credit_unit.byte, value=0xFFFFFFFFL)
+        queue = session2.incoming("checker")
 
         msg3b = queue.get(timeout=1)
         msg5b = queue.get(timeout=1)
@@ -314,6 +350,33 @@ class MessageTests(TestBase010):
             self.fail("Got unexpected message: " + extra.body)
         except Empty: None
 
+        self.assertEqual(0, 
session2.queue_query(queue="test-ack-queue").message_count)
+        queueObj.update()
+        self.assertEquals(queueObj.msgDepth, 0)
+        self.assertEquals(queueObj.msgTotalEnqueues, 5)
+        self.assertEquals(queueObj.msgTotalDequeues, 5)
+
+        # Subscribe one last time to keep the queue available, and to verify
+        # that the implied accept worked by verifying no messages have been
+        # returned when session2 is closed.
+        self.session.message_subscribe(queue = "test-ack-queue", destination = 
"final-checker")
+
+        session2.close(timeout=10)
+
+        # check the statistics - they should not have changed
+        self.assertEqual(0, 
self.session.queue_query(queue="test-ack-queue").message_count)
+        queueObj.update()
+        self.assertEquals(queueObj.msgDepth, 0)
+        self.assertEquals(queueObj.msgTotalEnqueues, 5)
+        self.assertEquals(queueObj.msgTotalDequeues, 5)
+
+        self.session.message_flow(destination="final-checker", 
unit=self.session.credit_unit.message, value=0xFFFFFFFFL)
+        self.session.message_flow(destination="final-checker", 
unit=self.session.credit_unit.byte, value=0xFFFFFFFFL)
+        try:
+            extra = self.session.incoming("final-checker").get(timeout=1)
+            self.fail("Got unexpected message: " + extra.body)
+        except Empty: None
+
     def test_reject(self):
         session = self.session
         session.queue_declare(queue = "q", exclusive=True, auto_delete=True, 
alternate_exchange="amq.fanout")
@@ -953,40 +1016,6 @@ class MessageTests(TestBase010):
         assert messages.get(timeout=1).body == "second"
         self.assertEmpty(messages)
 
-
-    def test_auto_ack(self):
-        """
-        Test implicit accept function
-        """
-        self.startQmf()
-        session = self.session
-        session.queue_declare(queue = "auto-ack", exclusive=True, 
auto_delete=True)
-        
session.message_transfer(message=Message(session.delivery_properties(routing_key="auto-ack"),
 "ackackack"))
-
-        # verify one enqueued message, use both QMF and session query to 
verify consistency
-        self.assertEqual(1, 
session.queue_query(queue="auto-ack").message_count)
-        for queue in self.qmf.getObjects(_class="queue"):
-            if queue.name == "auto-ack":
-                break;
-        self.assertEquals("auto-ack", queue.name)
-        self.assertEquals(queue.msgDepth, 1)
-        self.assertEquals(queue.msgTotalEnqueues, 1)
-        self.assertEquals(queue.msgTotalDequeues, 0)
-
-        # implicit acquire and acknowledge
-        session.message_subscribe(queue = "auto-ack", destination = "a", 
acquire_mode=0, accept_mode=1)
-        session.message_flow(destination="a", 
unit=session.credit_unit.message, value=0xFFFFFFFFL)
-        session.message_flow(destination="a", unit=session.credit_unit.byte, 
value=0xFFFFFFFFL)
-        msg = session.incoming("a").get(timeout = 1)
-        self.assertEquals("ackackack", msg.body)
-
-        #message should not be on the queue:
-        self.assertEqual(0, 
session.queue_query(queue="auto-ack").message_count)
-        queue.update()
-        self.assertEquals(queue.msgDepth, 0)
-        self.assertEquals(queue.msgTotalEnqueues, 1)
-        self.assertEquals(queue.msgTotalDequeues, 1)
-
     def assertDataEquals(self, session, msg, expected):
         self.assertEquals(expected, msg.body)
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to