Author: aconway
Date: Thu Jan 19 23:03:52 2012
New Revision: 1233646
URL: http://svn.apache.org/viewvc?rev=1233646&view=rev
Log:
QPID-3603: Automatic wiring and message replication.
Added:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
(contents, props changed)
- copied, changed from r1233645,
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/QueueReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h
(contents, props changed)
- copied, changed from r1233645,
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/QueueReplicator.h
Removed:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/QueueReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/QueueReplicator.h
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am
qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h
qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py
qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am?rev=1233646&r1=1233645&r2=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am Thu Jan 19 23:03:52 2012
@@ -627,8 +627,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueuedMessage.h \
qpid/broker/QueueFlowLimit.h \
qpid/broker/QueueFlowLimit.cpp \
- qpid/broker/QueueReplicator.h \
- qpid/broker/QueueReplicator.cpp \
qpid/broker/ReplicatingSubscription.h \
qpid/broker/ReplicatingSubscription.cpp \
qpid/broker/RateFlowcontrol.h \
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk?rev=1233646&r1=1233645&r2=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk Thu Jan 19 23:03:52 2012
@@ -29,6 +29,8 @@ ha_la_SOURCES = \
qpid/ha/HaBroker.h \
qpid/ha/HaPlugin.cpp \
qpid/ha/Settings.h \
+ qpid/ha/QueueReplicator.h \
+ qpid/ha/QueueReplicator.cpp \
qpid/ha/WiringReplicator.cpp \
qpid/ha/WiringReplicator.h
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1233646&r1=1233645&r2=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Jan 19 23:03:52 2012
@@ -25,7 +25,6 @@
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/ha/WiringReplicator.h"
-#include "qpid/broker/QueueReplicator.h"
#include "qpid/broker/SessionState.h"
#include "qpid/management/ManagementAgent.h"
@@ -112,10 +111,7 @@ void Bridge::create(Connection& c)
if (args.i_srcIsLocal)
sessionHandler.getSession()->disableReceiverTracking();
if (initialize) initialize(*this, sessionHandler);
else if (args.i_srcIsQueue) {
- //TODO: something other than this which is nasty...
- bool isReplicatingLink =
QueueReplicator::initReplicationSettings(args.i_dest,
link->getBroker()->getQueues(), options);
-
- peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0
: 1, isReplicatingLink ? 1 : 0, false, "", 0, options);
+ peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0
: 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated route from queue " << args.i_src << " to "
<< args.i_dest);
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h?rev=1233646&r1=1233645&r2=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h Thu Jan 19 23:03:52 2012
@@ -105,6 +105,8 @@ namespace qpid {
std::string getHost() { return host; }
uint16_t getPort() { return port; }
+ std::string getTransport() { return transport; }
+
bool isDurable() { return durable; }
void maintenanceVisit ();
uint nextChannel();
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1233646&r1=1233645&r2=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.cpp Thu Jan 19 23:03:52 2012
@@ -1434,7 +1434,7 @@ bool Queue::bind(boost::shared_ptr<Excha
}
-const Broker* Queue::getBroker()
+Broker* Queue::getBroker()
{
return broker;
}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.h?rev=1233646&r1=1233645&r2=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.h Thu Jan 19 23:03:52 2012
@@ -403,7 +403,7 @@ class Queue : public boost::enable_share
void flush();
- const Broker* getBroker();
+ Broker* getBroker();
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
void setDequeueSincePurge(uint32_t value);
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1233646&r1=1233645&r2=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Jan 19 23:03:52 2012
@@ -27,7 +27,6 @@
#include "qpid/broker/Message.h"
#include "qpid/ha/WiringReplicator.h"
#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueReplicator.h"
#include "qpid/broker/ReplicatingSubscription.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SessionOutputException.h"
@@ -478,9 +477,10 @@ void SemanticState::route(intrusive_ptr<
msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName ||
cacheExchange->isDestroyed()) {
- cacheExchange = QueueReplicator::create(exchangeName,
getSession().getBroker().getQueues());
- if (!cacheExchange) cacheExchange =
session.getBroker().getExchanges().get(exchangeName);
+ if (!cacheExchange || cacheExchange->getName() != exchangeName
+ || cacheExchange->isDestroyed())
+ {
+ cacheExchange = session.getBroker().getExchanges().get(exchangeName);
}
cacheExchange->setProperties(msg);
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1233646&r1=1233645&r2=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp Thu Jan 19 23:03:52 2012
@@ -42,9 +42,10 @@ using types::Variant;
using std::string;
Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
+ // FIXME aconway 2011-11-24: identifying the primary. Only has 1 address.
if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack
to identify primary.
Url url(s.brokerUrl);
- QPID_LOG(info, "HA backup broker connecting to: " << url);
+ QPID_LOG(info, "HA: Acting as backup to " << url);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
// FIXME aconway 2011-11-17: TBD: link management, discovery,
fail-over.
Copied: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
(from r1233645,
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/QueueReplicator.cpp)
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?p2=qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp&p1=qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/QueueReplicator.cpp&r1=1233645&r2=1233646&rev=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Jan 19 23:03:52 2012
@@ -18,18 +18,62 @@
* under the License.
*
*/
-#include "qpid/broker/QueueReplicator.h"
+
+#include "QueueReplicator.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace {
+const std::string QPID_REPLICATOR_("qpid.replicator-");
+}
namespace qpid {
-namespace broker {
+namespace ha {
+using namespace broker;
+
+QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
+ : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway
2011-11-24: hidden from management?
+ queue(q), link(l), current(queue->getPosition())
+{
+ // FIXME aconway 2011-11-24: consistent logging.
+ QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " <<
q->getSettings());
+ queue->getBroker()->getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ queue->getName(), // src
+ getName(), // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ boost::bind(&QueueReplicator::initializeBridge, this, _1, _2)
+ );
+}
-QueueReplicator::QueueReplicator(const std::string& name,
boost::shared_ptr<Queue> q) : Exchange(name, 0, 0), queue(q),
current(queue->getPosition()) {}
QueueReplicator::~QueueReplicator() {}
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler&
sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge&
args(bridge.getArgs());
+ peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1,
0, false, "", 0, framing::FieldTable());
+ peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
+ peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+ QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " <<
args.i_dest);
+
+}
+
+
namespace {
const std::string DEQUEUE_EVENT("dequeue-event");
const std::string REPLICATOR("qpid.replicator-");
@@ -78,23 +122,6 @@ bool QueueReplicator::isReplicatingLink(
return name.find(REPLICATOR) == 0;
}
-boost::shared_ptr<Exchange> QueueReplicator::create(const std::string& target,
QueueRegistry& queues)
-{
- boost::shared_ptr<Exchange> exchange;
- if (isReplicatingLink(target)) {
- std::string queueName = target.substr(REPLICATOR.size());
- boost::shared_ptr<Queue> queue = queues.find(queueName);
- if (!queue) {
- QPID_LOG(warning, "Unable to create replicator, can't find " <<
queueName);
- } else {
- //TODO: need to cache the replicator
- QPID_LOG(info, "Creating replicator for " << queueName);
- exchange.reset(new QueueReplicator(target, queue));
- }
- }
- return exchange;
-}
-
bool QueueReplicator::initReplicationSettings(const std::string& target,
QueueRegistry& queues, qpid::framing::FieldTable& settings)
{
if (isReplicatingLink(target)) {
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h (from
r1233645, qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/QueueReplicator.h)
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h?p2=qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h&p1=qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/QueueReplicator.h&r1=1233645&r2=1233646&rev=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/QueueReplicator.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h Thu Jan 19 23:03:52 2012
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_QUEUEREPLICATOR_H
-#define QPID_BROKER_QUEUEREPLICATOR_H
+#ifndef QPID_HA_QUEUEREPLICATOR_H
+#define QPID_HA_QUEUEREPLICATOR_H
/*
*
@@ -25,33 +25,43 @@
#include "qpid/framing/SequenceSet.h"
namespace qpid {
-namespace broker {
+namespace broker {
+class Bridge;
+class Link;
+class Queue;
class QueueRegistry;
+class SessionHandler;
+class Deliverable;
+}
+
+namespace ha {
/**
* Dummy exchange for processing replication messages
*/
-class QueueReplicator : public Exchange
+class QueueReplicator : public broker::Exchange
{
public:
- QueueReplicator(const std::string& name, boost::shared_ptr<Queue>);
+ QueueReplicator(boost::shared_ptr<broker::Queue> q,
boost::shared_ptr<broker::Link> l);
~QueueReplicator();
std::string getType() const;
- bool bind(boost::shared_ptr<Queue>, const std::string&, const
qpid::framing::FieldTable*);
- bool unbind(boost::shared_ptr<Queue>, const std::string&, const
qpid::framing::FieldTable*);
- void route(Deliverable&, const std::string&, const
qpid::framing::FieldTable*);
- bool isBound(boost::shared_ptr<Queue>, const std::string* const, const
qpid::framing::FieldTable* const);
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const
framing::FieldTable*);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const
framing::FieldTable*);
+ void route(broker::Deliverable&, const std::string&, const
framing::FieldTable*);
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const,
const framing::FieldTable* const);
static bool isReplicatingLink(const std::string&);
- static boost::shared_ptr<Exchange> create(const std::string&,
QueueRegistry&);
- static bool initReplicationSettings(const std::string&, QueueRegistry&,
qpid::framing::FieldTable&);
+ static bool initReplicationSettings(const std::string&,
broker::QueueRegistry&, framing::FieldTable&);
static const std::string typeName;
private:
- boost::shared_ptr<Queue> queue;
- qpid::framing::SequenceNumber current;
- qpid::framing::SequenceSet dequeued;
+ void initializeBridge(broker::Bridge& bridge, broker::SessionHandler&
sessionHandler);
+
+ boost::shared_ptr<broker::Queue> queue;
+ boost::shared_ptr<broker::Link> link;
+ framing::SequenceNumber current;
+ framing::SequenceSet dequeued;
};
-}} // namespace qpid::broker
+}} // namespace qpid::ha
-#endif /*!QPID_BROKER_QUEUEREPLICATOR_H*/
+#endif /*!QPID_HA_QUEUEREPLICATOR_H*/
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1233646&r1=1233645&r2=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Thu Jan 19 23:03:52 2012
@@ -19,6 +19,7 @@
*
*/
#include "WiringReplicator.h"
+#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Link.h"
@@ -62,7 +63,6 @@ const string QUERY_RESPONSE("_query_resp
const string SCHEMA_ID("_schema_id");
const string VALUES("_values");
-const string ALL("all");
const string ALTEX("altEx");
const string ARGS("args");
const string ARGUMENTS("arguments");
@@ -83,7 +83,6 @@ const string QUEUE("queue");
const string RHOST("rhost");
const string TYPE("type");
const string USER("user");
-const string WIRING("wiring");
const string
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string QMF2("qmf2");
@@ -110,15 +109,33 @@ template <class T> bool match(Variant::M
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
-bool isReplicated(const string& value) {
- return value == ALL || value == WIRING;
+// FIXME aconway 2011-11-24: this should be a class.
+enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL };
+const string S_NONE="none";
+const string S_WIRING="wiring";
+const string S_ALL="all";
+
+ReplicateLevel replicateLevel(const string& str) {
+ // FIXME aconway 2011-11-24: case insenstive comparison.
+ QPID_LOG(critical, "FIXME replicateLevel " << str);
+ ReplicateLevel rl = RL_NONE;
+ if (str == S_WIRING) rl = RL_WIRING;
+ else if (str == S_ALL) rl = RL_ALL;
+ QPID_LOG(critical, "FIXME replicateLevel " << str << " = " << rl);
+ return rl;
+}
+
+ReplicateLevel replicateLevel(const framing::FieldTable& f) {
+ QPID_LOG(critical, "FIXME replicateLevel " << f);
+ if (f.isSet(QPID_REPLICATE)) return
replicateLevel(f.getAsString(QPID_REPLICATE));
+ else return RL_NONE;
}
-bool isReplicated(const framing::FieldTable& f) {
- return f.isSet(QPID_REPLICATE) &&
isReplicated(f.getAsString(QPID_REPLICATE));
-}
-bool isReplicated(const Variant::Map& m) {
+
+ReplicateLevel replicateLevel(const Variant::Map& m) {
+ QPID_LOG(critical, "FIXME replicateLevel " << m);
Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
- return i != m.end() && isReplicated(i->second.asString());
+ if (i != m.end()) return replicateLevel(i->second.asString());
+ else return RL_NONE;
}
void sendQuery(const string className, const string& queueName,
SessionHandler& sessionHandler) {
@@ -164,6 +181,8 @@ WiringReplicator::~WiringReplicator() {}
WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l)
: Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
{
+ QPID_LOG(debug, "HA: Starting replication from " <<
+ link->getTransport() << ":" << link->getHost() << ":" <<
link->getPort());
broker.getLinks().declare(
link->getHost(), link->getPort(),
false, // durable
@@ -198,6 +217,7 @@ void WiringReplicator::initializeBridge(
sendQuery(QUEUE, queueName, sessionHandler);
sendQuery(EXCHANGE, queueName, sessionHandler);
sendQuery(BINDING, queueName, sessionHandler);
+ QPID_LOG(debug, "Activated wiring replicator")
}
void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const
framing::FieldTable* headers) {
@@ -244,13 +264,15 @@ void WiringReplicator::route(Deliverable
}
void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
+ QPID_LOG(critical, "FIXME doEventQueueDeclare " << values);
string name = values[QNAME].asString();
Variant::Map argsMap = values[ARGS].asMap();
- if (values[DISP] == CREATED && isReplicated(argsMap)) {
- QPID_LOG(debug, "Creating replicated queue " << name);
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ QPID_LOG(debug, "HA: Creating replicated queue " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- if (!broker.createQueue(
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
name,
values[DURABLE].asBool(),
values[AUTODEL].asBool(),
@@ -258,11 +280,14 @@ void WiringReplicator::doEventQueueDecla
values[ALTEX].asString(),
args,
values[USER].asString(),
- values[RHOST].asString()).second) {
+ values[RHOST].asString());
+ if (result.second) {
// FIXME aconway 2011-11-22: should delete old queue and
// re-create from event.
// Events are always up to date, whereas responses may be
// out of date.
+ startQueueReplicator(result.first);
+ } else {
QPID_LOG(warning, "Replicated queue " << name << " already
exists");
}
}
@@ -271,7 +296,7 @@ void WiringReplicator::doEventQueueDecla
void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
- if (queue && isReplicated(queue->getSettings())) {
+ if (queue && replicateLevel(queue->getSettings())) {
QPID_LOG(debug, "Deleting replicated queue " << name);
broker.deleteQueue(
name,
@@ -282,7 +307,7 @@ void WiringReplicator::doEventQueueDelet
void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
Variant::Map argsMap(values[ARGS].asMap());
- if (values[DISP] == CREATED && isReplicated(argsMap)) {
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
string name = values[EXNAME].asString();
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
@@ -305,7 +330,7 @@ void WiringReplicator::doEventExchangeDe
string name = values[EXNAME].asString();
try {
boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
- if (exchange && isReplicated(exchange->getArgs())) {
+ if (exchange && replicateLevel(exchange->getArgs())) {
QPID_LOG(debug, "Deleting replicated exchange " << name);
broker.deleteExchange(
name,
@@ -320,7 +345,7 @@ void WiringReplicator::doEventBind(Varia
boost::shared_ptr<Exchange> exchange =
broker.getExchanges().get(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
broker.getQueues().find(values[QNAME].asString());
// We only replicated a binds for a replicated queue to replicated
exchange.
- if (isReplicated(exchange->getArgs()) &&
isReplicated(queue->getSettings())) {
+ if (replicateLevel(exchange->getArgs()) &&
replicateLevel(queue->getSettings())) {
framing::FieldTable args;
amqp_0_10::translate(values[ARGS].asMap(), args);
string key = values[KEY].asString();
@@ -333,21 +358,28 @@ void WiringReplicator::doEventBind(Varia
}
void WiringReplicator::doResponseQueue(Variant::Map& values) {
+ QPID_LOG(critical, "FIXME doResponseQueue " << values);
// FIXME aconway 2011-11-22: more flexible ways & defaults to indicate
replication
Variant::Map argsMap(values[ARGUMENTS].asMap());
- if (!isReplicated(argsMap)) return;
+ QPID_LOG(critical, "FIXME doResponseQueue replevel " <<
replicateLevel(argsMap));
+ if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
+ string name(values[NAME].asString());
QPID_LOG(debug, "Creating replicated queue " << values[NAME].asString() <<
" (in catch-up)");
- if (!broker.createQueue(
- values[NAME].asString(),
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
values[DURABLE].asBool(),
values[AUTODELETE].asBool(),
0 /*i.e. no owner regardless of exclusivity on master*/,
""/*TODO: need to include alternate-exchange*/,
args,
""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/).second) {
+ ""/*TODO: what should we use as connection id?*/);
+ if (result.second) {
+ startQueueReplicator(result.first);
+ } else {
// FIXME aconway 2011-11-22: Normal to find queue already
// exists if we're failing over.
QPID_LOG(warning, "Replicated queue " << values[NAME] << " already
exists (in catch-up)");
@@ -356,7 +388,7 @@ void WiringReplicator::doResponseQueue(V
void WiringReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(values[ARGUMENTS].asMap());
- if (!isReplicated(argsMap)) return;
+ if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
QPID_LOG(debug, "Creating replicated exchange " << values[NAME].asString()
<< " (in catch-up)");
@@ -396,23 +428,21 @@ const std::string QUEUE_REF("queueRef");
void WiringReplicator::doResponseBind(Variant::Map& values) {
try {
std::string exName = getRefName(EXCHANGE_REF_PREFIX,
values[EXCHANGE_REF]);
- boost::shared_ptr<Exchange> exchange =
broker.getExchanges().get(exName);
- if (!exchange) return;
-
std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
+ QPID_LOG(critical, "FIXME doResponseBind " << qName << " to " <<
exName);
+ boost::shared_ptr<Exchange> exchange =
broker.getExchanges().get(exName);
boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
- if (!queue) return;
+ QPID_LOG(critical, "FIXME doResponseBind ptrs " << queue.get() << " to
" << exchange.get());
+ // FIXME aconway 2011-11-24: more flexible configuration for binding
replication.
- // We only replicated a bind for a replicated queue to replicated
exchange.
- // FIXME aconway 2011-11-22: do we always log binds between replicated
ex/q
- // or do we consider the bind arguments as well?
- if (exchange && queue &&
- isReplicated(exchange->getArgs()) &&
isReplicated(queue->getSettings()))
+ // Automatically replicate exchange if queue and exchange are
replicated
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
{
framing::FieldTable args;
amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
string key = values[KEY].asString();
- QPID_LOG(debug, "Replicated binding exchange=" <<
exchange->getName()
+ QPID_LOG(debug, "HA: Replicated binding exchange=" <<
exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
exchange->bind(queue, key, &args);
@@ -420,6 +450,15 @@ void WiringReplicator::doResponseBind(Va
} catch (const framing::NotFoundException& e) {} // Ignore unreplicated
queue or exchange.
}
+void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>&
queue) {
+ QPID_LOG(critical, "FIXME startQueueReplicator " << queue->getName() << "
" << queue->getSettings());
+ if (replicateLevel(queue->getSettings()) == RL_ALL) {
+ QPID_LOG(critical, "FIXME startQueueReplicator starting");
+ boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue,
link));
+ broker.getExchanges().registerExchange(qr);
+ }
+}
+
bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const
framing::FieldTable*) { return false; }
bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const
framing::FieldTable*) { return false; }
bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const,
const framing::FieldTable* const) { return false; }
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h?rev=1233646&r1=1233645&r2=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h Thu Jan 19 23:03:52 2012
@@ -68,7 +68,7 @@ class WiringReplicator : public broker::
void doResponseBind(types::Variant::Map& values);
private:
- void startQueueReplicator(const std::string& name);
+ void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
broker::Broker& broker;
boost::shared_ptr<broker::Link> link;
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py?rev=1233646&r1=1233645&r2=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py Thu Jan 19 23:03:52 2012
@@ -519,6 +519,12 @@ class BrokerTest(TestCase):
actual_contents = self.browse(session, queue, timeout)
self.assertEqual(expect_contents, actual_contents)
+ def assert_browse_retry(self, session, queue, expect_contents, timeout=1,
delay=.01):
+ """Wait up to timeout for contents of queue to match expect_contents"""
+ def test(): return self.browse(session, queue, 0) == expect_contents
+ retry(test, timeout, delay)
+ self.assertEqual(expect_contents, self.browse(session, queue, 0))
+
def join(thread, timeout=10):
thread.join(timeout)
if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1233646&r1=1233645&r2=1233646&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Thu Jan 19 23:03:52 2012
@@ -57,50 +57,52 @@ class ShortTests(BrokerTest):
def assert_missing(self,session, address):
try:
- session.receiver(a)
+ session.receiver(address)
self.fail("Should not have been replicated: %s"%(address))
except NotFound: pass
- def test_replicate_wiring(self):
-
queue="%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"
-
exchange="%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s},
type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"
+ def test_replication(self):
+ def queue(name, replicate):
+ return
"%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name,
replicate)
+
+ def exchange(name, replicate, bindq):
+
return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s},
type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate,
name, bindq)
+ def setup(p, prefix):
+ """Create config, send messages on the primary p"""
+ p.sender(queue(prefix+"q1", "all")).send(Message("1"))
+ p.sender(queue(prefix+"q2", "wiring")).send(Message("2"))
+ p.sender(queue(prefix+"q3", "none")).send(Message("3"))
+ p.sender(exchange(prefix+"e1", "all",
prefix+"q1")).send(Message("4"))
+ p.sender(exchange(prefix+"e2", "all",
prefix+"q2")).send(Message("5"))
+ # FIXME aconway 2011-11-24: need a marker so we can wait till sync
is done.
+ p.sender(queue(prefix+"x", "wiring"))
+
+ def verify(b, prefix):
+ """Verify setup was replicated to backup b"""
+ # FIXME aconway 2011-11-21: wait for wiring to replicate.
+ self.wait(b, prefix+"x");
+ # Verify backup
+ # FIXME aconway 2011-11-24: assert_browse_retry to deal with async
replication.
+ self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
+ self.assert_browse_retry(b, prefix+"q2", []) # wiring only
+ self.assert_missing(b, prefix+"q3")
+ b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds
with replicate=all
+ self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
+ b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds
with replicate=wiring
+ self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
- # Create some wiring before starting the backup, to test catch-up
+ # Create config, send messages before starting the backup, to test
catch-up replication.
primary = self.ha_broker(name="primary")
p = primary.connect().session()
- p.sender(queue%("q1", "all")).send(Message("1"))
- p.sender(queue%("q2", "wiring")).send(Message("2"))
- p.sender(queue%("q3", "none")).send(Message("3"))
- p.sender(exchange%("e1", "all", "e1", "q2")).send(Message("4"))
-
- # Create some after starting backup, test steady-state replication
+ setup(p, "1")
+ # Start the backup
backup = self.ha_broker(name="backup", broker_url=primary.host_port())
b = backup.connect().session()
- # FIXME aconway 2011-11-21: need to wait for backup to be ready to
test event replication
- for a in ["q1", "q2", "e1"]: self.wait(b,a)
- p.sender(queue%("q11", "all")).send(Message("11"))
- p.sender(queue%("q12", "wiring")).send(Message("12"))
- p.sender(queue%("q13", "none")).send(Message("13"))
- p.sender(exchange%("e11", "all", "e11", "q12")).send(Message("14"))
-
- # Verify replication
- # FIXME aconway 2011-11-18: We should kill primary here and fail over.
- for a in ["q11", "q12", "e11"]: self.wait(b,a)
- # FIXME aconway 2011-11-18: replicate messages
-# self.assert_browse(b, "q11", ["11", "14", "e11"])
-# self.assert_browse(b, "q12", []) # wiring only
-# self.assert_missing(b,"q13")
- b.sender("e11").send(Message("e11")) # Verify bind
- self.assert_browse(b, "q12", ["e11"])
-
- for a in ["q1", "q2", "e1"]: self.wait(b,a)
- # FIXME aconway 2011-11-18: replicate messages
-# self.assert_browse(b, "q1", ["1", "4", "e1"])
-# self.assert_browse(b, "q2", []) # wiring only
-# self.assert_missing(b,"q3")
- b.sender("e1").send(Message("e1")) # Verify bind
- self.assert_browse(b, "q2", ["e1"])
+ verify(b, "1")
+ # Create config, send messages after starting the backup, to test
steady-state replication.
+ setup(p, "2")
+ verify(b, "2")
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org