Author: aconway
Date: Thu Jan 19 23:03:20 2012
New Revision: 1233643
URL: http://svn.apache.org/viewvc?rev=1233643&view=rev
Log:
QPID-3603: Replicate bindings to backup brokers.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1233643&r1=1233642&r2=1233643&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp Thu Jan 19
23:03:20 2012
@@ -37,70 +37,95 @@ namespace ha {
using namespace framing;
using namespace broker;
using types::Variant;
+using std::string;
namespace {
-const std::string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+const string _WHAT("_what");
+const string _CLASS_NAME("_class_name");
+const string _PACKAGE_NAME("_package_name");
+const string _SCHEMA_ID("_schema_id");
+const string OBJECT("OBJECT");
+const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string QMF_DEFAULT_DIRECT("qmf.default.direct");
+const string QMF2("qmf2");
+const string QMF_OPCODE("qmf.opcode");
+const string _QUERY_REQUEST("_query_request");
+const string BROKER("broker");
+}
+
+void sendQuery(const string className, const string& queueName,
SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ Variant::Map request;
+ request[_WHAT] = OBJECT;
+ Variant::Map schema;
+ schema[_CLASS_NAME] = className;
+ schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+ request[_SCHEMA_ID] = schema;
+
+ AMQFrame method((MessageTransferBody(ProtocolVersion(),
QMF_DEFAULT_DIRECT, 0, 0)));
+ method.setBof(true);
+ method.setEof(false);
+ method.setBos(true);
+ method.setEos(true);
+ AMQHeaderBody headerBody;
+ MessageProperties* props = headerBody.get<MessageProperties>(true);
+ props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+ props->setAppId(QMF2);
+ props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
+
headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+ AMQFrame header(headerBody);
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ AMQContentBody data;
+ qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+ AMQFrame content(data);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ sessionHandler.out->handle(method);
+ sessionHandler.out->handle(header);
+ sessionHandler.out->handle(content);
+}
+
+namespace {
+const string QMF_DEFAULT_TOPIC("qmf.default.topic");
+const string
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
+const string QUEUE("queue");
+const string EXCHANGE("exchange");
+const string BINDING("binding");
}
// Initialize a bridge as a wiring replicator.
void bridgeInitWiringReplicator(Bridge& bridge, SessionHandler&
sessionHandler) {
framing::AMQP_ServerProxy peer(sessionHandler.out);
- std::string queueName = bridge.getQueueName();
+ string queueName = bridge.getQueueName();
const qmf::org::apache::qpid::broker::ArgsLinkBridge&
args(bridge.getArgs());
//declare and bind an event queue
peer.getQueue().declare(queueName, "", false, false, true, true,
FieldTable());
- peer.getExchange().bind(queueName, "qmf.default.topic",
"agent.ind.event.org_apache_qpid_broker.#", FieldTable());
+ peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC,
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
//subscribe to the queue
peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0,
FieldTable());
peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
//issue a query request for queues and another for exchanges using event
queue as the reply-to address
- for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable
utility functions
- Variant::Map request;
- request["_what"] = "OBJECT";
- Variant::Map schema;
- schema["_class_name"] = (i == 0 ? "queue" : "exchange");
- schema["_package_name"] = "org.apache.qpid.broker";
- request["_schema_id"] = schema;
-
- AMQFrame method((MessageTransferBody(ProtocolVersion(),
"qmf.default.direct", 0, 0)));
- method.setBof(true);
- method.setEof(false);
- method.setBos(true);
- method.setEos(true);
- AMQHeaderBody headerBody;
- MessageProperties* props = headerBody.get<MessageProperties>(true);
- props->setReplyTo(qpid::framing::ReplyTo("", queueName));
- props->setAppId("qmf2");
- props->getApplicationHeaders().setString("qmf.opcode",
"_query_request");
-
headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
- AMQFrame header(headerBody);
- header.setBof(false);
- header.setEof(false);
- header.setBos(true);
- header.setEos(true);
- AMQContentBody data;
- qpid::amqp_0_10::MapCodec::encode(request, data.getData());
- AMQFrame content(data);
- content.setBof(false);
- content.setEof(true);
- content.setBos(true);
- content.setEos(true);
- sessionHandler.out->handle(method);
- sessionHandler.out->handle(header);
- sessionHandler.out->handle(content);
- }
+ sendQuery(QUEUE, queueName, sessionHandler);
+ sendQuery(EXCHANGE, queueName, sessionHandler);
+ sendQuery(BINDING, queueName, sessionHandler);
}
Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
// Create a link to replicate wiring
- if (s.brokerUrl != "dummy") {
+ if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack
to identify primary.
Url url(s.brokerUrl);
QPID_LOG(info, "HA backup broker connecting to: " << url);
- std::string protocol = url[0].protocol.empty() ? "tcp" :
url[0].protocol;
+ string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
broker.getLinks().declare( // Declare the link
url[0].host, url[0].port, protocol,
false, // durable
@@ -111,7 +136,7 @@ Backup::Backup(broker::Broker& b, const
false, // durable
QPID_WIRING_REPLICATOR, // src
QPID_WIRING_REPLICATOR, // dest
- "x", // key
+ "", // key
false, // isQueue
false, // isLocal
"", // id/tag
@@ -121,8 +146,7 @@ Backup::Backup(broker::Broker& b, const
bridgeInitWiringReplicator
);
}
- // FIXME aconway 2011-11-17: need to enhance the link code to
- // handle discovery of the primary broker and fail-over correctly.
+ // FIXME aconway 2011-11-17: handle discovery of the primary broker and
fail-over correctly.
}
}} // namespace qpid::ha
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1233643&r1=1233642&r2=1233643&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Thu Jan
19 23:03:20 2012
@@ -52,6 +52,7 @@ const string ALL("all");
const string WIRING("wiring");
const string CLASS_NAME("_class_name");
+const string OBJECT_NAME("_object_name");
const string PACKAGE_NAME("_package_name");
const string VALUES("_values");
const string EVENT("_event");
@@ -59,10 +60,11 @@ const string SCHEMA_ID("_schema_id");
const string QUERY_RESPONSE("_query_response");
const string ARGUMENTS("arguments");
+const string ARGS("args");
const string QUEUE("queue");
const string EXCHANGE("exchange");
const string BIND("bind");
-const string ARGS("args");
+const string BINDING("binding");
const string DURABLE("durable");
const string QNAME("qName");
const string AUTODEL("autoDel");
@@ -116,19 +118,16 @@ WiringReplicator::WiringReplicator(const
WiringReplicator::~WiringReplicator() {}
void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const
framing::FieldTable* headers) {
+ Variant::List list;
try {
- // FIXME aconway 2011-11-21: outer error handling, e.g. for decoding
error.
if (!isQMFv2(msg.getMessage()) || !headers)
throw Exception("Unexpected message, not QMF2 event or query
response.");
// decode as list
string content = msg.getMessage().getFrames().getContent();
- Variant::List list;
amqp_0_10::ListCodec::decode(content, list);
- QPID_LOG(critical, "FIXME WiringReplicator message: " << list);
if (headers->getAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end();
++i) {
- // FIXME aconway 2011-11-18: should be iterating list?
Variant::Map& map = list.front().asMap();
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
@@ -142,33 +141,32 @@ void WiringReplicator::route(Deliverable
else throw(Exception(QPID_MSG("WiringReplicator received
unexpected event, schema=" << schema)));
}
} else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
- QPID_LOG(critical, "FIXME WiringReplicator response: " << list);
for (Variant::List::iterator i = list.begin(); i != list.end();
++i) {
string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
Variant::Map& values = i->asMap()[VALUES].asMap();
- if (isReplicated(values[ARGUMENTS].asMap())) {
- framing::FieldTable args;
- amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
- if (type == QUEUE) doResponseQueue(values);
- else if (type == EXCHANGE) doResponseExchange(values);
- else if (type == BIND) doResponseBind(values);
- else throw Exception(QPID_MSG("Ignoring unexpected class:
" << type));
- }
+ framing::FieldTable args;
+ amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+ if (type == QUEUE) doResponseQueue(values);
+ else if (type == EXCHANGE) doResponseExchange(values);
+ else if (type == BINDING) doResponseBind(values);
+ else throw Exception(QPID_MSG("Ignoring unexpected class: " <<
type));
}
} else {
- QPID_LOG(warning, QPID_MSG("Ignoring QMFv2 message with headers: "
<< *headers));
+ QPID_LOG(warning, QPID_MSG("Replicator: Ignoring QMFv2 message
with headers: " << *headers));
}
} catch (const std::exception& e) {
- QPID_LOG(warning, "Error replicating configuration: " << e.what());
+ QPID_LOG(warning, "Replicator: Error replicating configuration: " <<
e.what());
+ QPID_LOG(debug, "Replicator: Error processing: " << list);
}
}
void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
string name = values[QNAME].asString();
- if (values[DISP] == CREATED && isReplicated(values[ARGS].asMap())) {
+ Variant::Map argsMap = values[ARGS].asMap();
+ if (values[DISP] == CREATED && isReplicated(argsMap)) {
QPID_LOG(debug, "Creating replicated queue " << name);
framing::FieldTable args;
- amqp_0_10::translate(values[ARGS].asMap(), args);
+ amqp_0_10::translate(argsMap, args);
if (!broker.createQueue(
name,
values[DURABLE].asBool(),
@@ -178,6 +176,7 @@ void WiringReplicator::doEventQueueDecla
args,
values[USER].asString(),
values[RHOST].asString()).second) {
+ // FIXME aconway 2011-11-22: should delete old queue and re-create
from exchanges.
QPID_LOG(warning, "Replicated queue " << name << " already
exists");
}
}
@@ -201,7 +200,6 @@ void WiringReplicator::doEventExchangeDe
string name = values[EXNAME].asString();
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- QPID_LOG(debug, "Creating replicated exchange " << name);
if (!broker.createExchange(
name,
values[EXTYPE].asString(),
@@ -210,6 +208,8 @@ void WiringReplicator::doEventExchangeDe
args,
values[USER].asString(),
values[RHOST].asString()).second) {
+ // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
+ // and re-create from event. Likewise for queues.
QPID_LOG(warning, "Replicated exchange " << name << " already
exists");
}
}
@@ -230,14 +230,13 @@ void WiringReplicator::doEventExchangeDe
}
void WiringReplicator::doEventBind(Variant::Map& values) {
- QPID_LOG(critical, "FIXME doEventBind " << values);
try {
boost::shared_ptr<Exchange> exchange =
broker.getExchanges().get(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
broker.getQueues().find(values[QNAME].asString());
// We only replicated a binds for a replicated queue to replicated
exchange.
if (isReplicated(exchange->getArgs()) &&
isReplicated(queue->getSettings())) {
framing::FieldTable args;
- amqp_0_10::translate(args, values[ARGS].asMap());
+ amqp_0_10::translate(values[ARGS].asMap(), args);
string key = values[KEY].asString();
QPID_LOG(debug, "Replicated binding exchange=" <<
exchange->getName()
<< " queue=" << queue->getName()
@@ -248,6 +247,11 @@ void WiringReplicator::doEventBind(Varia
}
void WiringReplicator::doResponseQueue(Variant::Map& values) {
+ // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate
replication
+ Variant::Map argsMap(values[ARGUMENTS].asMap());
+ if (!isReplicated(argsMap)) return;
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
QPID_LOG(debug, "Creating replicated queue " << values[NAME].asString() <<
" (in catch-up)");
if (!broker.createQueue(
values[NAME].asString(),
@@ -258,11 +262,17 @@ void WiringReplicator::doResponseQueue(V
args,
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/).second) {
+ // FIXME aconway 2011-11-22: Normal to find queue already
+ // exists if we're failing over.
QPID_LOG(warning, "Replicated queue " << values[NAME] << " already
exists (in catch-up)");
}
}
void WiringReplicator::doResponseExchange(Variant::Map& values) {
+ Variant::Map argsMap(values[ARGUMENTS].asMap());
+ if (!isReplicated(argsMap)) return;
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
QPID_LOG(debug, "Creating replicated exchange " << values[NAME].asString()
<< " (in catch-up)");
if (!broker.createExchange(
values[NAME].asString(),
@@ -276,9 +286,54 @@ void WiringReplicator::doResponseExchang
}
}
+// FIXME aconway 2011-11-21: refactor to remove redundancy between do*
functions.
+
+namespace {
+const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
+const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");
+
+std::string getRefName(const std::string& prefix, const Variant& ref) {
+ Variant::Map map(ref.asMap());
+ Variant::Map::const_iterator i = map.find(OBJECT_NAME);
+ if (i == map.end())
+ throw Exception(QPID_MSG("Replicator: invalid object reference: " <<
ref));
+ const std::string name = i->second.asString();
+ if (name.compare(0, prefix.size(), prefix) != 0)
+ throw Exception(QPID_MSG("Replicator unexpected reference prefix: " <<
name));
+ std::string ret = name.substr(prefix.size());
+ return ret;
+}
+
+const std::string EXCHANGE_REF("exchangeRef");
+const std::string QUEUE_REF("queueRef");
+
+} // namespace
+
void WiringReplicator::doResponseBind(Variant::Map& values) {
- QPID_LOG(critical, "FIXME doResponseBind " << values);
- throw Exception("FIXME WiringReplicator: Not yet implemented - catch-up
replicate bindings.");
+ try {
+ std::string exName = getRefName(EXCHANGE_REF_PREFIX,
values[EXCHANGE_REF]);
+ boost::shared_ptr<Exchange> exchange =
broker.getExchanges().get(exName);
+ if (!exchange) return;
+
+ std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+ if (!queue) return;
+
+ // We only replicated a bind for a replicated queue to replicated
exchange.
+ // FIXME aconway 2011-11-22: do we always log binds between replicated
ex/q
+ // or do we consider the bind arguments as well?
+ if (exchange && queue &&
+ isReplicated(exchange->getArgs()) &&
isReplicated(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "Replicated binding exchange=" <<
exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->bind(queue, key, &args);
+ }
+ } catch (const framing::NotFoundException& e) {} // Ignore unreplicated
queue or exchange.
}
boost::shared_ptr<Exchange> WiringReplicator::create(const string& target,
Broker& broker)
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1233643&r1=1233642&r2=1233643&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Thu Jan 19
23:03:20 2012
@@ -50,7 +50,7 @@ class ShortTests(BrokerTest):
def wait(self, session, address):
def check():
try:
- session.receiver(address)
+ session.sender(address)
return True
except NotFound: return False
assert retry(check), "Timed out waiting for %s"%(address)
@@ -67,37 +67,39 @@ class ShortTests(BrokerTest):
# Create some wiring before starting the backup, to test catch-up
primary = self.ha_broker(name="primary")
- s = primary.connect().session()
- s.sender(queue%("q1", "all")).send(Message("1"))
- s.sender(queue%("q2", "wiring")).send(Message("2"))
- s.sender(queue%("q3", "none")).send(Message("3"))
- s.sender(exchange%("e1", "all", "e1", "q2")).send(Message("4"))
+ p = primary.connect().session()
+ p.sender(queue%("q1", "all")).send(Message("1"))
+ p.sender(queue%("q2", "wiring")).send(Message("2"))
+ p.sender(queue%("q3", "none")).send(Message("3"))
+ p.sender(exchange%("e1", "all", "e1", "q2")).send(Message("4"))
# Create some after starting backup, test steady-state replication
backup = self.ha_broker(name="backup", broker_url=primary.host_port())
- s.sender(queue%("q01", "all")).send(Message("01"))
- s.sender(queue%("q02", "wiring")).send(Message("02"))
- s.sender(queue%("q03", "none")).send(Message("03"))
- s.sender(exchange%("e01", "all", "e01", "q02")).send(Message("04"))
+ b = backup.connect().session()
+ # FIXME aconway 2011-11-21: need to wait for backup to be ready to
test event replication
+ for a in ["q1", "q2", "e1"]: self.wait(b,a)
+ p.sender(queue%("q11", "all")).send(Message("11"))
+ p.sender(queue%("q12", "wiring")).send(Message("12"))
+ p.sender(queue%("q13", "none")).send(Message("13"))
+ p.sender(exchange%("e11", "all", "e11", "q12")).send(Message("14"))
# Verify replication
# FIXME aconway 2011-11-18: We should kill primary here and fail over.
- s = backup.connect().session()
- for a in ["q01", "q02", "e01"]: self.wait(s,a)
+ for a in ["q11", "q12", "e11"]: self.wait(b,a)
# FIXME aconway 2011-11-18: replicate messages
-# self.assert_browse(s, "q01", ["01", "04", "e01"])
-# self.assert_browse(s, "q02", []) # wiring only
-# self.assert_missing(s,"q03")
- s.sender("e01").send(Message("e01")) # Verify bind
- self.assert_browse(s, "q02", ["e01"])
+# self.assert_browse(b, "q11", ["11", "14", "e11"])
+# self.assert_browse(b, "q12", []) # wiring only
+# self.assert_missing(b,"q13")
+ b.sender("e11").send(Message("e11")) # Verify bind
+ self.assert_browse(b, "q12", ["e11"])
- for a in ["q1", "q2", "e1"]: self.wait(s,a)
+ for a in ["q1", "q2", "e1"]: self.wait(b,a)
# FIXME aconway 2011-11-18: replicate messages
-# self.assert_browse(s, "q1", ["1", "4", "e1"])
-# self.assert_browse(s, "q2", []) # wiring only
-# self.assert_missing(s,"q3")
- s.sender("e1").send(Message("e1")) # Verify bind
- self.assert_browse(s, "q2", ["e1"])
+# self.assert_browse(b, "q1", ["1", "4", "e1"])
+# self.assert_browse(b, "q2", []) # wiring only
+# self.assert_missing(b,"q3")
+ b.sender("e1").send(Message("e1")) # Verify bind
+ self.assert_browse(b, "q2", ["e1"])
if __name__ == "__main__":
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]