Author: jonathan
Date: Thu Jan 13 20:54:03 2011
New Revision: 1058747

URL: http://svn.apache.org/viewvc?rev=1058747&view=rev
Log:
Fixes QPID-2499: Stale federation routes remain after route deletion.

Federated binds and unbinds need to know which federation origins
are associated with the bindings for each queue. When origins are
added or deleted, the corresponding bindings need to be
propagated.

fedBindings[queueName] contains the origins associated with the
given queue.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    qpid/trunk/qpid/cpp/src/tests/federation.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu Jan 13 20:54:03 2011
@@ -76,13 +76,13 @@ bool DirectExchange::bind(Queue::shared_
 
         if (bk.queues.add_unless(b, MatchQueue(queue))) {
             b->startManagement();
-            propagate = bk.fedBinding.addOrigin(fedOrigin);
+            propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
             if (mgmtExchange != 0) {
                 mgmtExchange->inc_bindingCount();
             }
         } else {
             // queue already present - still need to track fedOrigin
-            bk.fedBinding.addOrigin(fedOrigin);
+            bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
             return false;
         }
     } else if (fedOp == fedOpUnbind) {
@@ -90,11 +90,12 @@ bool DirectExchange::bind(Queue::shared_
         BoundKey& bk = bindings[routingKey];
 
         QPID_LOG(debug, "Bind - fedOpUnbind key [" << routingKey << "] queue " 
<< queue->getName()
-                 << " (origin=" << fedOrigin << ")");
+                 << " (origin=" << fedOrigin << ")" << " (count=" << 
bk.fedBinding.count() << ")");
 
-        propagate = bk.fedBinding.delOrigin(fedOrigin);
-        if (bk.fedBinding.count() == 0)
+        propagate = bk.fedBinding.delOrigin(queue->getName(), fedOrigin);
+        if (bk.fedBinding.countFedBindings(queue->getName()) == 0)
             unbind(queue, routingKey, 0);
+
     } else if (fedOp == fedOpReorigin) {
         /** gather up all the keys that need rebinding in a local vector
          * while holding the lock.  Then propagate once the lock is
@@ -142,6 +143,7 @@ bool DirectExchange::unbind(Queue::share
         }
     }
 
+    // If I delete my local binding, propagate this unbind to any upstream brokers
     if (propagate)
         propagateFedOp(routingKey, string(), fedOpUnbind, string());
     return true;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Thu Jan 13 20:54:03 2011
@@ -95,46 +95,61 @@ protected:
         bool operator()(Exchange::Binding::shared_ptr b);
     };
 
+    /** A FedBinding keeps track of information that Federation needs
+        to know when to propagate changes.
+
+        Dynamic federation needs to know which exchanges have at least
+        one local binding. The bindings on these exchanges need to be
+        propagated.
+
+        Federated binds and unbinds need to know which federation
+        origins are associated with the bindings for each queue. When
+        origins are added or deleted, the corresponding bindings need
+        to be propagated.
+
+        fedBindings[queueName] contains the origins associated with
+        the given queue.
+    */
+
     class FedBinding {
         uint32_t localBindings;
-        std::set<std::string> originSet;
+
+        typedef std::set<std::string> originSet;
+        std::map<std::string, originSet> fedBindings;
+
     public:
         FedBinding() : localBindings(0) {}
         bool hasLocal() const { return localBindings != 0; }
 
-        /**
-         *  Returns 'true' if and only if this is the first local
-         *  binding.
-         *
-         *  The first local binding may need to be propagated.
-         */
-        bool addOrigin(const std::string& origin) {
+        /** Returns true if propagation is needed. */
+        bool addOrigin(const std::string& queueName, const std::string& 
origin) {
             if (origin.empty()) {
                 localBindings++;
                 return localBindings == 1;
             }
-            originSet.insert(origin);
+            fedBindings[queueName].insert(origin);
             return true;
         }
-        bool delOrigin(const std::string& origin) {
-            originSet.erase(origin);
+
+        /** Returns true if propagation is needed. */
+        bool delOrigin(const std::string& queueName, const std::string& 
origin){
+            fedBindings[queueName].erase(origin);
             return true;
         }
 
-        /**
-         *  Returns 'true' if and only if the last local binding is
-         *  deleted.
-         *
-         *  When the last local binding is deleted, it may need to
-         *  be propagated.
-         */
+        /** Returns true if propagation is needed. */
         bool delOrigin() {
             if (localBindings > 0)
                 localBindings--;
             return localBindings == 0;
         }
+
         uint32_t count() {
-            return localBindings + originSet.size();
+            return localBindings + fedBindings.size();
+        }
+
+        uint32_t countFedBindings(const std::string& queueName) {
+            return  fedBindings[queueName].size();
         }
     };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Thu Jan 13 20:54:03 2011
@@ -53,18 +53,18 @@ bool FanOutExchange::bind(Queue::shared_
         Binding::shared_ptr binding (new Binding ("", queue, this, 
FieldTable(), fedOrigin));
         if (bindings.add_unless(binding, MatchQueue(queue))) {
             binding->startManagement();
-            propagate = fedBinding.addOrigin(fedOrigin);
+            propagate = fedBinding.addOrigin(queue->getName(), fedOrigin);
             if (mgmtExchange != 0) {
                 mgmtExchange->inc_bindingCount();
             }
         } else {
             // queue already present - still need to track fedOrigin
-            fedBinding.addOrigin(fedOrigin);
+            fedBinding.addOrigin(queue->getName(), fedOrigin);
             return false;
         }
     } else if (fedOp == fedOpUnbind) {
-        propagate = fedBinding.delOrigin(fedOrigin);
-        if (fedBinding.count() == 0)
+        propagate = fedBinding.delOrigin(queue->getName(), fedOrigin);
+        if (fedBinding.countFedBindings(queue->getName()) == 0)
             unbind(queue, "", 0);
     } else if (fedOp == fedOpReorigin) {
         if (fedBinding.hasLocal()) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Thu Jan 13 20:54:03 2011
@@ -116,12 +116,12 @@ bool HeadersExchange::bind(Queue::shared
             BoundKey bk(binding);
             if (bindings.add_unless(bk, MatchArgs(queue, args))) {
                 binding->startManagement();
-                propagate = bk.fedBinding.addOrigin(fedOrigin);
+                propagate = bk.fedBinding.addOrigin(queue->getName(), 
fedOrigin);
                 if (mgmtExchange != 0) {
                     mgmtExchange->inc_bindingCount();
                 }
             } else {
-                bk.fedBinding.addOrigin(fedOrigin);
+                bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
                 return false;
             }
         } // lock dropped
@@ -129,7 +129,7 @@ bool HeadersExchange::bind(Queue::shared
     } else if (fedOp == fedOpUnbind) {
         Mutex::ScopedLock l(lock);
  
-        FedUnbindModifier modifier(fedOrigin);
+        FedUnbindModifier modifier(queue->getName(), fedOrigin);
         bindings.modify_if(MatchKey(queue, bindingKey), modifier);
         propagate = modifier.shouldPropagate;
         if (modifier.shouldUnbind) {
@@ -325,7 +325,7 @@ bool HeadersExchange::MatchKey::operator
 }
 
 //----------
-HeadersExchange::FedUnbindModifier::FedUnbindModifier(string & origin) : 
fedOrigin(origin), shouldUnbind(false), shouldPropagate(false) {}
+HeadersExchange::FedUnbindModifier::FedUnbindModifier(const string& queueName, 
const string& origin) : queueName(queueName), fedOrigin(origin), 
shouldUnbind(false), shouldPropagate(false) {}
 HeadersExchange::FedUnbindModifier::FedUnbindModifier() : shouldUnbind(false), 
shouldPropagate(false) {}
 
 bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk)
@@ -333,9 +333,9 @@ bool HeadersExchange::FedUnbindModifier:
     if ("" == fedOrigin) {
         shouldPropagate = bk.fedBinding.delOrigin();
     } else {
-        shouldPropagate = bk.fedBinding.delOrigin(fedOrigin);
+        shouldPropagate = bk.fedBinding.delOrigin(queueName, fedOrigin);
     }
-    if (bk.fedBinding.count() == 0)
+    if (bk.fedBinding.countFedBindings(queueName) == 0)
     {
         shouldUnbind = true;
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Thu Jan 13 20:54:03 2011
@@ -60,11 +60,12 @@ class HeadersExchange : public virtual E
 
     struct FedUnbindModifier
     {
+        std::string queueName;
         std::string fedOrigin;
         bool shouldUnbind;
         bool shouldPropagate;
         FedUnbindModifier();
-        FedUnbindModifier(std::string & origin);
+        FedUnbindModifier(const std::string& queueName, const std::string& 
origin);
         bool operator()(BoundKey & bk);
     };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu Jan 13 20:54:03 2011
@@ -236,7 +236,7 @@ bool TopicExchange::bind(Queue::shared_p
             for (q = qv.begin(); q != qv.end(); q++) {
                 if ((*q)->queue == queue) {
                     // already bound, but may be from a different fedOrigin
-                    bk->fedBinding.addOrigin(fedOrigin);
+                    bk->fedBinding.addOrigin(queue->getName(), fedOrigin);
                     return false;
                 }
             }
@@ -245,7 +245,7 @@ bool TopicExchange::bind(Queue::shared_p
             binding->startManagement();
             bk->bindingVector.push_back(binding);
             nBindings++;
-            propagate = bk->fedBinding.addOrigin(fedOrigin);
+            propagate = bk->fedBinding.addOrigin(queue->getName(), fedOrigin);
             if (mgmtExchange != 0) {
                 mgmtExchange->inc_bindingCount();
             }
@@ -258,8 +258,8 @@ bool TopicExchange::bind(Queue::shared_p
             RWlock::ScopedWlock l(lock);
             BindingKey* bk = bindingTree.getBindingKey(routingPattern);
             if (bk) {
-                propagate = bk->fedBinding.delOrigin(fedOrigin);
-                reallyUnbind = bk->fedBinding.count() == 0;
+                propagate = bk->fedBinding.delOrigin(queue->getName(), 
fedOrigin);
+                reallyUnbind = 
bk->fedBinding.countFedBindings(queue->getName()) == 0;
             }
         }
         if (reallyUnbind)

Modified: qpid/trunk/qpid/cpp/src/tests/federation.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/federation.py Thu Jan 13 20:54:03 2011
@@ -1200,15 +1200,15 @@ class FederationTests(TestBase010):
         # @todo - restore code when QPID-2499 fixed!!
         sleep(6)
         # wait for the binding count on B1 to drop from 2 to 1
-        # retries = 0
-        # exchanges[1].update()
-        # while exchanges[1].bindingCount != 1:
-        #     retries += 1
-        #     self.failIfEqual(retries, 10,
-        #                      "unbinding failed to propagate to broker B1: %d"
-        #                      % exchanges[1].bindingCount)
-        #     sleep(1)
-        #     exchanges[1].update()
+        retries = 0
+        exchanges[1].update()
+        while exchanges[1].bindingCount != 1:
+            retries += 1
+            self.failIfEqual(retries, 10,
+                             "unbinding failed to propagate to broker B1: %d"
+                             % exchanges[1].bindingCount)
+            sleep(1)
+            exchanges[1].update()
 
         # send 10 msgs from B0
         for i in range(11, 21):
@@ -1216,13 +1216,12 @@ class FederationTests(TestBase010):
             
self._brokers[0].client_session.message_transfer(destination="fedX.direct", 
message=Message(dp, "Message_drp %d" % i))
 
         # verify messages are forwarded to B3 only
-        # note: why exchanges[1].msgRoutes == 40???, not 20???  QPID-2499?
         retries = 0
         for ex in exchanges:
             ex.update()
         while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 
or
-               exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 
or
-               exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 
or exchanges[2].msgRoutes != 10 or
+               exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 
or
+               exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 
or
                exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
             retries += 1
             self.failIfEqual(retries, 10,
@@ -1455,18 +1454,16 @@ class FederationTests(TestBase010):
         self._brokers[2].client_session.message_cancel(destination="f1")
         self._brokers[2].client_session.queue_delete(queue="fedX1")
 
-        # @todo - restore code when QPID-2499 fixed!!
-        sleep(6)
         # wait for the binding count on B1 to drop from 2 to 1
-        # retries = 0
-        # exchanges[1].update()
-        # while exchanges[1].bindingCount != 1:
-        #     retries += 1
-        #     self.failIfEqual(retries, 10,
-        #                      "unbinding failed to propagate to broker B1: %d"
-        #                      % exchanges[1].bindingCount)
-        #     sleep(1)
-        #     exchanges[1].update()
+        retries = 0
+        exchanges[1].update()
+        while exchanges[1].bindingCount != 1:
+            retries += 1
+            self.failIfEqual(retries, 10,
+                             "unbinding failed to propagate to broker B1: %d"
+                             % exchanges[1].bindingCount)
+            sleep(1)
+            exchanges[1].update()
 
         # send 10 msgs from B0
         for i in range(11, 21):
@@ -1474,13 +1471,12 @@ class FederationTests(TestBase010):
             
self._brokers[0].client_session.message_transfer(destination="fedX.topic", 
message=Message(dp, "Message_trp %d" % i))
 
         # verify messages are forwarded to B3 only
-        # note: why exchanges[1].msgRoutes == 40???, not 20???  QPID-2499?
         retries = 0
         for ex in exchanges:
             ex.update()
         while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 
or
-               exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 
or
-               exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 
or exchanges[2].msgRoutes != 10 or
+               exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 
or
+               exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 
or
                exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
             retries += 1
             self.failIfEqual(retries, 10,
@@ -1714,18 +1710,16 @@ class FederationTests(TestBase010):
         self._brokers[2].client_session.message_cancel(destination="f1")
         self._brokers[2].client_session.queue_delete(queue="fedX1")
 
-        # @todo - find a proper way to check the propagation here!
-        sleep(6)
         # wait for the binding count on B1 to drop from 2 to 1
-        # retries = 0
-        # exchanges[1].update()
-        # while exchanges[1].bindingCount != 1:
-        #     retries += 1
-        #     self.failIfEqual(retries, 10,
-        #                      "unbinding failed to propagate to broker B1: %d"
-        #                      % exchanges[1].bindingCount)
-        #     sleep(1)
-        #     exchanges[1].update()
+        retries = 0
+        exchanges[1].update()
+        while exchanges[1].bindingCount != 1:
+            retries += 1
+            self.failIfEqual(retries, 10,
+                             "unbinding failed to propagate to broker B1: %d"
+                             % exchanges[1].bindingCount)
+            sleep(1)
+            exchanges[1].update()
 
         # send 10 msgs from B0
         for i in range(11, 21):
@@ -1733,13 +1727,12 @@ class FederationTests(TestBase010):
             
self._brokers[0].client_session.message_transfer(destination="fedX.fanout", 
message=Message(dp, "Message_frp %d" % i))
 
         # verify messages are forwarded to B3 only
-        # note: why exchanges[1].msgRoutes == 40???, not 20???  QPID-2499?
         retries = 0
         for ex in exchanges:
             ex.update()
         while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 
or
-               exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 
or
-               exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 
or exchanges[2].msgRoutes != 10 or
+               exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 
or
+               exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 
or
                exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
             retries += 1
             self.failIfEqual(retries, 10,



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to