Author: aconway
Date: Thu Jan 19 23:01:07 2012
New Revision: 1233627
URL: http://svn.apache.org/viewvc?rev=1233627&view=rev
Log:
QPID-3603: Checked in files left out by accident in last commit to this branch
Added:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp (with
props)
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h (with
props)
Added: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp?rev=1233627&view=auto
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp (added)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp Thu Jan 19
23:01:07 2012
@@ -0,0 +1,219 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "NodeClone.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/log/Statement.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+
+using qmf::org::apache::qpid::broker::EventQueueDeclare;
+using qmf::org::apache::qpid::broker::EventQueueDelete;
+using qmf::org::apache::qpid::broker::EventExchangeDeclare;
+using qmf::org::apache::qpid::broker::EventExchangeDelete;
+
+namespace qpid {
+namespace broker {
+
+namespace{
+bool isQMFv2(const Message& message)
+{
+ const qpid::framing::MessageProperties* props =
message.getProperties<qpid::framing::MessageProperties>();
+ return props && props->getAppId() == "qmf2";
+}
+
+template <class T> bool match(qpid::types::Variant::Map& schema)
+{
+ return T::match(schema["_class_name"], schema["_package_name"]);
+}
+
+}
+
+NodeClone::NodeClone(const std::string& name, Broker& b) : Exchange(name),
broker(b) {}
+
+NodeClone::~NodeClone() {}
+
+void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const
qpid::framing::FieldTable* headers)
+{
+ if (isQMFv2(msg.getMessage()) && headers) {
+ if (headers->getAsString("qmf.content") == "_event") {
+ //decode as list
+ std::string content = msg.getMessage().getFrames().getContent();
+ qpid::types::Variant::List list;
+ qpid::amqp_0_10::ListCodec::decode(content, list);
+ if (list.empty()) {
+ QPID_LOG(error, "Error parsing QMF event, expected non-empty
list");
+ } else {
+ try {
+ qpid::types::Variant::Map& map = list.front().asMap();
+ qpid::types::Variant::Map& schema =
map["_schema_id"].asMap();
+ qpid::types::Variant::Map& values = map["_values"].asMap();
+ if (match<EventQueueDeclare>(schema)) {
+ if (values["disp"] == "created" &&
values["args"].asMap()["qpid.propagate"]) {
+ qpid::framing::FieldTable args;
+ qpid::amqp_0_10::translate(values["args"].asMap(),
args);
+ if (!broker.createQueue(
+ values["qName"].asString(),
+ values["durable"].asBool(),
+ values["autoDel"].asBool(),
+ 0 /*i.e. no owner regardless of
exclusivity on master*/,
+ values["altEx"].asString(),
+ args,
+ values["user"].asString(),
+ values["rhost"].asString()).second) {
+ QPID_LOG(warning, "Propagatable queue " <<
values["qName"] << " already exists");
+ }
+ }
+ } else if (match<EventQueueDelete>(schema)) {
+ std::string name = values["qName"].asString();
+ QPID_LOG(debug, "Notified of deletion of queue " <<
name);
+ boost::shared_ptr<Queue> queue =
broker.getQueues().find(name);
+ if (queue &&
queue->getSettings().isSet("qpid.propagate")/*TODO: check value*/) {
+ broker.deleteQueue(
+ name,
+ values["user"].asString(),
+ values["rhost"].asString());
+ } else {
+ if (queue) {
+ QPID_LOG(debug, "Ignoring deletion
notification for non-propagated queue " << name);
+ } else {
+ QPID_LOG(debug, "No such queue " << name);
+ }
+ }
+ } else if (match<EventExchangeDeclare>(schema)) {
+ if (values["disp"] == "created" &&
values["args"].asMap()["qpid.propagate"]) {
+ qpid::framing::FieldTable args;
+ qpid::amqp_0_10::translate(values["args"].asMap(),
args);
+ if (!broker.createExchange(
+ values["exName"].asString(),
+ values["exType"].asString(),
+ values["durable"].asBool(),
+ values["altEx"].asString(),
+ args,
+ values["user"].asString(),
+ values["rhost"].asString()).second) {
+ QPID_LOG(warning, "Propagatable queue " <<
values["qName"] << " already exists");
+ }
+ }
+ } else if (match<EventExchangeDelete>(schema)) {
+ std::string name = values["exName"].asString();
+ QPID_LOG(debug, "Notified of deletion of exchange " <<
name);
+ try {
+ boost::shared_ptr<Exchange> exchange =
broker.getExchanges().get(name);
+ if (exchange &&
exchange->getArgs().isSet("qpid.propagate")/*TODO: check value*/) {
+ broker.deleteExchange(
+ name,
+ values["user"].asString(),
+ values["rhost"].asString());
+ } else {
+ if (exchange) {
+ QPID_LOG(debug, "Ignoring deletion
notification for non-propagated exchange " << name);
+ } else {
+ QPID_LOG(debug, "No such exchange " <<
name);
+ }
+ }
+ } catch (const qpid::framing::NotFoundException&) {}
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Error propagating configuration: " <<
e.what());
+ }
+ }
+ } else if (headers->getAsString("qmf.opcode") == "_query_response") {
+ //decode as list
+ std::string content = msg.getMessage().getFrames().getContent();
+ qpid::types::Variant::List list;
+ qpid::amqp_0_10::ListCodec::decode(content, list);
+ QPID_LOG(debug, "Got query response (" << list.size() << ")");
+ for (qpid::types::Variant::List::iterator i = list.begin(); i !=
list.end(); ++i) {
+ std::string type =
i->asMap()["_schema_id"].asMap()["_class_name"];
+ qpid::types::Variant::Map& values =
i->asMap()["_values"].asMap();
+ QPID_LOG(debug, "class: " << type << ", values: " << values);
+ if (values["arguments"].asMap()["qpid.propagate"]) {
+ qpid::framing::FieldTable args;
+ qpid::amqp_0_10::translate(values["arguments"].asMap(),
args);
+ if (type == "queue") {
+ if (!broker.createQueue(
+ values["name"].asString(),
+ values["durable"].asBool(),
+ values["autoDelete"].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on
master*/,
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection
id?*/).second) {
+ QPID_LOG(warning, "Propagatable queue " <<
values["name"] << " already exists");
+ }
+ } else if (type == "exchange") {
+ if (!broker.createExchange(
+ values["name"].asString(),
+ values["type"].asString(),
+ values["durable"].asBool(),
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection
id?*/).second) {
+ QPID_LOG(warning, "Propagatable queue " <<
values["qName"] << " already exists");
+ }
+ } else {
+ QPID_LOG(warning, "Ignoring unknow object class: " <<
type);
+ }
+ }
+ }
+ } else {
+ QPID_LOG(debug, "Dropping QMFv2 message with headers: " <<
*headers);
+ }
+ } else {
+ QPID_LOG(warning, "Ignoring message which is not a valid QMFv2 event
or query response");
+ }
+}
+
+bool NodeClone::isNodeCloneDestination(const std::string& target)
+{
+ return target == "qpid.node-cloner";
+}
+
+boost::shared_ptr<Exchange> NodeClone::create(const std::string& target,
Broker& broker)
+{
+ boost::shared_ptr<Exchange> exchange;
+ if (isNodeCloneDestination(target)) {
+ //TODO: need to cache the exchange
+ QPID_LOG(info, "Creating node cloner");
+ exchange.reset(new NodeClone(target, broker));
+ }
+ return exchange;
+}
+
+bool NodeClone::bind(boost::shared_ptr<Queue>, const std::string&, const
qpid::framing::FieldTable*) { return false; }
+bool NodeClone::unbind(boost::shared_ptr<Queue>, const std::string&, const
qpid::framing::FieldTable*) { return false; }
+bool NodeClone::isBound(boost::shared_ptr<Queue>, const std::string* const,
const qpid::framing::FieldTable* const) { return false; }
+
+const std::string NodeClone::typeName("node-cloner");
+
+std::string NodeClone::getType() const
+{
+ return typeName;
+}
+
+}} // namespace qpid::broker
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h?rev=1233627&view=auto
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h (added)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h Thu Jan 19
23:01:07 2012
@@ -0,0 +1,54 @@
+#ifndef QPID_BROKER_NODEPROPAGATOR_H
+#define QPID_BROKER_NODEPROPAGATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Exchange.h"
+
+namespace qpid {
+namespace broker {
+
+class Broker;
+
+/**
+ * Pseudo-exchange for recreating local queues and/or exchanges on
+ * receipt of QMF events indicating their creation on another node
+ */
+class NodeClone : public Exchange
+{
+ public:
+ NodeClone(const std::string&, Broker&);
+ ~NodeClone();
+ std::string getType() const;
+ bool bind(boost::shared_ptr<Queue>, const std::string&, const
qpid::framing::FieldTable*);
+ bool unbind(boost::shared_ptr<Queue>, const std::string&, const
qpid::framing::FieldTable*);
+ void route(Deliverable&, const std::string&, const
qpid::framing::FieldTable*);
+ bool isBound(boost::shared_ptr<Queue>, const std::string* const, const
qpid::framing::FieldTable* const);
+
+ static bool isNodeCloneDestination(const std::string&);
+ static boost::shared_ptr<Exchange> create(const std::string&, Broker&);
+ static const std::string typeName;
+ private:
+ Broker& broker;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_NODEPROPAGATOR_H*/
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]