Author: aconway
Date: Tue Dec 11 21:50:03 2012
New Revision: 1420438
URL: http://svn.apache.org/viewvc?rev=1420438&view=rev
Log:
QPID-4481: HA replication of propagated bindings can lead to incorrect
configuration
When using dynamic federation between two independent HA broker clusters, it is
possible under certain failover scenarios for the propagated bindings on the
source broker to become out-of-sync with the true state of bindings on the
destination broker.
How reproducible:
Often -- race condition between re-establishment of federated link and the
deletion of a binding on the destination broker
Steps to Reproduce:
1. Start a stand-alone broker (route destination) and an HA broker (route
source w/ primary and backup)
2. Configure a dynamic federated route between a destination broker and a
source broker. The dynamic federation needs to utilize an existing,
non-auto-delete queue on the source broker.
3. Subscribe to an auto-delete queue on the destination broker and bind the
auto-delete queue to the exchange configured for the dynamic federation
4. Kill the primary source broker
5. Kill the subscription to the auto-delete queue on the destination broker
6. Promote the backup source broker to primary
Actual results:
With the loss of the client subscription to the auto-delete queue, the binding
will be removed. If the binding is removed prior to the re-establishment of
the federated link to the source broker, the unbind command will not propagate.
Since the backup source broker had previously replicated the propagated
binding, the binding will incorrectly remain on the source broker.
Expected results:
Propagated bindings should not be replicated from the primary to backups since
they are transient and will be recreated when the route is re-established.)))
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
qpid/trunk/qpid/cpp/src/tests/federation.py
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Tue Dec 11 21:50:03 2012
@@ -49,6 +49,11 @@ using qpid::management::ManagementAgent;
using std::string;
namespace _qmf = qmf::org::apache::qpid::broker;
+namespace {
+const std::string QPID_REPLICATE("qpid.replicate");
+const std::string NONE("none");
+}
+
namespace qpid {
namespace broker {
@@ -333,6 +338,7 @@ void Bridge::propagateBinding(const stri
}
string newTagList(tagList + string(tagList.empty() ? "" : ",") +
localTag);
+ bindArgs.setString(QPID_REPLICATE, NONE);
bindArgs.setString(qpidFedOp, op);
bindArgs.setString(qpidFedTags, newTagList);
if (origin.empty())
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Dec 11 21:50:03 2012
@@ -1250,6 +1250,7 @@ void Broker::bind(const std::string& que
QPID_LOG_CAT(debug, model, "Create binding. exchange:" <<
exchangeName
<< " queue:" << queueName
<< " key:" << key
+ << " arguments:" << arguments
<< " user:" << userId
<< " rhost:" << connectionId);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Tue Dec 11 21:50:03
2012
@@ -70,7 +70,7 @@ bool DirectExchange::bind(Queue::shared_
if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
Mutex::ScopedLock l(lock);
- Binding::shared_ptr b(new Binding(routingKey, queue, this,
FieldTable(), fedOrigin));
+ Binding::shared_ptr b(new Binding(routingKey, queue, this, args ?
*args : FieldTable(), fedOrigin));
BoundKey& bk = bindings[routingKey];
if (exclusiveBinding) bk.queues.clear();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Tue Dec 11 21:50:03
2012
@@ -54,7 +54,7 @@ bool FanOutExchange::bind(Queue::shared_
bool propagate = false;
if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
- Binding::shared_ptr binding (new Binding ("", queue, this,
FieldTable(), fedOrigin));
+ Binding::shared_ptr binding (new Binding ("", queue, this, args ?
*args : FieldTable(), fedOrigin));
if (bindings.add_unless(binding, MatchQueue(queue))) {
binding->startManagement();
propagate = fedBinding.addOrigin(queue->getName(), fedOrigin);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Tue Dec 11 21:50:03
2012
@@ -48,6 +48,7 @@ namespace {
const std::string empty;
// federation related args and values
+ const std::string QPID_RESERVED("qpid.");
const std::string qpidFedOp("qpid.fed.op");
const std::string qpidFedTags("qpid.fed.tags");
const std::string qpidFedOrigin("qpid.fed.origin");
@@ -200,8 +201,8 @@ bool HeadersExchange::bind(Queue::shared
//matching (they are internally added properties
//controlling binding propagation but not relevant to
//actual routing)
- Binding::shared_ptr binding (new Binding (bindingKey, queue, this,
extra_args));
- BoundKey bk(binding);
+ Binding::shared_ptr binding (new Binding (bindingKey, queue, this,
args ? *args : FieldTable()));
+ BoundKey bk(binding, extra_args);
if (bindings.add_unless(bk, MatchArgs(queue, &extra_args))) {
binding->startManagement();
propagate = bk.fedBinding.addOrigin(queue->getName(),
fedOrigin);
@@ -282,7 +283,7 @@ void HeadersExchange::route(Deliverable&
Bindings::ConstPtr p = bindings.snapshot();
if (p.get()) {
for (std::vector<BoundKey>::const_iterator i = p->begin(); i !=
p->end(); ++i) {
- Matcher matcher(i->binding->args);
+ Matcher matcher(i->args);
msg.getMessage().processProperties(matcher);
if (matcher.matches()) {
b->push_back(i->binding);
@@ -298,7 +299,7 @@ bool HeadersExchange::isBound(Queue::sha
Bindings::ConstPtr p = bindings.snapshot();
if (p.get()){
for (std::vector<BoundKey>::const_iterator i = p->begin(); i !=
p->end(); ++i) {
- if ( (!args || equal((*i).binding->args, *args)) && (!queue ||
(*i).binding->queue == queue)) {
+ if ( (!args || equal((*i).args, *args)) && (!queue ||
(*i).binding->queue == queue)) {
return true;
}
}
@@ -315,10 +316,7 @@ void HeadersExchange::getNonFedArgs(cons
for (qpid::framing::FieldTable::ValueMap::const_iterator i=args->begin();
i != args->end(); ++i)
{
- const string & name(i->first);
- if (name == qpidFedOp ||
- name == qpidFedTags ||
- name == qpidFedOrigin)
+ if (i->first.find(QPID_RESERVED) == 0)
{
continue;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Tue Dec 11 21:50:03
2012
@@ -38,8 +38,9 @@ class HeadersExchange : public virtual E
struct BoundKey
{
Binding::shared_ptr binding;
+ qpid::framing::FieldTable args;
FedBinding fedBinding;
- BoundKey(Binding::shared_ptr binding_) : binding(binding_) {}
+ BoundKey(Binding::shared_ptr binding_, const
qpid::framing::FieldTable& args_) : binding(binding_), args(args_) {}
};
struct MatchArgs
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Dec 11 21:50:03
2012
@@ -37,9 +37,11 @@
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/broker/FedOps.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
+#include <boost/tuple/tuple_comparison.hpp>
#include <iostream>
#include <sstream>
@@ -48,6 +50,11 @@
#include <assert.h>
+namespace {
+const std::string X_SCOPE("x-scope");
+const std::string SESSION("session");
+}
+
namespace qpid {
namespace broker {
@@ -87,6 +94,7 @@ void SemanticState::closed() {
if (dtxBuffer.get()) {
dtxBuffer->fail();
}
+ unbindSessionBindings();
requeue();
//now unsubscribe, which may trigger queue deletion and thus
@@ -803,4 +811,63 @@ void SemanticState::detached()
}
}
+void SemanticState::addBinding(const string& queueName, const string&
exchangeName,
+ const string& routingKey, const
framing::FieldTable& arguments)
+{
+ QPID_LOG (debug, "SemanticState::addBinding ["
+ << "queue=" << queueName << ", "
+ << "exchange=" << exchangeName << ", "
+ << "key=" << routingKey << ", "
+ << "args=" << arguments << "]");
+ std::string fedOp = arguments.getAsString(qpidFedOp);
+ if ((arguments.isSet(qpidFedOp)) && (fedOp.empty())) {
+ fedOp = fedOpBind;
+ }
+ std::string fedOrigin = arguments.getAsString(qpidFedOrigin);
+ if ((arguments.getAsString(X_SCOPE) == SESSION) || (fedOp == fedOpBind)) {
+ bindings.insert(boost::make_tuple(queueName, exchangeName, routingKey,
fedOrigin));
+ }
+ else if (fedOp == fedOpUnbind) {
+ bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey,
fedOrigin));
+ }
+}
+
+void SemanticState::removeBinding(const string& queueName, const string&
exchangeName,
+ const string& routingKey)
+{
+ QPID_LOG (debug, "SemanticState::removeBinding ["
+ << "queue=" << queueName << ", "
+ << "exchange=" << exchangeName << ", "
+ << "key=" << routingKey)
+ bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, ""));
+}
+
+void SemanticState::unbindSessionBindings()
+{
+ //unbind session-scoped bindings
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
+ QPID_LOG (debug, "SemanticState::unbindSessionBindings ["
+ << "queue=" << i->get<0>() << ", "
+ << "exchange=" << i->get<1>()<< ", "
+ << "key=" << i->get<2>() << ", "
+ << "fedOrigin=" << i->get<3>() << "]");
+ try {
+ std::string fedOrigin = i->get<3>();
+ if (!fedOrigin.empty()) {
+ framing::FieldTable fedArguments;
+ fedArguments.setString(qpidFedOp, fedOpUnbind);
+ fedArguments.setString(qpidFedOrigin, fedOrigin);
+ session.getBroker().bind(i->get<0>(), i->get<1>(),
i->get<2>(), fedArguments,
+ userID, connectionId);
+ } else {
+ session.getBroker().unbind(i->get<0>(), i->get<1>(),
i->get<2>(),
+ userID, connectionId);
+ }
+ }
+ catch (...) {
+ }
+ }
+ bindings.clear();
+}
+
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Dec 11 21:50:03 2012
@@ -46,10 +46,12 @@
#include <list>
#include <map>
+#include <set>
#include <vector>
#include <boost/enable_shared_from_this.hpp>
#include <boost/cast.hpp>
+#include <boost/tuple/tuple.hpp>
namespace qpid {
namespace broker {
@@ -173,6 +175,8 @@ class SemanticState : private boost::non
private:
typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
+ typedef boost::tuple<std::string, std::string, std::string, std::string>
Binding;
+ typedef std::set<Binding> Bindings;
SessionState& session;
ConsumerImplMap consumers;
@@ -190,6 +194,8 @@ class SemanticState : private boost::non
//needed for queue delete events in auto-delete:
const std::string connectionId;
+ Bindings bindings;
+
void checkDtxTimeout();
bool complete(DeliveryRecord&);
@@ -197,6 +203,7 @@ class SemanticState : private boost::non
void requestDispatch();
void cancel(ConsumerImpl::shared_ptr);
void disable(ConsumerImpl::shared_ptr);
+ void unbindSessionBindings();
public:
@@ -271,6 +278,11 @@ class SemanticState : private boost::non
void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck =
s; }
void record(const DeliveryRecord& delivery);
DtxBufferMap& getSuspendedXids() { return suspendedXids; }
+
+ void addBinding(const std::string& queueName, const std::string&
exchangeName,
+ const std::string& routingKey, const framing::FieldTable&
arguments);
+ void removeBinding(const std::string& queueName, const std::string&
exchangeName,
+ const std::string& routingKey);
};
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Dec 11 21:50:03
2012
@@ -154,12 +154,14 @@ void SessionAdapter::ExchangeHandlerImpl
{
getBroker().bind(queueName, exchangeName, routingKey, arguments,
getConnection().getUserId(), getConnection().getUrl());
+ state.addBinding(queueName, exchangeName, routingKey, arguments);
}
void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
{
+ state.removeBinding(queueName, exchangeName, routingKey);
getBroker().unbind(queueName, exchangeName, routingKey,
getConnection().getUserId(), getConnection().getUrl());
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Tue Dec 11 21:50:03
2012
@@ -179,7 +179,7 @@ bool TopicExchange::bind(Queue::shared_p
}
}
- Binding::shared_ptr binding (new Binding (routingPattern, queue,
this, FieldTable(), fedOrigin));
+ Binding::shared_ptr binding (new Binding (routingPattern, queue,
this, args ? *args : FieldTable(), fedOrigin));
binding->startManagement();
bk->bindingVector.push_back(binding);
nBindings++;
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Dec 11 21:50:03
2012
@@ -534,17 +534,19 @@ void BrokerReplicator::doEventBind(Varia
exchanges.find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
queues.find(values[QNAME].asString());
+ framing::FieldTable args;
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
// We only replicate binds for a replicated queue to replicated
// exchange that both exist locally.
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue &&
replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ queue &&
replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
+ replicationTest.replicateLevel(args))
{
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind event: exchange=" <<
exchange->getName()
<< " queue=" << queue->getName()
- << " key=" << key);
+ << " key=" << key
+ << " args=" << args);
queue->bind(exchange, key, args);
}
}
@@ -559,13 +561,11 @@ void BrokerReplicator::doEventUnbind(Var
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
queue &&
replicationTest.replicateLevel(queue->getSettings().storeSettings))
{
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" <<
exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->unbind(queue, key, &args);
+ exchange->unbind(queue, key, 0);
}
}
@@ -692,16 +692,19 @@ void BrokerReplicator::doResponseBind(Va
boost::shared_ptr<Exchange> exchange = exchanges.find(exName);
boost::shared_ptr<Queue> queue = queues.find(qName);
+ framing::FieldTable args;
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+
// Automatically replicate binding if queue and exchange exist and are
replicated
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue &&
replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ queue &&
replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
+ replicationTest.replicateLevel(args))
{
string key = values[BINDING_KEY].asString();
QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
<< " queue:" << qName
- << " key:" << key);
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ << " key:" << key
+ << " args:" << args);
queue->bind(exchange, key, args);
}
}
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Tue Dec 11 21:50:03
2012
@@ -1196,6 +1196,27 @@ QPID_AUTO_TEST_CASE(testBrowseOnly)
fix.session.acknowledge();
}
+QPID_AUTO_TEST_CASE(testLinkBindingCleanup)
+{
+ MessagingFixture fix;
+
+ Sender sender =
fix.session.createSender("test.ex;{create:always,node:{type:topic}}");
+
+ Connection connection = fix.newConnection();
+ connection.open();
+
+ Session session(connection.createSession());
+ Receiver receiver1 = session.createReceiver("test.q;{create:always,
node:{type:queue,
x-bindings:[{exchange:test.ex,queue:test.q,key:#,arguments:{x-scope:session}}]}}");
+ Receiver receiver2 = fix.session.createReceiver("test.q;{create:never,
delete:always}");
+ connection.close();
+
+ sender.send(Message("test-message"), true);
+
+ // The session-scoped binding should be removed when receiver1's network
connection is lost
+ Message in;
+ BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE));
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
Modified: qpid/trunk/qpid/cpp/src/tests/federation.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/federation.py Tue Dec 11 21:50:03 2012
@@ -2604,3 +2604,109 @@ class FederationTests(TestBase010):
self.verify_cleanup()
+ def test_dynamic_bounce_unbinds_named_queue(self):
+ """ Verify that a propagated binding is removed when the connection is
+ bounced
+ """
+ session = self.session
+
+ # create the federation
+
+ self.startQmf()
+ qmf = self.qmf
+
+ self._setup_brokers()
+
+ # create exchange on each broker, and retrieve the corresponding
+ # management object for that exchange
+
+ exchanges=[]
+ for _b in self._brokers[0:2]:
+ _b.client_session.exchange_declare(exchange="fedX", type="direct")
+
self.assertEqual(_b.client_session.exchange_query(name="fedX").type,
+ "direct", "exchange_declare failed!")
+ # pull the exchange out of qmf...
+ retries = 0
+ my_exchange = None
+ timeout = time() + 10
+ while my_exchange is None and time() <= timeout:
+ objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+ for ooo in objs:
+ if ooo.name == "fedX":
+ my_exchange = ooo
+ break
+ if my_exchange is None:
+ self.fail("QMF failed to find new exchange!")
+ exchanges.append(my_exchange)
+
+ # on the destination broker, create a binding for propagation
+ self._brokers[0].client_session.queue_declare(queue="fedDstQ")
+ self._brokers[0].client_session.exchange_bind(queue="fedDstQ",
exchange="fedX", binding_key="spud")
+
+ # on the source broker, create a bridge queue
+ self._brokers[1].client_session.queue_declare(queue="fedSrcQ")
+
+ # connect B1 --> B0
+ result = self._brokers[0].qmf_object.create( "link",
+ "Link-dynamic",
+
{"host":self._brokers[1].host,
+
"port":self._brokers[1].port}, False)
+ self.assertEqual(result.status, 0)
+
+ # bridge the "fedX" exchange:
+ result = self._brokers[0].qmf_object.create("bridge",
+ "Bridge-dynamic",
+ {"link":"Link-dynamic",
+ "src":"fedX",
+ "dest":"fedX",
+ "dynamic":True,
+ "queue":"fedSrcQ"}, False)
+ self.assertEqual(result.status, 0)
+
+ # wait for the inter-broker links to become operational
+ operational = False
+ timeout = time() + 10
+ while not operational and time() <= timeout:
+ operational = True
+ for _l in qmf.getObjects(_class="link"):
+ #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state)))
+ if _l.state != "Operational":
+ operational = False
+ self.failUnless(operational, "inter-broker links failed to become
operational.")
+
+ # wait until the binding key has propagated to the src broker
+ exchanges[1].update()
+ timeout = time() + 10
+ while exchanges[1].bindingCount < 1 and time() <= timeout:
+ exchanges[1].update()
+ self.failUnless(exchanges[1].bindingCount == 1)
+
+ #
+ # Tear down the bridges between the two exchanges, then wait
+ # for the bindings to be cleaned up
+ #
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+ exchanges[1].update()
+ timeout = time() + 10
+ while exchanges[1].bindingCount != 0 and time() <= timeout:
+ exchanges[1].update()
+ self.failUnless(exchanges[1].bindingCount == 0)
+
+ self._brokers[1].client_session.queue_delete(queue="fedSrcQ")
+
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _b in self._brokers[0:2]:
+ _b.client_session.exchange_delete(exchange="fedX")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1420438&r1=1420437&r2=1420438&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Dec 11 21:50:03 2012
@@ -478,6 +478,23 @@ class ReplicationTests(HaBrokerTest):
self.fail("Excpected no-such-queue exception")
except NotFound: pass
+ def test_replicate_binding(self):
+ """Verify that binding replication can be disabled"""
+ primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+ primary.promote()
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+ ps = primary.connect().session()
+
ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all},
type:'fanout'}}}")
+
ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}")
+ backup.wait_backup("q")
+
+ primary.kill()
+ assert retry(lambda: not is_running(primary.pid)) # Wait for primary
to die
+ backup.promote()
+ bs = backup.connect_admin().session()
+ bs.sender("ex").send(Message("msg"))
+ self.assert_browse_retry(bs, "q", [])
+
def test_invalid_replication(self):
"""Verify that we reject an attempt to declare a queue with invalid
replication value."""
cluster = HaCluster(self, 1, ha_replicate="all")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]