Author: aconway
Date: Thu Jan 19 23:00:59 2012
New Revision: 1233626
URL: http://svn.apache.org/viewvc?rev=1233626&view=rev
Log:
QPID-3603: Initial (very rough) cut of queue and exchange propagation from one
node to another
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
qpid/branches/qpid-3603-2/qpid/cpp/managementgen/qmfgen/templates/Event.h
qpid/branches/qpid-3603-2/qpid/cpp/src/CMakeLists.txt
qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
qpid/branches/qpid-3603-2/qpid/specs/management-schema.xml
qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-config
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/managementgen/qmfgen/templates/Event.cpp?rev=1233626&r1=1233625&r2=1233626&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
Thu Jan 19 23:00:59 2012
@@ -99,3 +99,8 @@ void Event/*MGEN:Event.NameCap*/::mapEnc
using namespace ::qpid::types;
/*MGEN:Event.ArgMap*/
}
+
+bool Event/*MGEN:Event.NameCap*/::match(const std::string& evt, const
std::string& pkg)
+{ // Returns true only when evt equals eventName and pkg equals packageName.
+ return eventName == evt && packageName == pkg;
+}
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/managementgen/qmfgen/templates/Event.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/managementgen/qmfgen/templates/Event.h?rev=1233626&r1=1233625&r2=1233626&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/managementgen/qmfgen/templates/Event.h
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/managementgen/qmfgen/templates/Event.h
Thu Jan 19 23:00:59 2012
@@ -51,6 +51,8 @@ class Event/*MGEN:Event.NameCap*/ : publ
uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; }
void encode(std::string& buffer) const;
void mapEncode(::qpid::types::Variant::Map& map) const;
+
+ static bool match(const std::string& evt, const std::string& pkg);
};
}/*MGEN:Event.CloseNamespaces*/
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/CMakeLists.txt?rev=1233626&r1=1233625&r2=1233626&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/CMakeLists.txt Thu Jan 19 23:00:59
2012
@@ -1001,6 +1001,7 @@ set (qpidbroker_SOURCES
qpid/broker/LegacyLVQ.cpp
qpid/broker/MessageDeque.cpp
qpid/broker/MessageMap.cpp
+ qpid/broker/NodeClone.cpp
qpid/broker/PriorityQueue.cpp
qpid/broker/Queue.cpp
qpid/broker/QueueCleaner.cpp
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am?rev=1233626&r1=1233625&r2=1233626&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am Thu Jan 19 23:00:59 2012
@@ -598,6 +598,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/PriorityQueue.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
+ qpid/broker/NodeClone.h \
+ qpid/broker/NodeClone.cpp \
qpid/broker/NullMessageStore.cpp \
qpid/broker/NullMessageStore.h \
qpid/broker/OwnershipToken.h \
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1233626&r1=1233625&r2=1233626&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Jan 19
23:00:59 2012
@@ -24,17 +24,28 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
+#include "qpid/broker/NodeClone.h"
#include "qpid/broker/QueueReplicator.h"
#include "qpid/broker/SessionState.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include <iostream>
using qpid::framing::FieldTable;
using qpid::framing::Uuid;
using qpid::framing::Buffer;
+using qpid::framing::AMQFrame;
+using qpid::framing::AMQContentBody;
+using qpid::framing::AMQHeaderBody;
+using qpid::framing::MessageProperties;
+using qpid::framing::MessageTransferBody;
+using qpid::types::Variant;
using qpid::management::ManagementAgent;
using std::string;
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -105,6 +116,52 @@ void Bridge::create(Connection& c)
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated route from queue " << args.i_src << " to "
<< args.i_dest);
+ } else if (NodeClone::isNodeCloneDestination(args.i_dest)) {
+ //declare and bind an event queue
+ peer->getQueue().declare(queueName, "", false, false, true, true,
FieldTable());
+ peer->getExchange().bind(queueName, "qmf.default.topic",
"agent.ind.event.org_apache_qpid_broker.#", FieldTable());
+ //subscribe to the queue
+ peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "",
0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ //issue a query request for queues and another for exchanges using
event queue as the reply-to address
+ for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable
utility functions
+ Variant::Map request;
+ request["_what"] = "OBJECT";
+ Variant::Map schema;
+ schema["_class_name"] = (i == 0 ? "queue" : "exchange");
+ schema["_package_name"] = "org.apache.qpid.broker";
+ request["_schema_id"] = schema;
+
+ AMQFrame
method((MessageTransferBody(qpid::framing::ProtocolVersion(),
"qmf.default.direct", 0, 0)));
+ method.setBof(true);
+ method.setEof(false);
+ method.setBos(true);
+ method.setEos(true);
+ AMQHeaderBody headerBody;
+ MessageProperties* props = headerBody.get<MessageProperties>(true);
+ props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+ props->setAppId("qmf2");
+ props->getApplicationHeaders().setString("qmf.opcode",
"_query_request");
+
headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
+ AMQFrame header(headerBody);
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ AMQContentBody data;
+ qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+ AMQFrame content(data);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ sessionHandler.out->handle(method);
+ sessionHandler.out->handle(header);
+ sessionHandler.out->handle(content);
+ }
+
} else {
FieldTable queueSettings;
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1233626&r1=1233625&r2=1233626&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.cpp Thu Jan 19
23:00:59 2012
@@ -903,7 +903,7 @@ std::pair<boost::shared_ptr<Queue>, bool
//event instead?
managementAgent->raiseEvent(
_qmf::EventQueueDeclare(connectionId, userId, name,
- durable, owner, autodelete,
+ durable, owner, autodelete,
alternateExchange,
ManagementAgent::toMap(arguments),
"created"));
}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1233626&r1=1233625&r2=1233626&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu
Jan 19 23:00:59 2012
@@ -25,6 +25,7 @@
#include "qpid/broker/DtxAck.h"
#include "qpid/broker/DtxTimeout.h"
#include "qpid/broker/Message.h"
+#include "qpid/broker/NodeClone.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueReplicator.h"
#include "qpid/broker/SessionContext.h"
@@ -676,6 +677,7 @@ void SemanticState::route(intrusive_ptr<
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName ||
cacheExchange->isDestroyed()) {
cacheExchange = QueueReplicator::create(exchangeName,
getSession().getBroker().getQueues());
+ if (!cacheExchange) cacheExchange = NodeClone::create(exchangeName,
getSession().getBroker());
if (!cacheExchange) cacheExchange =
session.getBroker().getExchanges().get(exchangeName);
}
cacheExchange->setProperties(msg);
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1233626&r1=1233625&r2=1233626&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Thu
Jan 19 23:00:59 2012
@@ -316,8 +316,8 @@ void SessionAdapter::QueueHandlerImpl::d
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(),
getConnection().getUserId(),
- name, durable,
exclusive, autoDelete, ManagementAgent::toMap(arguments),
- "existing"));
+ name, durable,
exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments),
+ "existing"));
}
}
Modified: qpid/branches/qpid-3603-2/qpid/specs/management-schema.xml
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/specs/management-schema.xml?rev=1233626&r1=1233625&r2=1233626&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/specs/management-schema.xml (original)
+++ qpid/branches/qpid-3603-2/qpid/specs/management-schema.xml Thu Jan 19
23:00:59 2012
@@ -425,7 +425,7 @@
<event name="clientDisconnect" sev="inform" args="rhost, user"/>
<event name="brokerLinkUp" sev="inform" args="rhost"/>
<event name="brokerLinkDown" sev="warn" args="rhost"/>
- <event name="queueDeclare" sev="inform" args="rhost, user, qName,
durable, excl, autoDel, args, disp"/>
+ <event name="queueDeclare" sev="inform" args="rhost, user, qName,
durable, excl, autoDel, altEx, args, disp"/>
<event name="queueDelete" sev="inform" args="rhost, user, qName"/>
<event name="exchangeDeclare" sev="inform" args="rhost, user, exName,
exType, altEx, durable, autoDel, args, disp"/>
<event name="exchangeDelete" sev="inform" args="rhost, user, exName"/>
Modified: qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-config
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-config?rev=1233626&r1=1233625&r2=1233626&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-config (original)
+++ qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-config Thu Jan 19 23:00:59
2012
@@ -492,6 +492,12 @@ class BrokerManager:
etype = args[0]
ename = args[1]
declArgs = {}
+ for a in config._extra_arguments:
+ r = a.split("=", 1)
+ if len(r) == 2: value = r[1]
+ else: value = None
+ declArgs[r[0]] = value
+
if config._msgSequence:
declArgs[MSG_SEQUENCE] = 1
if config._ive:
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]