Author: aconway
Date: Thu Jan 19 23:03:03 2012
New Revision: 1233641
URL: http://svn.apache.org/viewvc?rev=1233641&view=rev
Log:
QPID-3603: Move init code for WiringReplicator out of Bridge into ha::Backup.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SessionHandler.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1233641&r1=1233640&r2=1233641&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Jan 19
23:03:03 2012
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -59,9 +59,11 @@ void Bridge::PushHandler::handle(framing
}
Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
- const _qmf::ArgsLinkBridge& _args) :
+ const _qmf::ArgsLinkBridge& _args,
+ InitializeCallback init) :
link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), queueName("bridge_queue_"),
persistenceId(0)
+ listener(l), name(Uuid(true).str()), queueName("bridge_queue_"),
persistenceId(0),
+ initialize(init)
{
std::stringstream title;
title << id << "_" << link->getBroker()->getFederationTag();
@@ -77,9 +79,9 @@ Bridge::Bridge(Link* _link, framing::Cha
QPID_LOG(debug, "Bridge created from " << args.i_src << " to " <<
args.i_dest);
}
-Bridge::~Bridge()
+Bridge::~Bridge()
{
- mgmtObject->resourceDestroy();
+ mgmtObject->resourceDestroy();
}
void Bridge::create(Connection& c)
@@ -98,7 +100,7 @@ void Bridge::create(Connection& c)
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
-
+
session->attach(name, false);
session->commandPoint(0,0);
} else {
@@ -108,7 +110,8 @@ void Bridge::create(Connection& c)
}
if (args.i_srcIsLocal)
sessionHandler.getSession()->disableReceiverTracking();
- if (args.i_srcIsQueue) {
+ if (initialize) initialize(*this, sessionHandler);
+ else if (args.i_srcIsQueue) {
//TODO: something other than this which is nasty...
bool isReplicatingLink =
QueueReplicator::initReplicationSettings(args.i_dest,
link->getBroker()->getQueues(), options);
@@ -116,52 +119,6 @@ void Bridge::create(Connection& c)
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated route from queue " << args.i_src << " to "
<< args.i_dest);
- } else if
(ha::WiringReplicator::isWiringReplicatorDestination(args.i_dest)) {
- //declare and bind an event queue
- peer->getQueue().declare(queueName, "", false, false, true, true,
FieldTable());
- peer->getExchange().bind(queueName, "qmf.default.topic",
"agent.ind.event.org_apache_qpid_broker.#", FieldTable());
- //subscribe to the queue
- peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "",
0, FieldTable());
- peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
-
- //issue a query request for queues and another for exchanges using
event queue as the reply-to address
- for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable
utility functions
- Variant::Map request;
- request["_what"] = "OBJECT";
- Variant::Map schema;
- schema["_class_name"] = (i == 0 ? "queue" : "exchange");
- schema["_package_name"] = "org.apache.qpid.broker";
- request["_schema_id"] = schema;
-
- AMQFrame
method((MessageTransferBody(qpid::framing::ProtocolVersion(),
"qmf.default.direct", 0, 0)));
- method.setBof(true);
- method.setEof(false);
- method.setBos(true);
- method.setEos(true);
- AMQHeaderBody headerBody;
- MessageProperties* props = headerBody.get<MessageProperties>(true);
- props->setReplyTo(qpid::framing::ReplyTo("", queueName));
- props->setAppId("qmf2");
- props->getApplicationHeaders().setString("qmf.opcode",
"_query_request");
-
headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
- AMQFrame header(headerBody);
- header.setBof(false);
- header.setEof(false);
- header.setBos(true);
- header.setEos(true);
- AMQContentBody data;
- qpid::amqp_0_10::MapCodec::encode(request, data.getData());
- AMQFrame content(data);
- content.setBof(false);
- content.setEof(true);
- content.setBos(true);
- content.setEos(true);
- sessionHandler.out->handle(method);
- sessionHandler.out->handle(header);
- sessionHandler.out->handle(content);
- }
-
} else {
FieldTable queueSettings;
@@ -235,11 +192,6 @@ void Bridge::setPersistenceId(uint64_t p
persistenceId = pId;
}
-const string& Bridge::getName() const
-{
- return name;
-}
-
Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
{
string host;
@@ -267,7 +219,7 @@ Bridge::shared_ptr Bridge::decode(LinkRe
is_queue, is_local, id, excludes, dynamic,
sync).first;
}
-void Bridge::encode(Buffer& buffer) const
+void Bridge::encode(Buffer& buffer) const
{
buffer.putShortString(string("bridge"));
buffer.putShortString(link->getHost());
@@ -284,8 +236,8 @@ void Bridge::encode(Buffer& buffer) cons
buffer.putShort(args.i_sync);
}
-uint32_t Bridge::encodedSize() const
-{
+uint32_t Bridge::encodedSize() const
+{
return link->getHost().size() + 1 // short-string (host)
+ 7 // short-string ("bridge")
+ 2 // port
@@ -310,7 +262,7 @@ management::Manageable::status_t Bridge:
management::Args&
/*args*/,
string&)
{
- if (methodId == _qmf::Bridge::METHOD_CLOSE) {
+ if (methodId == _qmf::Bridge::METHOD_CLOSE) {
//notify that we are closed
destroy();
return management::Manageable::STATUS_OK;
@@ -357,7 +309,7 @@ void Bridge::sendReorigin()
conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding,
this,
queueName, args.i_src, args.i_key,
bindArgs));
}
-bool Bridge::resetProxy()
+bool Bridge::resetProxy()
{
SessionHandler& sessionHandler = conn->getChannel(id);
if (!sessionHandler.getSession()) peer.reset();
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.h?rev=1233641&r1=1233640&r2=1233641&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.h Thu Jan 19
23:03:03 2012
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,15 +42,19 @@ class Connection;
class ConnectionState;
class Link;
class LinkRegistry;
+class SessionHandler;
class Bridge : public PersistableConfig, public management::Manageable, public
Exchange::DynamicBridge
{
public:
typedef boost::shared_ptr<Bridge> shared_ptr;
typedef boost::function<void(Bridge*)> CancellationListener;
+ typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback;
Bridge(Link* link, framing::ChannelId id, CancellationListener l,
- const qmf::org::apache::qpid::broker::ArgsLinkBridge& args);
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args,
+ InitializeCallback init
+ );
~Bridge();
void create(Connection& c);
@@ -70,8 +74,8 @@ public:
void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
- void encode(framing::Buffer& buffer) const;
- const std::string& getName() const;
+ void encode(framing::Buffer& buffer) const;
+ const std::string& getName() const { return name; }
static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer&
buffer);
// Exchange::DynamicBridge methods
@@ -81,6 +85,10 @@ public:
bool containsLocalTag(const std::string& tagList) const;
const std::string& getLocalTag() const;
+ // Methods needed by initialization functions
+ std::string getQueueName() const { return queueName; }
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return
args; }
+
private:
struct PushHandler : framing::FrameHandler {
PushHandler(Connection* c) { conn = c; }
@@ -103,6 +111,7 @@ private:
mutable uint64_t persistenceId;
ConnectionState* connState;
Connection* conn;
+ InitializeCallback initialize;
bool resetProxy();
};
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1233641&r1=1233640&r2=1233641&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Thu Jan
19 23:03:03 2012
@@ -162,7 +162,9 @@ pair<Bridge::shared_ptr, bool> LinkRegis
const std::string& tag,
const std::string&
excludes,
bool dynamic,
- uint16_t sync)
+ uint16_t sync,
+
Bridge::InitializeCallback init
+)
{
Mutex::ScopedLock locker(lock);
QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " <<
src << " to " << dest << " (" << key << ")");
@@ -196,7 +198,8 @@ pair<Bridge::shared_ptr, bool> LinkRegis
bridge = Bridge::shared_ptr
(new Bridge (l->second.get(), l->second->nextChannel(),
boost::bind(&LinkRegistry::destroy, this,
- host, port, src, dest, key), args));
+ host, port, src, dest, key),
+ args, init));
bridges[bridgeKey] = bridge;
l->second->add(bridge);
return std::pair<Bridge::shared_ptr, bool>(bridge, true);
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=1233641&r1=1233640&r2=1233641&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h Thu Jan
19 23:03:03 2012
@@ -91,6 +91,7 @@ namespace broker {
const std::string& authMechanism,
const std::string& username,
const std::string& password);
+
std::pair<Bridge::shared_ptr, bool>
declare(const std::string& host,
uint16_t port,
@@ -103,9 +104,12 @@ namespace broker {
const std::string& id,
const std::string& excludes,
bool dynamic,
- uint16_t sync);
+ uint16_t sync,
+ Bridge::InitializeCallback=0
+ );
void destroy(const std::string& host, const uint16_t port);
+
void destroy(const std::string& host,
const uint16_t port,
const std::string& src,
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SessionHandler.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=1233641&r1=1233640&r2=1233641&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SessionHandler.h
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SessionHandler.h Thu Jan
19 23:03:03 2012
@@ -23,6 +23,7 @@
*/
#include "qpid/amqp_0_10/SessionHandler.h"
+#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
namespace qpid {
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1233641&r1=1233640&r2=1233641&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp Thu Jan 19
23:03:03 2012
@@ -21,11 +21,79 @@
#include "Backup.h"
#include "Settings.h"
#include "qpid/Url.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/types/Variant.h"
namespace qpid {
namespace ha {
+using namespace framing;
+using namespace broker;
+using types::Variant;
+
+namespace {
+const std::string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+}
+
+// Initialize a bridge as a wiring replicator.
+void bridgeInitWiringReplicator(Bridge& bridge, SessionHandler&
sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ std::string queueName = bridge.getQueueName();
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge&
args(bridge.getArgs());
+
+ //declare and bind an event queue
+ peer.getQueue().declare(queueName, "", false, false, true, true,
FieldTable());
+ peer.getExchange().bind(queueName, "qmf.default.topic",
"agent.ind.event.org_apache_qpid_broker.#", FieldTable());
+ //subscribe to the queue
+ peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0,
FieldTable());
+ peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ //issue a query request for queues and another for exchanges using event
queue as the reply-to address
+ for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable
utility functions
+ Variant::Map request;
+ request["_what"] = "OBJECT";
+ Variant::Map schema;
+ schema["_class_name"] = (i == 0 ? "queue" : "exchange");
+ schema["_package_name"] = "org.apache.qpid.broker";
+ request["_schema_id"] = schema;
+
+ AMQFrame method((MessageTransferBody(ProtocolVersion(),
"qmf.default.direct", 0, 0)));
+ method.setBof(true);
+ method.setEof(false);
+ method.setBos(true);
+ method.setEos(true);
+ AMQHeaderBody headerBody;
+ MessageProperties* props = headerBody.get<MessageProperties>(true);
+ props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+ props->setAppId("qmf2");
+ props->getApplicationHeaders().setString("qmf.opcode",
"_query_request");
+
headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
+ AMQFrame header(headerBody);
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ AMQContentBody data;
+ qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+ AMQFrame content(data);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ sessionHandler.out->handle(method);
+ sessionHandler.out->handle(header);
+ sessionHandler.out->handle(content);
+ }
+}
+
Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
// Create a link to replicate wiring
if (s.brokerUrl != "dummy") {
@@ -41,15 +109,17 @@ Backup::Backup(broker::Broker& b, const
broker.getLinks().declare( // Declare the bridge
url[0].host, url[0].port,
false, // durable
- "qpid.wiring-replicator", // src
- "qpid.wiring-replicator", // dest
+ QPID_WIRING_REPLICATOR, // src
+ QPID_WIRING_REPLICATOR, // dest
"x", // key
false, // isQueue
false, // isLocal
"", // id/tag
"", // excludes
false, // dynamic
- 0); // sync?
+ 0, // sync?
+ bridgeInitWiringReplicator
+ );
}
// FIXME aconway 2011-11-17: need to enhance the link code to
// handle discovery of the primary broker and fail-over correctly.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]