Author: aconway
Date: Thu Jan 19 23:02:27 2012
New Revision: 1233636
URL: http://svn.apache.org/viewvc?rev=1233636&view=rev
Log:
QPID-3603: Refactored NodeClone, break out create functions.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h
qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp?rev=1233636&r1=1233635&r2=1233636&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp Thu Jan 19 23:02:27 2012
@@ -41,25 +41,20 @@ using qmf::org::apache::qpid::broker::Ev
namespace qpid {
namespace broker {
+using types::Variant;
+
namespace{
bool isQMFv2(const Message& message)
{
- const qpid::framing::MessageProperties* props =
message.getProperties<qpid::framing::MessageProperties>();
+ const framing::MessageProperties* props =
message.getProperties<framing::MessageProperties>();
return props && props->getAppId() == "qmf2";
}
-template <class T> bool match(qpid::types::Variant::Map& schema)
+template <class T> bool match(Variant::Map& schema)
{
return T::match(schema["_class_name"], schema["_package_name"]);
}
-}
-
-NodeClone::NodeClone(const std::string& name, Broker& b) : Exchange(name),
broker(b) {}
-
-NodeClone::~NodeClone() {}
-
-namespace {
const std::string QPID_REPLICATE("qpid.replicate");
const std::string ALL("all");
const std::string WIRING("wiring");
@@ -70,95 +65,42 @@ bool isReplicated(const std::string& val
bool isReplicated(const framing::FieldTable& f) {
return f.isSet(QPID_REPLICATE) &&
isReplicated(f.getAsString(QPID_REPLICATE));
}
-bool isReplicated(const types::Variant::Map& m) {
- types::Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+bool isReplicated(const Variant::Map& m) {
+ Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
return i != m.end() && isReplicated(i->second.asString());
}
-}
+} // namespace
-void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const
qpid::framing::FieldTable* headers)
-{
+
+NodeClone::NodeClone(const std::string& name, Broker& b) : Exchange(name),
broker(b) {}
+
+NodeClone::~NodeClone() {}
+
+void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const
framing::FieldTable* headers) {
+ // FIXME aconway 2011-11-21: outer error handling, e.g. for decoding error.
if (isQMFv2(msg.getMessage()) && headers) {
+ // FIXME aconway 2011-11-21: string constants
if (headers->getAsString("qmf.content") == "_event") { //decode as list
std::string content = msg.getMessage().getFrames().getContent();
- qpid::types::Variant::List list;
- qpid::amqp_0_10::ListCodec::decode(content, list);
- if (list.empty()) {
+ Variant::List list;
+ amqp_0_10::ListCodec::decode(content, list);
+ if (list.empty()) { // FIXME aconway 2011-11-21: remove
QPID_LOG(error, "Error parsing QMF event, empty list");
} else {
try {
// FIXME aconway 2011-11-18: should be iterating list?
- qpid::types::Variant::Map& map = list.front().asMap();
- qpid::types::Variant::Map& schema =
map["_schema_id"].asMap();
- qpid::types::Variant::Map& values = map["_values"].asMap();
- if (match<EventQueueDeclare>(schema)) {
- std::string name = values["qName"].asString();
- if (values["disp"] == "created" &&
isReplicated(values["args"].asMap())) {
- QPID_LOG(debug, "Creating replicated queue " <<
name);
- qpid::framing::FieldTable args;
- qpid::amqp_0_10::translate(values["args"].asMap(),
args);
- if (!broker.createQueue(
- name,
- values["durable"].asBool(),
- values["autoDel"].asBool(),
- 0 /*i.e. no owner regardless of
exclusivity on master*/,
- values["altEx"].asString(),
- args,
- values["user"].asString(),
- values["rhost"].asString()).second) {
- QPID_LOG(warning, "Replicated queue " << name
<< " already exists");
- }
- }
- } else if (match<EventQueueDelete>(schema)) {
- std::string name = values["qName"].asString();
- boost::shared_ptr<Queue> queue =
broker.getQueues().find(name);
- if (queue && isReplicated(queue->getSettings())) {
- QPID_LOG(debug, "Deleting replicated queue " <<
name);
- broker.deleteQueue(
- name,
- values["user"].asString(),
- values["rhost"].asString());
- }
- } else if (match<EventExchangeDeclare>(schema)) {
- if (values["disp"] == "created" &&
isReplicated(values["args"].asMap())) {
- std::string name = values["exName"].asString();
- qpid::framing::FieldTable args;
- qpid::amqp_0_10::translate(values["args"].asMap(),
args);
- QPID_LOG(debug, "Creating replicated exchange " <<
name);
- if (!broker.createExchange(
- name,
- values["exType"].asString(),
- values["durable"].asBool(),
- values["altEx"].asString(),
- args,
- values["user"].asString(),
- values["rhost"].asString()).second) {
- QPID_LOG(warning, "Replicated exchange " <<
name << " already exists");
- }
- }
- } else if (match<EventExchangeDelete>(schema)) {
- std::string name = values["exName"].asString();
- try {
- boost::shared_ptr<Exchange> exchange =
broker.getExchanges().get(name);
- if (exchange && isReplicated(exchange->getArgs()))
{
- QPID_LOG(warning, "Deleting replicated
exchange " << name);
- broker.deleteExchange(
- name,
- values["user"].asString(),
- values["rhost"].asString());
- }
- } catch (const qpid::framing::NotFoundException&) {}
- }
- else if (match<EventBind>(schema)) {
- QPID_LOG(error, "FIXME NodeClone: Not yet implemented - replicate bindings.");
- }
- else if (match<EventSubscribe>(schema)) {
- // Deliberately ignore.
- }
- else {
- QPID_LOG(warning, "Replicator received unexpected
event, schema=" << schema);
- }
+ Variant::Map& map = list.front().asMap();
+ Variant::Map& schema = map[
+ "_schema_id"].asMap();
+ Variant::Map& values = map["_values"].asMap();
+ if (match<EventQueueDeclare>(schema))
doEventQueueDeclare(values);
+ else if (match<EventQueueDelete>(schema))
doEventQueueDelete(values);
+ else if (match<EventExchangeDeclare>(schema))
doEventExchangeDeclare(values);
+ else if (match<EventExchangeDelete>(schema))
doEventExchangeDelete(values);
+ else if (match<EventBind>(schema)) doEventBind(values);
+ else if (match<EventSubscribe>(schema)) {} // Deliberately
ignored.
+ else QPID_LOG(warning, "Replicator received unexpected
event, schema=" << schema);
} catch (const std::exception& e) {
QPID_LOG(error, "Error replicating configuration: " <<
e.what());
}
@@ -166,40 +108,18 @@ void NodeClone::route(Deliverable& msg,
} else if (headers->getAsString("qmf.opcode") == "_query_response") {
//decode as list
std::string content = msg.getMessage().getFrames().getContent();
- qpid::types::Variant::List list;
- qpid::amqp_0_10::ListCodec::decode(content, list);
- for (qpid::types::Variant::List::iterator i = list.begin(); i !=
list.end(); ++i) {
+ Variant::List list;
+ amqp_0_10::ListCodec::decode(content, list);
+ for (Variant::List::iterator i = list.begin(); i != list.end();
++i) {
std::string type =
i->asMap()["_schema_id"].asMap()["_class_name"];
- qpid::types::Variant::Map& values =
i->asMap()["_values"].asMap();
+ Variant::Map& values = i->asMap()["_values"].asMap();
if (isReplicated(values["arguments"].asMap())) {
- qpid::framing::FieldTable args;
- qpid::amqp_0_10::translate(values["arguments"].asMap(),
args);
- if (type == "queue") {
- QPID_LOG(debug, "Creating replicated queue " <<
values["name"].asString() << " (in catch-up)");
- if (!broker.createQueue(
- values["name"].asString(),
- values["durable"].asBool(),
- values["autoDelete"].asBool(),
- 0 /*i.e. no owner regardless of exclusivity on
master*/,
- ""/*TODO: need to include alternate-exchange*/,
- args,
- ""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection
id?*/).second) {
- QPID_LOG(warning, "Replicated queue " <<
values["name"] << " already exists (in catch-up)");
- }
- } else if (type == "exchange") {
- QPID_LOG(debug, "Creating replicated exchange " <<
values["name"].asString() << " (in catch-up)");
- if (!broker.createExchange(
- values["name"].asString(),
- values["type"].asString(),
- values["durable"].asBool(),
- ""/*TODO: need to include alternate-exchange*/,
- args,
- ""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection
id?*/).second) {
- QPID_LOG(warning, "Replicated exchange " <<
values["qName"] << " already exists (in catch-up)");
- }
- } else {
+ framing::FieldTable args;
+ amqp_0_10::translate(values["arguments"].asMap(), args);
+ if (type == "queue") doResponseQueue(values);
+ else if (type == "exchange") doResponseExchange(values);
+ else if (type == "bind") doResponseBind(values);
+ else {
QPID_LOG(warning, "Replicator ignoring unexpected
class: " << type);
}
}
@@ -212,9 +132,107 @@ void NodeClone::route(Deliverable& msg,
}
}
-bool NodeClone::isNodeCloneDestination(const std::string& target)
-{
- return target == "qpid.node-cloner";
+void NodeClone::doEventQueueDeclare(Variant::Map& values) {
+ std::string name = values["qName"].asString();
+ if (values["disp"] == "created" && isReplicated(values["args"].asMap())) {
+ QPID_LOG(debug, "Creating replicated queue " << name);
+ framing::FieldTable args;
+ amqp_0_10::translate(values["args"].asMap(), args);
+ if (!broker.createQueue(
+ name,
+ values["durable"].asBool(),
+ values["autoDel"].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ values["altEx"].asString(),
+ args,
+ values["user"].asString(),
+ values["rhost"].asString()).second) {
+ QPID_LOG(warning, "Replicated queue " << name << " already
exists");
+ }
+ }
+}
+
+void NodeClone::doEventQueueDelete(Variant::Map& values) {
+ std::string name = values["qName"].asString();
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
+ if (queue && isReplicated(queue->getSettings())) {
+ QPID_LOG(debug, "Deleting replicated queue " << name);
+ broker.deleteQueue(
+ name,
+ values["user"].asString(),
+ values["rhost"].asString());
+ }
+}
+
+void NodeClone::doEventExchangeDeclare(Variant::Map& values) {
+ if (values["disp"] == "created" && isReplicated(values["args"].asMap())) {
+ std::string name = values["exName"].asString();
+ framing::FieldTable args;
+ amqp_0_10::translate(values["args"].asMap(), args);
+ QPID_LOG(debug, "Creating replicated exchange " << name);
+ if (!broker.createExchange(
+ name,
+ values["exType"].asString(),
+ values["durable"].asBool(),
+ values["altEx"].asString(),
+ args,
+ values["user"].asString(),
+ values["rhost"].asString()).second) {
+ QPID_LOG(warning, "Replicated exchange " << name << " already
exists");
+ }
+ }
+}
+
+void NodeClone::doEventExchangeDelete(Variant::Map& values) {
+ std::string name = values["exName"].asString();
+ try {
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
+ if (exchange && isReplicated(exchange->getArgs())) {
+ QPID_LOG(warning, "Deleting replicated exchange " << name);
+ broker.deleteExchange(
+ name,
+ values["user"].asString(),
+ values["rhost"].asString());
+ }
+ } catch (const framing::NotFoundException&) {}
+}
+
+void NodeClone::doEventBind(Variant::Map&) {
+ QPID_LOG(error, "FIXME NodeClone: Not yet implemented - replicate
bindings.");
+ // FIXME aconway 2011-11-18: only replicated binds of replicated q to replicated ex.
+}
+
+void NodeClone::doResponseQueue(Variant::Map& values) {
+ QPID_LOG(debug, "Creating replicated queue " << values["name"].asString()
<< " (in catch-up)");
+ if (!broker.createQueue(
+ values["name"].asString(),
+ values["durable"].asBool(),
+ values["autoDelete"].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/).second) {
+ QPID_LOG(warning, "Replicated queue " << values["name"] << " already
exists (in catch-up)");
+ }
+}
+
+void NodeClone::doResponseExchange(Variant::Map& values) {
+ QPID_LOG(debug, "Creating replicated exchange " <<
values["name"].asString() << " (in catch-up)");
+ if (!broker.createExchange(
+ values["name"].asString(),
+ values["type"].asString(),
+ values["durable"].asBool(),
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/).second) {
+ QPID_LOG(warning, "Replicated exchange " << values["qName"] << "
already exists (in catch-up)");
+ }
+}
+
+void NodeClone::doResponseBind(Variant::Map& ) {
+ QPID_LOG(error, "FIXME NodeClone: Not yet implemented - catch-up replicate
bindings.");
}
boost::shared_ptr<Exchange> NodeClone::create(const std::string& target,
Broker& broker)
@@ -228,15 +246,20 @@ boost::shared_ptr<Exchange> NodeClone::c
return exchange;
}
-bool NodeClone::bind(boost::shared_ptr<Queue>, const std::string&, const
qpid::framing::FieldTable*) { return false; }
-bool NodeClone::unbind(boost::shared_ptr<Queue>, const std::string&, const
qpid::framing::FieldTable*) { return false; }
-bool NodeClone::isBound(boost::shared_ptr<Queue>, const std::string* const,
const qpid::framing::FieldTable* const) { return false; }
+bool NodeClone::isNodeCloneDestination(const std::string& target)
+{
+ return target == "qpid.node-cloner";
+}
+
+bool NodeClone::bind(boost::shared_ptr<Queue>, const std::string&, const
framing::FieldTable*) { return false; }
+bool NodeClone::unbind(boost::shared_ptr<Queue>, const std::string&, const
framing::FieldTable*) { return false; }
+bool NodeClone::isBound(boost::shared_ptr<Queue>, const std::string* const,
const framing::FieldTable* const) { return false; }
-const std::string NodeClone::typeName("node-cloner");
+const std::string NodeClone::typeName("node-cloner"); // FIXME aconway
2011-11-21: qpid.replicator
std::string NodeClone::getType() const
{
return typeName;
}
-}} // namespace qpid::broker
+}} // namespace broker
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h?rev=1233636&r1=1233635&r2=1233636&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h Thu Jan 19 23:02:27 2012
@@ -21,11 +21,16 @@
* under the License.
*
*/
+
#include "qpid/broker/Exchange.h"
+#include "qpid/types/Variant.h"
// FIXME aconway 2011-11-17: relocate to ../ha
namespace qpid {
+namespace types {
+class Variant;
+}
namespace broker {
class Broker;
@@ -49,6 +54,16 @@ class NodeClone : public Exchange
static boost::shared_ptr<Exchange> create(const std::string&, Broker&);
static const std::string typeName;
private:
+
+ void doEventQueueDeclare(types::Variant::Map& values);
+ void doEventQueueDelete(types::Variant::Map& values);
+ void doEventExchangeDeclare(types::Variant::Map& values);
+ void doEventExchangeDelete(types::Variant::Map& values);
+ void doEventBind(types::Variant::Map&);
+ void doResponseQueue(types::Variant::Map& values);
+ void doResponseExchange(types::Variant::Map& values);
+ void doResponseBind(types::Variant::Map& values);
+
Broker& broker;
};
}} // namespace qpid::broker
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1233636&r1=1233635&r2=1233636&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Thu Jan 19 23:02:27 2012
@@ -89,8 +89,7 @@ class ShortTests(BrokerTest):
# self.assert_browse(s, "q02", []) # wiring only
# self.assert_missing(s,"q03")
s.sender("e01").send(Message("e01")) # Verify bind
- # FIXME aconway 2011-11-18: FIXME replicate bindings
- # self.assert_browse(s, "q02", ["e01"])
+ self.assert_browse(s, "q02", ["e01"])
for a in ["q1", "q2", "e1"]: self.wait(s,a)
# FIXME aconway 2011-11-18: replicate messages
@@ -98,8 +97,7 @@ class ShortTests(BrokerTest):
# self.assert_browse(s, "q2", []) # wiring only
# self.assert_missing(s,"q3")
s.sender("e1").send(Message("e1")) # Verify bind
- # FIXME aconway 2011-11-18: FIXME replicate bindings
- # self.assert_browse(s, "q2", ["e1"])
+ self.assert_browse(s, "q2", ["e1"])
if __name__ == "__main__":
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]