Author: aconway
Date: Thu Jan 19 23:03:12 2012
New Revision: 1233642
URL: http://svn.apache.org/viewvc?rev=1233642&view=rev
Log:
QPID-3603: Minor refactor.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1233642&r1=1233641&r2=1233642&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Thu Jan
19 23:03:12 2012
@@ -31,6 +31,9 @@
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
+namespace qpid {
+namespace ha {
+
using qmf::org::apache::qpid::broker::EventBind;
using qmf::org::apache::qpid::broker::EventExchangeDeclare;
using qmf::org::apache::qpid::broker::EventExchangeDelete;
@@ -38,50 +41,49 @@ using qmf::org::apache::qpid::broker::Ev
using qmf::org::apache::qpid::broker::EventQueueDelete;
using qmf::org::apache::qpid::broker::EventSubscribe;
-namespace qpid {
-namespace ha {
-
+using std::string;
using types::Variant;
using namespace broker;
namespace{
-const std::string QPID_REPLICATE("qpid.replicate");
-const std::string ALL("all");
-const std::string WIRING("wiring");
-
-const std::string CLASS_NAME("_class_name");
-const std::string PACKAGE_NAME("_package_name");
-const std::string VALUES("_values");
-const std::string EVENT("_event");
-const std::string SCHEMA_ID("_schema_id");
-const std::string QUERY_RESPONSE("_query_response");
-
-const std::string ARGUMENTS("arguments");
-const std::string QUEUE("queue");
-const std::string EXCHANGE("exchange");
-const std::string BIND("bind");
-const std::string ARGS("args");
-const std::string DURABLE("durable");
-const std::string QNAME("qName");
-const std::string AUTODEL("autoDel");
-const std::string ALTEX("altEx");
-const std::string USER("user");
-const std::string RHOST("rhost");
-const std::string EXTYPE("exType");
-const std::string EXNAME("exName");
-const std::string AUTODELETE("autoDelete");
-const std::string NAME("name");
-const std::string TYPE("type");
-const std::string DISP("disp");
-const std::string CREATED("created");
-
-
-const std::string QMF_OPCODE("qmf.opcode");
-const std::string QMF_CONTENT("qmf.content");
-const std::string QMF2("qmf2");
+const string QPID_REPLICATE("qpid.replicate");
+const string ALL("all");
+const string WIRING("wiring");
+
+const string CLASS_NAME("_class_name");
+const string PACKAGE_NAME("_package_name");
+const string VALUES("_values");
+const string EVENT("_event");
+const string SCHEMA_ID("_schema_id");
+const string QUERY_RESPONSE("_query_response");
+
+const string ARGUMENTS("arguments");
+const string QUEUE("queue");
+const string EXCHANGE("exchange");
+const string BIND("bind");
+const string ARGS("args");
+const string DURABLE("durable");
+const string QNAME("qName");
+const string AUTODEL("autoDel");
+const string ALTEX("altEx");
+const string USER("user");
+const string RHOST("rhost");
+const string EXTYPE("exType");
+const string EXNAME("exName");
+const string AUTODELETE("autoDelete");
+const string NAME("name");
+const string TYPE("type");
+const string DISP("disp");
+const string CREATED("created");
+const string KEY("key");
+
+
+const string QMF_OPCODE("qmf.opcode");
+const string QMF_CONTENT("qmf.content");
+const string QMF2("qmf2");
-const std::string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
bool isQMFv2(const Message& message)
@@ -95,7 +97,7 @@ template <class T> bool match(Variant::M
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
-bool isReplicated(const std::string& value) {
+bool isReplicated(const string& value) {
return value == ALL || value == WIRING;
}
bool isReplicated(const framing::FieldTable& f) {
@@ -109,20 +111,22 @@ bool isReplicated(const Variant::Map& m)
} // namespace
-WiringReplicator::WiringReplicator(const std::string& name, Broker& b) :
Exchange(name), broker(b) {}
+WiringReplicator::WiringReplicator(const string& name, Broker& b) :
Exchange(name), broker(b) {}
WiringReplicator::~WiringReplicator() {}
-void WiringReplicator::route(Deliverable& msg, const std::string& /*key*/,
const framing::FieldTable* headers) {
+void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const
framing::FieldTable* headers) {
try {
// FIXME aconway 2011-11-21: outer error handling, e.g. for decoding
error.
if (!isQMFv2(msg.getMessage()) || !headers)
throw Exception("Unexpected message, not QMF2 event or query
response.");
- // FIXME aconway 2011-11-21: string constants
- if (headers->getAsString(QMF_CONTENT) == EVENT) { //decode as list
- std::string content = msg.getMessage().getFrames().getContent();
- Variant::List list;
- amqp_0_10::ListCodec::decode(content, list);
+ // decode as list
+ string content = msg.getMessage().getFrames().getContent();
+ Variant::List list;
+ amqp_0_10::ListCodec::decode(content, list);
+
+ QPID_LOG(critical, "FIXME WiringReplicator message: " << list);
+ if (headers->getAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end();
++i) {
// FIXME aconway 2011-11-18: should be iterating list?
Variant::Map& map = list.front().asMap();
@@ -133,16 +137,14 @@ void WiringReplicator::route(Deliverable
else if (match<EventExchangeDeclare>(schema))
doEventExchangeDeclare(values);
else if (match<EventExchangeDelete>(schema))
doEventExchangeDelete(values);
else if (match<EventBind>(schema)) doEventBind(values);
+ // FIXME aconway 2011-11-21: handle unbind & all other events.
else if (match<EventSubscribe>(schema)) {} // Deliberately
ignored.
else throw(Exception(QPID_MSG("WiringReplicator received
unexpected event, schema=" << schema)));
}
} else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
- //decode as list
- std::string content = msg.getMessage().getFrames().getContent();
- Variant::List list;
- amqp_0_10::ListCodec::decode(content, list);
+ QPID_LOG(critical, "FIXME WiringReplicator response: " << list);
for (Variant::List::iterator i = list.begin(); i != list.end();
++i) {
- std::string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
+ string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
Variant::Map& values = i->asMap()[VALUES].asMap();
if (isReplicated(values[ARGUMENTS].asMap())) {
framing::FieldTable args;
@@ -162,7 +164,7 @@ void WiringReplicator::route(Deliverable
}
void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
- std::string name = values[QNAME].asString();
+ string name = values[QNAME].asString();
if (values[DISP] == CREATED && isReplicated(values[ARGS].asMap())) {
QPID_LOG(debug, "Creating replicated queue " << name);
framing::FieldTable args;
@@ -182,7 +184,7 @@ void WiringReplicator::doEventQueueDecla
}
void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
- std::string name = values[QNAME].asString();
+ string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (queue && isReplicated(queue->getSettings())) {
QPID_LOG(debug, "Deleting replicated queue " << name);
@@ -194,10 +196,11 @@ void WiringReplicator::doEventQueueDelet
}
void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
- if (values[DISP] == CREATED && isReplicated(values[ARGS].asMap())) {
- std::string name = values[EXNAME].asString();
+ Variant::Map argsMap(values[ARGS].asMap());
+ if (values[DISP] == CREATED && isReplicated(argsMap)) {
+ string name = values[EXNAME].asString();
framing::FieldTable args;
- amqp_0_10::translate(values[ARGS].asMap(), args);
+ amqp_0_10::translate(argsMap, args);
QPID_LOG(debug, "Creating replicated exchange " << name);
if (!broker.createExchange(
name,
@@ -213,11 +216,11 @@ void WiringReplicator::doEventExchangeDe
}
void WiringReplicator::doEventExchangeDelete(Variant::Map& values) {
- std::string name = values[EXNAME].asString();
+ string name = values[EXNAME].asString();
try {
boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
if (exchange && isReplicated(exchange->getArgs())) {
- QPID_LOG(warning, "Deleting replicated exchange " << name);
+ QPID_LOG(debug, "Deleting replicated exchange " << name);
broker.deleteExchange(
name,
values[USER].asString(),
@@ -226,9 +229,22 @@ void WiringReplicator::doEventExchangeDe
} catch (const framing::NotFoundException&) {}
}
-void WiringReplicator::doEventBind(Variant::Map&) {
- QPID_LOG(error, "FIXME WiringReplicator: Not yet implemented - replicate
bindings.");
- // FIXME aconway 2011-11-18: only replicated binds of replicated q to
replicated ex.
+void WiringReplicator::doEventBind(Variant::Map& values) {
+ QPID_LOG(critical, "FIXME doEventBind " << values);
+ try {
+ boost::shared_ptr<Exchange> exchange =
broker.getExchanges().get(values[EXNAME].asString());
+ boost::shared_ptr<Queue> queue =
broker.getQueues().find(values[QNAME].asString());
+ // We only replicate a bind of a replicated queue to a replicated
exchange.
+ if (isReplicated(exchange->getArgs()) &&
isReplicated(queue->getSettings())) {
+ framing::FieldTable args;
+ amqp_0_10::translate(args, values[ARGS].asMap());
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "Replicated binding exchange=" <<
exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->bind(queue, key, &args);
+ }
+ } catch (const framing::NotFoundException&) {} // Ignore unreplicated
queue or exchange.
}
void WiringReplicator::doResponseQueue(Variant::Map& values) {
@@ -260,11 +276,12 @@ void WiringReplicator::doResponseExchang
}
}
-void WiringReplicator::doResponseBind(Variant::Map& ) {
- QPID_LOG(error, "FIXME WiringReplicator: Not yet implemented - catch-up
replicate bindings.");
+void WiringReplicator::doResponseBind(Variant::Map& values) {
+ QPID_LOG(critical, "FIXME doResponseBind " << values);
+ throw Exception("FIXME WiringReplicator: Not yet implemented - catch-up
replicate bindings.");
}
-boost::shared_ptr<Exchange> WiringReplicator::create(const std::string&
target, Broker& broker)
+boost::shared_ptr<Exchange> WiringReplicator::create(const string& target,
Broker& broker)
{
boost::shared_ptr<Exchange> exchange;
if (isWiringReplicatorDestination(target)) {
@@ -274,18 +291,18 @@ boost::shared_ptr<Exchange> WiringReplic
return exchange;
}
-bool WiringReplicator::isWiringReplicatorDestination(const std::string& target)
+bool WiringReplicator::isWiringReplicatorDestination(const string& target)
{
return target == QPID_WIRING_REPLICATOR;
}
-bool WiringReplicator::bind(boost::shared_ptr<Queue>, const std::string&,
const framing::FieldTable*) { return false; }
-bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const std::string&,
const framing::FieldTable*) { return false; }
-bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const std::string*
const, const framing::FieldTable* const) { return false; }
+bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const
framing::FieldTable*) { return false; }
+bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const
framing::FieldTable*) { return false; }
+bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const,
const framing::FieldTable* const) { return false; }
-const std::string WiringReplicator::typeName(QPID_WIRING_REPLICATOR);
+const string WiringReplicator::typeName(QPID_WIRING_REPLICATOR);
-std::string WiringReplicator::getType() const
+string WiringReplicator::getType() const
{
return typeName;
}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1233642&r1=1233641&r2=1233642&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Thu Jan 19
23:03:12 2012
@@ -88,16 +88,16 @@ class ShortTests(BrokerTest):
# self.assert_browse(s, "q01", ["01", "04", "e01"])
# self.assert_browse(s, "q02", []) # wiring only
# self.assert_missing(s,"q03")
-# s.sender("e01").send(Message("e01")) # Verify bind
-# self.assert_browse(s, "q02", ["e01"])
+ s.sender("e01").send(Message("e01")) # Verify bind
+ self.assert_browse(s, "q02", ["e01"])
for a in ["q1", "q2", "e1"]: self.wait(s,a)
# FIXME aconway 2011-11-18: replicate messages
# self.assert_browse(s, "q1", ["1", "4", "e1"])
# self.assert_browse(s, "q2", []) # wiring only
# self.assert_missing(s,"q3")
-# s.sender("e1").send(Message("e1")) # Verify bind
-# self.assert_browse(s, "q2", ["e1"])
+ s.sender("e1").send(Message("e1")) # Verify bind
+ self.assert_browse(s, "q2", ["e1"])
if __name__ == "__main__":
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]