Author: jonathan
Date: Thu Jan 13 20:54:03 2011
New Revision: 1058747
URL: http://svn.apache.org/viewvc?rev=1058747&view=rev
Log:
Fixes QPID-2499: Stale federation routes remain after route deletion.
Federated binds and unbinds need to know which federation origins
are associated with the bindings for each queue. When origins are
added or deleted, the corresponding bindings need to be
propagated.
fedBindings[queueName] contains the origins associated with the
given queue.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
qpid/trunk/qpid/cpp/src/tests/federation.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu Jan 13 20:54:03 2011
@@ -76,13 +76,13 @@ bool DirectExchange::bind(Queue::shared_
if (bk.queues.add_unless(b, MatchQueue(queue))) {
b->startManagement();
- propagate = bk.fedBinding.addOrigin(fedOrigin);
+ propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
} else {
// queue already present - still need to track fedOrigin
- bk.fedBinding.addOrigin(fedOrigin);
+ bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
} else if (fedOp == fedOpUnbind) {
@@ -90,11 +90,12 @@ bool DirectExchange::bind(Queue::shared_
BoundKey& bk = bindings[routingKey];
QPID_LOG(debug, "Bind - fedOpUnbind key [" << routingKey << "] queue "
<< queue->getName()
- << " (origin=" << fedOrigin << ")");
+ << " (origin=" << fedOrigin << ")" << " (count=" <<
bk.fedBinding.count() << ")");
- propagate = bk.fedBinding.delOrigin(fedOrigin);
- if (bk.fedBinding.count() == 0)
+ propagate = bk.fedBinding.delOrigin(queue->getName(), fedOrigin);
+ if (bk.fedBinding.countFedBindings(queue->getName()) == 0)
unbind(queue, routingKey, 0);
+
} else if (fedOp == fedOpReorigin) {
/** gather up all the keys that need rebinding in a local vector
* while holding the lock. Then propagate once the lock is
@@ -142,6 +143,7 @@ bool DirectExchange::unbind(Queue::share
}
}
+ // If I delete my local binding, propagate this unbind to any upstream brokers
if (propagate)
propagateFedOp(routingKey, string(), fedOpUnbind, string());
return true;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Thu Jan 13 20:54:03 2011
@@ -95,46 +95,61 @@ protected:
bool operator()(Exchange::Binding::shared_ptr b);
};
+ /** A FedBinding keeps track of information that Federation needs
+ to know when to propagate changes.
+
+ Dynamic federation needs to know which exchanges have at least
+ one local binding. The bindings on these exchanges need to be
+ propagated.
+
+ Federated binds and unbinds need to know which federation
+ origins are associated with the bindings for each queue. When
+ origins are added or deleted, the corresponding bindings need
+ to be propagated.
+
+ fedBindings[queueName] contains the origins associated with
+ the given queue.
+ */
+
class FedBinding {
uint32_t localBindings;
- std::set<std::string> originSet;
+
+ typedef std::set<std::string> originSet;
+ std::map<std::string, originSet> fedBindings;
+
public:
FedBinding() : localBindings(0) {}
bool hasLocal() const { return localBindings != 0; }
- /**
- * Returns 'true' if and only if this is the first local
- * binding.
- *
- * The first local binding may need to be propagated.
- */
- bool addOrigin(const std::string& origin) {
+ /** Returns true if propagation is needed. */
+ bool addOrigin(const std::string& queueName, const std::string&
origin) {
if (origin.empty()) {
localBindings++;
return localBindings == 1;
}
- originSet.insert(origin);
+ fedBindings[queueName].insert(origin);
return true;
}
- bool delOrigin(const std::string& origin) {
- originSet.erase(origin);
+
+ /** Returns true if propagation is needed. */
+ bool delOrigin(const std::string& queueName, const std::string&
origin){
+ fedBindings[queueName].erase(origin);
return true;
}
- /**
- * Returns 'true' if and only if the last local binding is
- * deleted.
- *
- * When the last local binding is deleted, it may need to
- * be propagated.
- */
+ /** Returns true if propagation is needed. */
bool delOrigin() {
if (localBindings > 0)
localBindings--;
return localBindings == 0;
}
+
uint32_t count() {
- return localBindings + originSet.size();
+ return localBindings + fedBindings.size();
+ }
+
+ uint32_t countFedBindings(const std::string& queueName) {
+ return fedBindings[queueName].size();
}
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Thu Jan 13 20:54:03 2011
@@ -53,18 +53,18 @@ bool FanOutExchange::bind(Queue::shared_
Binding::shared_ptr binding (new Binding ("", queue, this,
FieldTable(), fedOrigin));
if (bindings.add_unless(binding, MatchQueue(queue))) {
binding->startManagement();
- propagate = fedBinding.addOrigin(fedOrigin);
+ propagate = fedBinding.addOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
} else {
// queue already present - still need to track fedOrigin
- fedBinding.addOrigin(fedOrigin);
+ fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
} else if (fedOp == fedOpUnbind) {
- propagate = fedBinding.delOrigin(fedOrigin);
- if (fedBinding.count() == 0)
+ propagate = fedBinding.delOrigin(queue->getName(), fedOrigin);
+ if (fedBinding.countFedBindings(queue->getName()) == 0)
unbind(queue, "", 0);
} else if (fedOp == fedOpReorigin) {
if (fedBinding.hasLocal()) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Thu Jan 13 20:54:03 2011
@@ -116,12 +116,12 @@ bool HeadersExchange::bind(Queue::shared
BoundKey bk(binding);
if (bindings.add_unless(bk, MatchArgs(queue, args))) {
binding->startManagement();
- propagate = bk.fedBinding.addOrigin(fedOrigin);
+ propagate = bk.fedBinding.addOrigin(queue->getName(),
fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
} else {
- bk.fedBinding.addOrigin(fedOrigin);
+ bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
} // lock dropped
@@ -129,7 +129,7 @@ bool HeadersExchange::bind(Queue::shared
} else if (fedOp == fedOpUnbind) {
Mutex::ScopedLock l(lock);
- FedUnbindModifier modifier(fedOrigin);
+ FedUnbindModifier modifier(queue->getName(), fedOrigin);
bindings.modify_if(MatchKey(queue, bindingKey), modifier);
propagate = modifier.shouldPropagate;
if (modifier.shouldUnbind) {
@@ -325,7 +325,7 @@ bool HeadersExchange::MatchKey::operator
}
//----------
-HeadersExchange::FedUnbindModifier::FedUnbindModifier(string & origin) :
fedOrigin(origin), shouldUnbind(false), shouldPropagate(false) {}
+HeadersExchange::FedUnbindModifier::FedUnbindModifier(const string& queueName,
const string& origin) : queueName(queueName), fedOrigin(origin),
shouldUnbind(false), shouldPropagate(false) {}
HeadersExchange::FedUnbindModifier::FedUnbindModifier() : shouldUnbind(false),
shouldPropagate(false) {}
bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk)
@@ -333,9 +333,9 @@ bool HeadersExchange::FedUnbindModifier:
if ("" == fedOrigin) {
shouldPropagate = bk.fedBinding.delOrigin();
} else {
- shouldPropagate = bk.fedBinding.delOrigin(fedOrigin);
+ shouldPropagate = bk.fedBinding.delOrigin(queueName, fedOrigin);
}
- if (bk.fedBinding.count() == 0)
+ if (bk.fedBinding.countFedBindings(queueName) == 0)
{
shouldUnbind = true;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Thu Jan 13 20:54:03 2011
@@ -60,11 +60,12 @@ class HeadersExchange : public virtual E
struct FedUnbindModifier
{
+ std::string queueName;
std::string fedOrigin;
bool shouldUnbind;
bool shouldPropagate;
FedUnbindModifier();
- FedUnbindModifier(std::string & origin);
+ FedUnbindModifier(const std::string& queueName, const std::string&
origin);
bool operator()(BoundKey & bk);
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu Jan 13 20:54:03 2011
@@ -236,7 +236,7 @@ bool TopicExchange::bind(Queue::shared_p
for (q = qv.begin(); q != qv.end(); q++) {
if ((*q)->queue == queue) {
// already bound, but may be from a different fedOrigin
- bk->fedBinding.addOrigin(fedOrigin);
+ bk->fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
}
@@ -245,7 +245,7 @@ bool TopicExchange::bind(Queue::shared_p
binding->startManagement();
bk->bindingVector.push_back(binding);
nBindings++;
- propagate = bk->fedBinding.addOrigin(fedOrigin);
+ propagate = bk->fedBinding.addOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
@@ -258,8 +258,8 @@ bool TopicExchange::bind(Queue::shared_p
RWlock::ScopedWlock l(lock);
BindingKey* bk = bindingTree.getBindingKey(routingPattern);
if (bk) {
- propagate = bk->fedBinding.delOrigin(fedOrigin);
- reallyUnbind = bk->fedBinding.count() == 0;
+ propagate = bk->fedBinding.delOrigin(queue->getName(),
fedOrigin);
+ reallyUnbind =
bk->fedBinding.countFedBindings(queue->getName()) == 0;
}
}
if (reallyUnbind)
Modified: qpid/trunk/qpid/cpp/src/tests/federation.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=1058747&r1=1058746&r2=1058747&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/federation.py Thu Jan 13 20:54:03 2011
@@ -1200,15 +1200,15 @@ class FederationTests(TestBase010):
# @todo - restore code when QPID-2499 fixed!!
sleep(6)
# wait for the binding count on B1 to drop from 2 to 1
- # retries = 0
- # exchanges[1].update()
- # while exchanges[1].bindingCount != 1:
- # retries += 1
- # self.failIfEqual(retries, 10,
- # "unbinding failed to propagate to broker B1: %d"
- # % exchanges[1].bindingCount)
- # sleep(1)
- # exchanges[1].update()
+ retries = 0
+ exchanges[1].update()
+ while exchanges[1].bindingCount != 1:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "unbinding failed to propagate to broker B1: %d"
+ % exchanges[1].bindingCount)
+ sleep(1)
+ exchanges[1].update()
# send 10 msgs from B0
for i in range(11, 21):
@@ -1216,13 +1216,12 @@ class FederationTests(TestBase010):
self._brokers[0].client_session.message_transfer(destination="fedX.direct",
message=Message(dp, "Message_drp %d" % i))
# verify messages are forwarded to B3 only
- # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499?
retries = 0
for ex in exchanges:
ex.update()
while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20
or
- exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40
or
- exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10
or exchanges[2].msgRoutes != 10 or
+ exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30
or
+ exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10
or
exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
retries += 1
self.failIfEqual(retries, 10,
@@ -1455,18 +1454,16 @@ class FederationTests(TestBase010):
self._brokers[2].client_session.message_cancel(destination="f1")
self._brokers[2].client_session.queue_delete(queue="fedX1")
- # @todo - restore code when QPID-2499 fixed!!
- sleep(6)
# wait for the binding count on B1 to drop from 2 to 1
- # retries = 0
- # exchanges[1].update()
- # while exchanges[1].bindingCount != 1:
- # retries += 1
- # self.failIfEqual(retries, 10,
- # "unbinding failed to propagate to broker B1: %d"
- # % exchanges[1].bindingCount)
- # sleep(1)
- # exchanges[1].update()
+ retries = 0
+ exchanges[1].update()
+ while exchanges[1].bindingCount != 1:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "unbinding failed to propagate to broker B1: %d"
+ % exchanges[1].bindingCount)
+ sleep(1)
+ exchanges[1].update()
# send 10 msgs from B0
for i in range(11, 21):
@@ -1474,13 +1471,12 @@ class FederationTests(TestBase010):
self._brokers[0].client_session.message_transfer(destination="fedX.topic",
message=Message(dp, "Message_trp %d" % i))
# verify messages are forwarded to B3 only
- # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499?
retries = 0
for ex in exchanges:
ex.update()
while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20
or
- exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40
or
- exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10
or exchanges[2].msgRoutes != 10 or
+ exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30
or
+ exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10
or
exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
retries += 1
self.failIfEqual(retries, 10,
@@ -1714,18 +1710,16 @@ class FederationTests(TestBase010):
self._brokers[2].client_session.message_cancel(destination="f1")
self._brokers[2].client_session.queue_delete(queue="fedX1")
- # @todo - find a proper way to check the propagation here!
- sleep(6)
# wait for the binding count on B1 to drop from 2 to 1
- # retries = 0
- # exchanges[1].update()
- # while exchanges[1].bindingCount != 1:
- # retries += 1
- # self.failIfEqual(retries, 10,
- # "unbinding failed to propagate to broker B1: %d"
- # % exchanges[1].bindingCount)
- # sleep(1)
- # exchanges[1].update()
+ retries = 0
+ exchanges[1].update()
+ while exchanges[1].bindingCount != 1:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "unbinding failed to propagate to broker B1: %d"
+ % exchanges[1].bindingCount)
+ sleep(1)
+ exchanges[1].update()
# send 10 msgs from B0
for i in range(11, 21):
@@ -1733,13 +1727,12 @@ class FederationTests(TestBase010):
self._brokers[0].client_session.message_transfer(destination="fedX.fanout",
message=Message(dp, "Message_frp %d" % i))
# verify messages are forwarded to B3 only
- # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499?
retries = 0
for ex in exchanges:
ex.update()
while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20
or
- exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40
or
- exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10
or exchanges[2].msgRoutes != 10 or
+ exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30
or
+ exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10
or
exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
retries += 1
self.failIfEqual(retries, 10,
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]