Author: gsim
Date: Thu Mar  3 12:45:25 2011
New Revision: 1076604

URL: http://svn.apache.org/viewvc?rev=1076604&view=rev
Log:
QPID-3107: If queue's alternate-exchange can't route message, try that 
exchange's alternate-exchange

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1076604&r1=1076603&r2=1076604&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Thu Mar  3 12:45:25 
2011
@@ -135,7 +135,7 @@ void DeliveryRecord::reject() 
         Exchange::shared_ptr alternate = queue->getAlternateExchange();
         if (alternate) {
             DeliverableMessage delivery(msg.payload);
-            alternate->route(delivery, msg.payload->getRoutingKey(), 
msg.payload->getApplicationHeaders());
+            alternate->routeWithAlternate(delivery);
             QPID_LOG(info, "Routed rejected message from " << queue->getName() 
<< " to "
                      << alternate->getName());
         } else {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1076604&r1=1076603&r2=1076604&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Mar  3 12:45:25 2011
@@ -342,3 +342,12 @@ bool Exchange::MatchQueue::operator()(Ex
 void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
     msg->getProperties<DeliveryProperties>()->setExchange(getName());
 }
+
+bool Exchange::routeWithAlternate(Deliverable& msg)
+{
+    route(msg, msg.getMessage().getRoutingKey(), 
msg.getMessage().getApplicationHeaders());
+    if (!msg.delivered && alternate) {
+        alternate->route(msg, msg.getMessage().getRoutingKey(), 
msg.getMessage().getApplicationHeaders());
+    }
+    return msg.delivered;
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=1076604&r1=1076603&r2=1076604&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Thu Mar  3 12:45:25 2011
@@ -222,6 +222,8 @@ public:
      */
     void recoveryComplete(ExchangeRegistry& exchanges);
 
+    bool routeWithAlternate(Deliverable& message);
+
 protected:
     qpid::sys::Mutex bridgeLock;
     std::vector<DynamicBridge*> bridgeVector;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1076604&r1=1076603&r2=1076604&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Mar  3 12:45:25 2011
@@ -834,8 +834,7 @@ void Queue::destroyed()
         Mutex::ScopedLock locker(messageLock);
         while(!messages->empty()){
             DeliverableMessage msg(messages->front().payload);
-            alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
-                                     msg.getMessage().getApplicationHeaders());
+            alternateExchange->routeWithAlternate(msg);
             popAndDequeue();
         }
         alternateExchange->decAlternateUsers();

Modified: 
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py?rev=1076604&r1=1076603&r2=1076604&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py 
(original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py 
Thu Mar  3 12:45:25 2011
@@ -18,7 +18,7 @@
 #
 import traceback
 from qpid.queue import Empty
-from qpid.datatypes import Message
+from qpid.datatypes import Message, RangedSet
 from qpid.testlib import TestBase010
 from qpid.session import SessionException
 
@@ -77,13 +77,7 @@ class AlternateExchangeTests(TestBase010
         """
         session = self.session
         #set up a 'dead letter queue':
-        session.exchange_declare(exchange="dlq", type="fanout")
-        session.queue_declare(queue="deleted", exclusive=True, 
auto_delete=True)
-        session.exchange_bind(exchange="dlq", queue="deleted")
-        session.message_subscribe(destination="dlq", queue="deleted")
-        session.message_flow(destination="dlq", 
unit=session.credit_unit.message, value=0xFFFFFFFFL)
-        session.message_flow(destination="dlq", unit=session.credit_unit.byte, 
value=0xFFFFFFFFL)
-        dlq = session.incoming("dlq")
+        dlq = self.setup_dlq()
 
         #create a queue using the dlq as its alternate exchange:
         session.queue_declare(queue="delete-me", alternate_exchange="dlq")
@@ -236,6 +230,121 @@ class AlternateExchangeTests(TestBase010
             self.assertEqual("Three", dlq.get(timeout=1).body)
             self.assertEmpty(dlq)
 
+    def test_queue_delete_loop(self):
+        """
+        Test that if a queue is bound to its own alternate exchange,
+        then on deletion there is no infinite looping
+        """
+        session = self.session
+        dlq = self.setup_dlq()
+
+        #create a queue using the dlq as its alternate exchange:
+        session.queue_declare(queue="delete-me", alternate_exchange="dlq")
+        #bind that queue to the dlq as well:
+        session.exchange_bind(exchange="dlq", queue="delete-me")
+        #send it some messages:
+        dp=self.session.delivery_properties(routing_key="delete-me")
+        for m in ["One", "Two", "Three"]:
+            session.message_transfer(message=Message(dp, m))
+        #delete it:
+        session.queue_delete(queue="delete-me")
+        #cleanup:
+        session.exchange_delete(exchange="dlq")
+
+        #check the messages were delivered to the dlq:
+        for m in ["One", "Two", "Three"]:
+            self.assertEqual(m, dlq.get(timeout=1).body)
+        self.assertEmpty(dlq)
+
+    def test_queue_delete_no_match(self):
+        """
+        Test that on queue deletion, if the queue's own alternate
+        exchange cannot find a match for the message, the
+        alternate-exchange of that exchange will be tried. Note:
+        though the spec rules out going to the alternate-exchange's
+        alternate exchange when sending to an exchange, it does not
+        cover this case.
+        """
+        session = self.session
+        dlq = self.setup_dlq()
+
+        #set up an 'intermediary' exchange
+        session.exchange_declare(exchange="my-exchange", type="direct", 
alternate_exchange="dlq")
+
+        #create a queue using the intermediary as its alternate exchange:
+        session.queue_declare(queue="delete-me", 
alternate_exchange="my-exchange")
+        #bind that queue to the dlq as well:
+        session.exchange_bind(exchange="dlq", queue="delete-me")
+        #send it some messages:
+        dp=self.session.delivery_properties(routing_key="delete-me")
+        for m in ["One", "Two", "Three"]:
+            session.message_transfer(message=Message(dp, m))
+
+        #delete it:
+        session.queue_delete(queue="delete-me")
+        #cleanup:
+        session.exchange_delete(exchange="my-exchange")
+        session.exchange_delete(exchange="dlq")
+
+        #check the messages were delivered to the dlq:
+        for m in ["One", "Two", "Three"]:
+            self.assertEqual(m, dlq.get(timeout=1).body)
+        self.assertEmpty(dlq)
+
+    def test_reject_no_match(self):
+        """
+        Test that on rejecting a message, if the queue's own alternate
+        exchange cannot find a match for the message, the
+        alternate-exchange of that exchange will be tried. Note:
+        though the spec rules out going to the alternate-exchange's
+        alternate exchange when sending to an exchange, it does not
+        cover this case.
+        """
+        session = self.session
+        dlq = self.setup_dlq()
+
+        #set up an 'intermediary' exchange
+        session.exchange_declare(exchange="my-exchange", type="direct", 
alternate_exchange="dlq")
+
+        #create a queue using the intermediary as its alternate exchange:
+        session.queue_declare(queue="delivery-queue", 
alternate_exchange="my-exchange", auto_delete=True)
+        #bind that queue to the dlq as well:
+        session.exchange_bind(exchange="dlq", queue="delivery-queue")
+        #send it some messages:
+        dp=self.session.delivery_properties(routing_key="delivery-queue")
+        for m in ["One", "Two", "Three"]:
+            session.message_transfer(message=Message(dp, m))
+
+        #get and reject those messages:
+        session.message_subscribe(destination="a", queue="delivery-queue")
+        session.message_flow(destination="a", 
unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="a", unit=session.credit_unit.byte, 
value=0xFFFFFFFFL)
+        incoming = session.incoming("a")
+        for m in ["One", "Two", "Three"]:
+            msg = incoming.get(timeout=1)
+            self.assertEqual(m, msg.body)
+            session.message_reject(RangedSet(msg.id))
+        session.message_cancel(destination="a")
+
+        #check the messages were delivered to the dlq:
+        for m in ["One", "Two", "Three"]:
+            self.assertEqual(m, dlq.get(timeout=1).body)
+        self.assertEmpty(dlq)
+        #cleanup:
+        session.exchange_delete(exchange="my-exchange")
+        session.exchange_delete(exchange="dlq")
+
+    def setup_dlq(self):
+        session = self.session
+        #set up 'dead-letter' handling:
+        session.exchange_declare(exchange="dlq", type="fanout")
+        session.queue_declare(queue="deleted", exclusive=True, 
auto_delete=True)
+        session.exchange_bind(exchange="dlq", queue="deleted")
+        session.message_subscribe(destination="dlq", queue="deleted")
+        session.message_flow(destination="dlq", 
unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="dlq", unit=session.credit_unit.byte, 
value=0xFFFFFFFFL)
+        dlq = session.incoming("dlq")
+        return dlq
 
     def assertEmpty(self, queue):
         try:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to