Author: aconway
Date: Thu Jan 19 23:01:07 2012
New Revision: 1233627

URL: http://svn.apache.org/viewvc?rev=1233627&view=rev
Log:
QPID-3603: Checked in files left out by accident in last commit to this branch

Added:
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp   (with 
props)
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h   (with 
props)

Added: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp?rev=1233627&view=auto
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp (added)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp Thu Jan 19 
23:01:07 2012
@@ -0,0 +1,219 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "NodeClone.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/log/Statement.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+
+using qmf::org::apache::qpid::broker::EventQueueDeclare;
+using qmf::org::apache::qpid::broker::EventQueueDelete;
+using qmf::org::apache::qpid::broker::EventExchangeDeclare;
+using qmf::org::apache::qpid::broker::EventExchangeDelete;
+
+namespace qpid {
+namespace broker {
+
+namespace{
+bool isQMFv2(const Message& message)
+{
+    const qpid::framing::MessageProperties* props = 
message.getProperties<qpid::framing::MessageProperties>();
+    return props && props->getAppId() == "qmf2";
+}
+
+template <class T> bool match(qpid::types::Variant::Map& schema)
+{
+    return T::match(schema["_class_name"], schema["_package_name"]);
+}
+
+}
+
+NodeClone::NodeClone(const std::string& name, Broker& b) : Exchange(name), 
broker(b) {}
+
+NodeClone::~NodeClone() {}
+
+void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const 
qpid::framing::FieldTable* headers)
+{
+    if (isQMFv2(msg.getMessage()) && headers) {
+        if (headers->getAsString("qmf.content") == "_event") {
+            //decode as list
+            std::string content = msg.getMessage().getFrames().getContent();
+            qpid::types::Variant::List list;
+            qpid::amqp_0_10::ListCodec::decode(content, list);
+            if (list.empty()) {
+                QPID_LOG(error, "Error parsing QMF event, expected non-empty 
list");
+            } else {
+                try {
+                    qpid::types::Variant::Map& map = list.front().asMap();
+                    qpid::types::Variant::Map& schema = 
map["_schema_id"].asMap();
+                    qpid::types::Variant::Map& values = map["_values"].asMap();
+                    if (match<EventQueueDeclare>(schema)) {
+                        if (values["disp"] == "created" && 
values["args"].asMap()["qpid.propagate"]) {
+                            qpid::framing::FieldTable args;
+                            qpid::amqp_0_10::translate(values["args"].asMap(), 
args);
+                            if (!broker.createQueue(
+                                    values["qName"].asString(),
+                                    values["durable"].asBool(),
+                                    values["autoDel"].asBool(),
+                                    0 /*i.e. no owner regardless of 
exclusivity on master*/,
+                                    values["altEx"].asString(),
+                                    args,
+                                    values["user"].asString(),
+                                    values["rhost"].asString()).second) {
+                                QPID_LOG(warning, "Propagatable queue " << 
values["qName"] << " already exists");
+                            }
+                        }
+                    } else if (match<EventQueueDelete>(schema)) {
+                        std::string name = values["qName"].asString();
+                        QPID_LOG(debug, "Notified of deletion of queue " << 
name);
+                        boost::shared_ptr<Queue> queue = 
broker.getQueues().find(name);
+                        if (queue && 
queue->getSettings().isSet("qpid.propagate")/*TODO: check value*/) {
+                            broker.deleteQueue(
+                                name,
+                                values["user"].asString(),
+                                values["rhost"].asString());
+                        } else {
+                            if (queue) {
+                                QPID_LOG(debug, "Ignoring deletion 
notification for non-propagated queue " << name);
+                            } else {
+                                QPID_LOG(debug, "No such queue " << name);
+                            }
+                        }
+                    } else if (match<EventExchangeDeclare>(schema)) {
+                        if (values["disp"] == "created" && 
values["args"].asMap()["qpid.propagate"]) {
+                            qpid::framing::FieldTable args;
+                            qpid::amqp_0_10::translate(values["args"].asMap(), 
args);
+                            if (!broker.createExchange(
+                                    values["exName"].asString(),
+                                    values["exType"].asString(),
+                                    values["durable"].asBool(),
+                                    values["altEx"].asString(),
+                                    args,
+                                    values["user"].asString(),
+                                    values["rhost"].asString()).second) {
+                                QPID_LOG(warning, "Propagatable queue " << 
values["qName"] << " already exists");
+                            }
+                        }
+                    } else if (match<EventExchangeDelete>(schema)) {
+                        std::string name = values["exName"].asString();
+                        QPID_LOG(debug, "Notified of deletion of exchange " << 
name);
+                        try {
+                            boost::shared_ptr<Exchange> exchange = 
broker.getExchanges().get(name);
+                            if (exchange && 
exchange->getArgs().isSet("qpid.propagate")/*TODO: check value*/) {
+                                broker.deleteExchange(
+                                    name,
+                                    values["user"].asString(),
+                                    values["rhost"].asString());
+                            } else {
+                                if (exchange) {
+                                    QPID_LOG(debug, "Ignoring deletion 
notification for non-propagated exchange " << name);
+                                } else {
+                                    QPID_LOG(debug, "No such exchange " << 
name);
+                                }
+                            }
+                        } catch (const qpid::framing::NotFoundException&) {}
+                    }
+                } catch (const std::exception& e) {
+                    QPID_LOG(error, "Error propagating configuration: " << 
e.what());
+                }
+            }
+        } else if (headers->getAsString("qmf.opcode") == "_query_response") {
+            //decode as list
+            std::string content = msg.getMessage().getFrames().getContent();
+            qpid::types::Variant::List list;
+            qpid::amqp_0_10::ListCodec::decode(content, list);
+            QPID_LOG(debug, "Got query response (" << list.size() << ")");
+            for (qpid::types::Variant::List::iterator i = list.begin(); i != 
list.end(); ++i) {
+                std::string type = 
i->asMap()["_schema_id"].asMap()["_class_name"];
+                qpid::types::Variant::Map& values = 
i->asMap()["_values"].asMap();
+                QPID_LOG(debug, "class: " << type << ", values: " << values);
+                if (values["arguments"].asMap()["qpid.propagate"]) {
+                    qpid::framing::FieldTable args;
+                    qpid::amqp_0_10::translate(values["arguments"].asMap(), 
args);
+                    if (type == "queue") {
+                        if (!broker.createQueue(
+                                values["name"].asString(),
+                                values["durable"].asBool(),
+                                values["autoDelete"].asBool(),
+                                0 /*i.e. no owner regardless of exclusivity on 
master*/,
+                                ""/*TODO: need to include alternate-exchange*/,
+                                args,
+                                ""/*TODO: who is the user?*/,
+                                ""/*TODO: what should we use as connection 
id?*/).second) {
+                            QPID_LOG(warning, "Propagatable queue " << 
values["name"] << " already exists");
+                        }
+                    } else if (type == "exchange") {
+                        if (!broker.createExchange(
+                                values["name"].asString(),
+                                values["type"].asString(),
+                                values["durable"].asBool(),
+                                ""/*TODO: need to include alternate-exchange*/,
+                                args,
+                                ""/*TODO: who is the user?*/,
+                                ""/*TODO: what should we use as connection 
id?*/).second) {
+                            QPID_LOG(warning, "Propagatable queue " << 
values["qName"] << " already exists");
+                        }
+                    } else {
+                        QPID_LOG(warning, "Ignoring unknow object class: " << 
type);
+                    }
+                }
+            }
+        } else {
+            QPID_LOG(debug, "Dropping QMFv2 message with headers: " << 
*headers);
+        }
+    } else {
+        QPID_LOG(warning, "Ignoring message which is not a valid QMFv2 event 
or query response");
+    }
+}
+
+bool NodeClone::isNodeCloneDestination(const std::string& target)
+{
+    return target == "qpid.node-cloner";
+}
+
+boost::shared_ptr<Exchange> NodeClone::create(const std::string& target, 
Broker& broker)
+{
+    boost::shared_ptr<Exchange> exchange;
+    if (isNodeCloneDestination(target)) {
+        //TODO: need to cache the exchange
+        QPID_LOG(info, "Creating node cloner");
+        exchange.reset(new NodeClone(target, broker));
+    }
+    return exchange;
+}
+
+bool NodeClone::bind(boost::shared_ptr<Queue>, const std::string&, const 
qpid::framing::FieldTable*) { return false; }
+bool NodeClone::unbind(boost::shared_ptr<Queue>, const std::string&, const 
qpid::framing::FieldTable*) { return false; }
+bool NodeClone::isBound(boost::shared_ptr<Queue>, const std::string* const, 
const qpid::framing::FieldTable* const) { return false; }
+
+const std::string NodeClone::typeName("node-cloner");
+
+std::string NodeClone::getType() const
+{
+    return typeName;
+}
+
+}} // namespace qpid::broker

Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h?rev=1233627&view=auto
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h (added)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h Thu Jan 19 
23:01:07 2012
@@ -0,0 +1,54 @@
+#ifndef QPID_BROKER_NODEPROPAGATOR_H
+#define QPID_BROKER_NODEPROPAGATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Exchange.h"
+
+namespace qpid {
+namespace broker {
+
+class Broker;
+
+/**
+ * Pseudo-exchange for recreating local queues and/or exchanges on
+ * receipt of QMF events indicating their creation on another node
+ */
+class NodeClone : public Exchange
+{
+  public:
+    NodeClone(const std::string&, Broker&);
+    ~NodeClone();
+    std::string getType() const;
+    bool bind(boost::shared_ptr<Queue>, const std::string&, const 
qpid::framing::FieldTable*);
+    bool unbind(boost::shared_ptr<Queue>, const std::string&, const 
qpid::framing::FieldTable*);
+    void route(Deliverable&, const std::string&, const 
qpid::framing::FieldTable*);
+    bool isBound(boost::shared_ptr<Queue>, const std::string* const, const 
qpid::framing::FieldTable* const);
+
+    static bool isNodeCloneDestination(const std::string&);
+    static boost::shared_ptr<Exchange> create(const std::string&, Broker&);
+    static const std::string typeName;
+  private:
+    Broker& broker;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_NODEPROPAGATOR_H*/

Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to