Author: aconway
Date: Thu Jan 19 23:03:31 2012
New Revision: 1233644
URL: http://svn.apache.org/viewvc?rev=1233644&view=rev
Log:
QPID-3603: Move wiring-replicator creation out of SemanticState::route.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am
qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am?rev=1233644&r1=1233643&r2=1233644&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am Thu Jan 19 23:03:31 2012
@@ -600,8 +600,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/PriorityQueue.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
- qpid/ha/WiringReplicator.h \
- qpid/ha/WiringReplicator.cpp \
qpid/broker/NullMessageStore.cpp \
qpid/broker/NullMessageStore.h \
qpid/broker/OwnershipToken.h \
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk?rev=1233644&r1=1233643&r2=1233644&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk Thu Jan 19 23:03:31 2012
@@ -23,12 +23,14 @@
dmoduleexec_LTLIBRARIES += ha.la
ha_la_SOURCES = \
- qpid/ha/HaPlugin.cpp \
- qpid/ha/HaBroker.cpp \
- qpid/ha/HaBroker.h \
qpid/ha/Backup.cpp \
qpid/ha/Backup.h \
- qpid/ha/Settings.h
+ qpid/ha/HaBroker.cpp \
+ qpid/ha/HaBroker.h \
+ qpid/ha/HaPlugin.cpp \
+ qpid/ha/Settings.h \
+ qpid/ha/WiringReplicator.cpp \
+ qpid/ha/WiringReplicator.h
ha_la_LIBADD = libqpidbroker.la
ha_la_LDFLAGS = $(PLUGINLDFLAGS)
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1233644&r1=1233643&r2=1233644&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Jan 19 23:03:31 2012
@@ -480,7 +480,6 @@ void SemanticState::route(intrusive_ptr<
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName ||
cacheExchange->isDestroyed()) {
cacheExchange = QueueReplicator::create(exchangeName,
getSession().getBroker().getQueues());
- if (!cacheExchange) cacheExchange =
ha::WiringReplicator::create(exchangeName, getSession().getBroker());
if (!cacheExchange) cacheExchange =
session.getBroker().getExchanges().get(exchangeName);
}
cacheExchange->setProperties(msg);
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1233644&r1=1233643&r2=1233644&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp Thu Jan 19 23:03:31 2012
@@ -20,11 +20,13 @@
*/
#include "Backup.h"
#include "Settings.h"
+#include "WiringReplicator.h"
#include "qpid/Url.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/Link.h"
#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/FieldTable.h"
@@ -39,114 +41,24 @@ using namespace broker;
using types::Variant;
using std::string;
-namespace {
-const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
-const string _WHAT("_what");
-const string _CLASS_NAME("_class_name");
-const string _PACKAGE_NAME("_package_name");
-const string _SCHEMA_ID("_schema_id");
-const string OBJECT("OBJECT");
-const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
-const string QMF_DEFAULT_DIRECT("qmf.default.direct");
-const string QMF2("qmf2");
-const string QMF_OPCODE("qmf.opcode");
-const string _QUERY_REQUEST("_query_request");
-const string BROKER("broker");
-}
-
-void sendQuery(const string className, const string& queueName,
SessionHandler& sessionHandler) {
- framing::AMQP_ServerProxy peer(sessionHandler.out);
- Variant::Map request;
- request[_WHAT] = OBJECT;
- Variant::Map schema;
- schema[_CLASS_NAME] = className;
- schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
- request[_SCHEMA_ID] = schema;
-
- AMQFrame method((MessageTransferBody(ProtocolVersion(),
QMF_DEFAULT_DIRECT, 0, 0)));
- method.setBof(true);
- method.setEof(false);
- method.setBos(true);
- method.setEos(true);
- AMQHeaderBody headerBody;
- MessageProperties* props = headerBody.get<MessageProperties>(true);
- props->setReplyTo(qpid::framing::ReplyTo("", queueName));
- props->setAppId(QMF2);
- props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
- headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
- AMQFrame header(headerBody);
- header.setBof(false);
- header.setEof(false);
- header.setBos(true);
- header.setEos(true);
- AMQContentBody data;
- qpid::amqp_0_10::MapCodec::encode(request, data.getData());
- AMQFrame content(data);
- content.setBof(false);
- content.setEof(true);
- content.setBos(true);
- content.setEos(true);
- sessionHandler.out->handle(method);
- sessionHandler.out->handle(header);
- sessionHandler.out->handle(content);
-}
-
-namespace {
-const string QMF_DEFAULT_TOPIC("qmf.default.topic");
-const string
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
-const string QUEUE("queue");
-const string EXCHANGE("exchange");
-const string BINDING("binding");
-}
-
-// Initialize a bridge as a wiring replicator.
-void bridgeInitWiringReplicator(Bridge& bridge, SessionHandler&
sessionHandler) {
- framing::AMQP_ServerProxy peer(sessionHandler.out);
- string queueName = bridge.getQueueName();
- const qmf::org::apache::qpid::broker::ArgsLinkBridge&
args(bridge.getArgs());
-
- //declare and bind an event queue
- peer.getQueue().declare(queueName, "", false, false, true, true,
FieldTable());
- peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC,
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
- //subscribe to the queue
- peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0,
FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
-
- //issue a query request for queues and another for exchanges using event queue as the reply-to address
- sendQuery(QUEUE, queueName, sessionHandler);
- sendQuery(EXCHANGE, queueName, sessionHandler);
- sendQuery(BINDING, queueName, sessionHandler);
-}
-
Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
- // Create a link to replicate wiring
if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack to identify primary.
Url url(s.brokerUrl);
QPID_LOG(info, "HA backup broker connecting to: " << url);
-
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
- broker.getLinks().declare( // Declare the link
+
+ // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
+ // Declare the link
+ std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
url[0].host, url[0].port, protocol,
false, // durable
s.mechanism, s.username, s.password);
-
- broker.getLinks().declare( // Declare the bridge
- url[0].host, url[0].port,
- false, // durable
- QPID_WIRING_REPLICATOR, // src
- QPID_WIRING_REPLICATOR, // dest
- "", // key
- false, // isQueue
- false, // isLocal
- "", // id/tag
- "", // excludes
- false, // dynamic
- 0, // sync?
- bridgeInitWiringReplicator
- );
+ assert(result.second); // FIXME aconway 2011-11-23: error handling
+ link = result.first;
+ boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
+ broker.getExchanges().registerExchange(wr);
+ wr->initialize(); // Must be called after registering exchange.
}
- // FIXME aconway 2011-11-17: handle discovery of the primary broker and fail-over correctly.
}
}} // namespace qpid::ha
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h?rev=1233644&r1=1233643&r2=1233644&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h Thu Jan 19 23:03:31 2012
@@ -24,10 +24,12 @@
#include "Settings.h"
#include "qpid/Url.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
class Broker;
+class Link;
}
namespace ha {
@@ -44,6 +46,7 @@ class Backup
private:
broker::Broker& broker;
Settings settings;
+ boost::shared_ptr<broker::Link> link;
};
}} // namespace qpid::ha
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1233644&r1=1233643&r2=1233644&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Thu Jan 19 23:03:31 2012
@@ -21,9 +21,13 @@
#include "WiringReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
@@ -40,62 +44,69 @@ using qmf::org::apache::qpid::broker::Ev
using qmf::org::apache::qpid::broker::EventQueueDeclare;
using qmf::org::apache::qpid::broker::EventQueueDelete;
using qmf::org::apache::qpid::broker::EventSubscribe;
-
+using namespace framing;
using std::string;
using types::Variant;
using namespace broker;
-namespace{
+namespace {
+const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
const string QPID_REPLICATE("qpid.replicate");
-const string ALL("all");
-const string WIRING("wiring");
const string CLASS_NAME("_class_name");
+const string EVENT("_event");
const string OBJECT_NAME("_object_name");
const string PACKAGE_NAME("_package_name");
-const string VALUES("_values");
-const string EVENT("_event");
-const string SCHEMA_ID("_schema_id");
const string QUERY_RESPONSE("_query_response");
+const string SCHEMA_ID("_schema_id");
+const string VALUES("_values");
-const string ARGUMENTS("arguments");
+const string ALL("all");
+const string ALTEX("altEx");
const string ARGS("args");
-const string QUEUE("queue");
-const string EXCHANGE("exchange");
+const string ARGUMENTS("arguments");
+const string AUTODEL("autoDel");
+const string AUTODELETE("autoDelete");
const string BIND("bind");
const string BINDING("binding");
+const string CREATED("created");
+const string DISP("disp");
const string DURABLE("durable");
-const string QNAME("qName");
-const string AUTODEL("autoDel");
-const string ALTEX("altEx");
-const string USER("user");
-const string RHOST("rhost");
-const string EXTYPE("exType");
+const string EXCHANGE("exchange");
const string EXNAME("exName");
-const string AUTODELETE("autoDelete");
+const string EXTYPE("exType");
+const string KEY("key");
const string NAME("name");
+const string QNAME("qName");
+const string QUEUE("queue");
+const string RHOST("rhost");
const string TYPE("type");
-const string DISP("disp");
-const string CREATED("created");
-const string KEY("key");
-
+const string USER("user");
+const string WIRING("wiring");
-const string QMF_OPCODE("qmf.opcode");
-const string QMF_CONTENT("qmf.content");
+const string
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string QMF2("qmf2");
+const string QMF_CONTENT("qmf.content");
+const string QMF_DEFAULT_TOPIC("qmf.default.topic");
+const string QMF_OPCODE("qmf.opcode");
-const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
-
+const string _WHAT("_what");
+const string _CLASS_NAME("_class_name");
+const string _PACKAGE_NAME("_package_name");
+const string _SCHEMA_ID("_schema_id");
+const string OBJECT("OBJECT");
+const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string QMF_DEFAULT_DIRECT("qmf.default.direct");
+const string _QUERY_REQUEST("_query_request");
+const string BROKER("broker");
-bool isQMFv2(const Message& message)
-{
+bool isQMFv2(const Message& message) {
const framing::MessageProperties* props =
message.getProperties<framing::MessageProperties>();
return props && props->getAppId() == QMF2;
}
-template <class T> bool match(Variant::Map& schema)
-{
+template <class T> bool match(Variant::Map& schema) {
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
@@ -110,13 +121,90 @@ bool isReplicated(const Variant::Map& m)
return i != m.end() && isReplicated(i->second.asString());
}
+void sendQuery(const string className, const string& queueName,
SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ Variant::Map request;
+ request[_WHAT] = OBJECT;
+ Variant::Map schema;
+ schema[_CLASS_NAME] = className;
+ schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+ request[_SCHEMA_ID] = schema;
+
+ AMQFrame method((MessageTransferBody(ProtocolVersion(),
QMF_DEFAULT_DIRECT, 0, 0)));
+ method.setBof(true);
+ method.setEof(false);
+ method.setBos(true);
+ method.setEos(true);
+ AMQHeaderBody headerBody;
+ MessageProperties* props = headerBody.get<MessageProperties>(true);
+ props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+ props->setAppId(QMF2);
+ props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
+ headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+ AMQFrame header(headerBody);
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ AMQContentBody data;
+ qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+ AMQFrame content(data);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ sessionHandler.out->handle(method);
+ sessionHandler.out->handle(header);
+ sessionHandler.out->handle(content);
+}
} // namespace
-
-WiringReplicator::WiringReplicator(const string& name, Broker& b) :
Exchange(name), broker(b) {}
-
WiringReplicator::~WiringReplicator() {}
+WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l)
+ : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
+{}
+
+// We need to split out the initialization so that the WiringReplicator
+// can be registered as an exchange before starting the bridge.
+void WiringReplicator::initialize() {
+ assert(link->getBroker());
+ broker.getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ QPID_WIRING_REPLICATOR, // src
+ QPID_WIRING_REPLICATOR, // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ boost::bind(&WiringReplicator::initializeBridge, this, _1, _2)
+ );
+}
+
+// This is called in the connection IO thread when the bridge is started.
+void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler&
sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ string queueName = bridge.getQueueName();
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge&
args(bridge.getArgs());
+
+ //declare and bind an event queue
+ peer.getQueue().declare(queueName, "", false, false, true, true,
FieldTable());
+ peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC,
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
+ //subscribe to the queue
+ peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0,
FieldTable());
+ peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ //issue a query request for queues and another for exchanges using event queue as the reply-to address
+ sendQuery(QUEUE, queueName, sessionHandler);
+ sendQuery(EXCHANGE, queueName, sessionHandler);
+ sendQuery(BINDING, queueName, sessionHandler);
+}
+
void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const
framing::FieldTable* headers) {
Variant::List list;
try {
@@ -176,7 +264,10 @@ void WiringReplicator::doEventQueueDecla
args,
values[USER].asString(),
values[RHOST].asString()).second) {
- // FIXME aconway 2011-11-22: should delete old queue and re-create from exchanges.
+ // FIXME aconway 2011-11-22: should delete old queue and
+ // re-create from event.
+ // Events are always up to date, whereas responses may be
+ // out of date.
QPID_LOG(warning, "Replicated queue " << name << " already exists");
}
}
@@ -209,7 +300,7 @@ void WiringReplicator::doEventExchangeDe
values[USER].asString(),
values[RHOST].asString()).second) {
// FIXME aconway 2011-11-22: should delete pre-exisitng exchange
- // and re-create from event. Likewise for queues.
+ // and re-create from event. See comment in doEventQueueDeclare.
QPID_LOG(warning, "Replicated exchange " << name << " already exists");
}
}
@@ -286,8 +377,6 @@ void WiringReplicator::doResponseExchang
}
}
-// FIXME aconway 2011-11-21: refactor to remove redundancy between do* functions.
-
namespace {
const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");
@@ -299,7 +388,7 @@ std::string getRefName(const std::string
throw Exception(QPID_MSG("Replicator: invalid object reference: " <<
ref));
const std::string name = i->second.asString();
if (name.compare(0, prefix.size(), prefix) != 0)
- throw Exception(QPID_MSG("Replicator unexpected reference prefix: " <<
name));
+ throw Exception(QPID_MSG("Replicator: unexpected reference prefix: "
<< name));
std::string ret = name.substr(prefix.size());
return ret;
}
@@ -336,21 +425,6 @@ void WiringReplicator::doResponseBind(Va
} catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange.
}
-boost::shared_ptr<Exchange> WiringReplicator::create(const string& target,
Broker& broker)
-{
- boost::shared_ptr<Exchange> exchange;
- if (isWiringReplicatorDestination(target)) {
- //TODO: need to cache the exchange
- exchange.reset(new WiringReplicator(target, broker));
- }
- return exchange;
-}
-
-bool WiringReplicator::isWiringReplicatorDestination(const string& target)
-{
- return target == QPID_WIRING_REPLICATOR;
-}
-
bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const
framing::FieldTable*) { return false; }
bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const
framing::FieldTable*) { return false; }
bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const,
const framing::FieldTable* const) { return false; }
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h?rev=1233644&r1=1233643&r2=1233644&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h Thu Jan 19 23:03:31 2012
@@ -24,13 +24,15 @@
#include "qpid/broker/Exchange.h"
#include "qpid/types/Variant.h"
-
-// FIXME aconway 2011-11-17: relocate to ../ha
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
class Broker;
+class Link;
+class Bridge;
+class SessionHandler;
}
namespace ha {
@@ -42,19 +44,23 @@ namespace ha {
class WiringReplicator : public broker::Exchange
{
public:
- WiringReplicator(const std::string&, broker::Broker&);
+ WiringReplicator(const boost::shared_ptr<broker::Link>&);
~WiringReplicator();
std::string getType() const;
+
+ // Call this after the WiringReplicator has been registered as an exchange.
+ void initialize();
+
+ // Exchange methods
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const
framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const
framing::FieldTable*);
void route(broker::Deliverable&, const std::string&, const
framing::FieldTable*);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const,
const framing::FieldTable* const);
- static bool isWiringReplicatorDestination(const std::string&);
- static boost::shared_ptr<broker::Exchange> create(const std::string&,
broker::Broker&);
static const std::string typeName;
- private:
+ private:
+ void initializeBridge(broker::Bridge&, broker::SessionHandler&);
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
void doEventExchangeDeclare(types::Variant::Map& values);
@@ -65,7 +71,10 @@ class WiringReplicator : public broker::
void doResponseBind(types::Variant::Map& values);
private:
+ void startQueueReplicator(const std::string& name);
+
broker::Broker& broker;
+ boost::shared_ptr<broker::Link> link;
};
}} // namespace qpid::broker
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]