Author: aconway
Date: Thu Jan 19 23:04:14 2012
New Revision: 1233649
URL: http://svn.apache.org/viewvc?rev=1233649&view=rev
Log:
QPID-3603: In progress - integrate ReplicatingSubscription.
The code to use ReplicatingSubscription is there but it is disabled by
commenting out getConsumerFactories().add in Backup.cpp because it
hangs the test.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1233649&r1=1233648&r2=1233649&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp Thu Jan 19
23:04:14 2012
@@ -21,6 +21,7 @@
#include "Backup.h"
#include "Settings.h"
#include "WiringReplicator.h"
+#include "ReplicatingSubscription.h"
#include "qpid/Url.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Bridge.h"
@@ -58,6 +59,12 @@ Backup::Backup(broker::Broker& b, const
link = result.first;
boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
broker.getExchanges().registerExchange(wr);
+
+ // FIXME aconway 2011-11-25: using ReplicatingSubscription hangs the
tests
+ // The tests pass with a plain subscription if we dont add the factory.
+// broker.getConsumerFactories().add(
+// boost::shared_ptr<ReplicatingSubscription::Factory>(
+// new ReplicatingSubscription::Factory()));
}
}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1233649&r1=1233648&r2=1233649&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp Thu Jan 19
23:04:14 2012
@@ -58,7 +58,7 @@ HaBroker::HaBroker(broker::Broker& b, co
mgmtObject->set_status("solo");
ma->addObject(mgmtObject);
}
- QPID_LOG(notice, "HA broker initialized, client-url=" << clientUrl
+ QPID_LOG(notice, "HA: broker initialized, client-url=" << clientUrl
<< ", broker-url=" << brokerUrl);
backup.reset(new Backup(broker, s));
}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1233649&r1=1233648&r2=1233649&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp Thu Jan 19
23:04:14 2012
@@ -56,10 +56,10 @@ struct HaPlugin : public Plugin {
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && settings.enabled) {
- QPID_LOG(info, "HA plugin enabled");
+ QPID_LOG(info, "HA: Enabled");
haBroker.reset(new ha::HaBroker(*broker, settings));
} else
- QPID_LOG(info, "HA plugin disabled");
+ QPID_LOG(info, "HA: Disabled");
}
};
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1233649&r1=1233648&r2=1233649&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
Thu Jan 19 23:04:14 2012
@@ -66,6 +66,23 @@ std::string mask(const std::string& in)
return DOLLAR + in + INTERNAL;
}
+boost::shared_ptr<broker::SemanticState::ConsumerImpl>
+ReplicatingSubscription::Factory::create(
+ SemanticState* _parent,
+ const std::string& _name,
+ Queue::shared_ptr _queue,
+ bool ack,
+ bool _acquire,
+ bool _exclusive,
+ const std::string& _tag,
+ const std::string& _resumeId,
+ uint64_t _resumeTtl,
+ const framing::FieldTable& _arguments
+) {
+ return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
+ new ReplicatingSubscription(_parent, _name, _queue, ack, _acquire,
_exclusive, _tag, _resumeId, _resumeTtl, _arguments));
+}
+
ReplicatingSubscription::ReplicatingSubscription(
SemanticState* _parent,
const std::string& _name,
@@ -81,7 +98,8 @@ ReplicatingSubscription::ReplicatingSubs
events(new Queue(mask(_name))),
consumer(new DelegatingConsumer(*this))
{
-
+ // FIXME aconway 2011-11-25: string constants.
+ QPID_LOG(debug, "HA: replicating subscription " << _name << " to " <<
_queue->getName());
if (_arguments.isSet("qpid.high_sequence_number")) {
qpid::framing::SequenceNumber hwm =
_arguments.getAsInt("qpid.high_sequence_number");
qpid::framing::SequenceNumber lwm;
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1233649&r1=1233648&r2=1233649&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
Thu Jan 19 23:04:14 2012
@@ -24,6 +24,7 @@
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/ConsumerFactory.h"
namespace qpid {
@@ -43,11 +44,21 @@ class ReplicatingSubscription : public b
public broker::QueueObserver
{
public:
+ struct Factory : public broker::ConsumerFactory {
+ boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
+ broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+ };
+
ReplicatingSubscription(broker::SemanticState* parent,
const std::string& name,
boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const
std::string& tag,
const std::string& resumeId, uint64_t resumeTtl,
const framing::FieldTable& arguments);
+
~ReplicatingSubscription();
void init();
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1233649&r1=1233648&r2=1233649&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Thu Jan
19 23:04:14 2012
@@ -117,22 +117,18 @@ const string S_ALL="all";
ReplicateLevel replicateLevel(const string& str) {
// FIXME aconway 2011-11-24: case insenstive comparison.
- QPID_LOG(critical, "FIXME replicateLevel " << str);
ReplicateLevel rl = RL_NONE;
if (str == S_WIRING) rl = RL_WIRING;
else if (str == S_ALL) rl = RL_ALL;
- QPID_LOG(critical, "FIXME replicateLevel " << str << " = " << rl);
return rl;
}
ReplicateLevel replicateLevel(const framing::FieldTable& f) {
- QPID_LOG(critical, "FIXME replicateLevel " << f);
if (f.isSet(QPID_REPLICATE)) return
replicateLevel(f.getAsString(QPID_REPLICATE));
else return RL_NONE;
}
ReplicateLevel replicateLevel(const Variant::Map& m) {
- QPID_LOG(critical, "FIXME replicateLevel " << m);
Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
if (i != m.end()) return replicateLevel(i->second.asString());
else return RL_NONE;
@@ -234,6 +230,7 @@ void WiringReplicator::route(Deliverable
Variant::Map& map = list.front().asMap();
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
+ QPID_LOG(trace, "HA: Configuration event from primary: " <<
values);
if (match<EventQueueDeclare>(schema))
doEventQueueDeclare(values);
else if (match<EventQueueDelete>(schema))
doEventQueueDelete(values);
else if (match<EventExchangeDeclare>(schema))
doEventExchangeDeclare(values);
@@ -249,29 +246,30 @@ void WiringReplicator::route(Deliverable
Variant::Map& values = i->asMap()[VALUES].asMap();
framing::FieldTable args;
amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+ QPID_LOG(trace, "HA: Configuration response from primary: " <<
values);
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
- else throw Exception(QPID_MSG("Ignoring unexpected class: " <<
type));
+ else throw Exception(QPID_MSG("HA: Unexpected response type: "
<< type));
}
} else {
- QPID_LOG(warning, QPID_MSG("Replicator: Ignoring QMFv2 message
with headers: " << *headers));
+ QPID_LOG(warning, QPID_MSG("HA: Expecting remote configuration
message, got: " << *headers));
}
} catch (const std::exception& e) {
- QPID_LOG(warning, "Replicator: Error replicating configuration: " <<
e.what());
- QPID_LOG(debug, "Replicator: Error processing: " << list);
+ QPID_LOG(warning, "HA: Error replicating configuration: " << e.what());
+ QPID_LOG(debug, "HA: Error processing configuration message: " <<
list);
}
}
void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
- QPID_LOG(critical, "FIXME doEventQueueDeclare " << values);
string name = values[QNAME].asString();
Variant::Map argsMap = values[ARGS].asMap();
if (values[DISP] == CREATED && replicateLevel(argsMap)) {
- QPID_LOG(debug, "HA: Creating replicated queue " << name);
- framing::FieldTable args;
+ framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- std::pair<boost::shared_ptr<Queue>, bool> result =
+
+ QPID_LOG(debug, "HA: Creating queue from event " << name);
+ std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
values[DURABLE].asBool(),
@@ -288,7 +286,7 @@ void WiringReplicator::doEventQueueDecla
// out of date.
startQueueReplicator(result.first);
} else {
- QPID_LOG(warning, "Replicated queue " << name << " already
exists");
+ QPID_LOG(warning, "HA: Replicated queue " << name << " already
exists");
}
}
}
@@ -297,7 +295,7 @@ void WiringReplicator::doEventQueueDelet
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (queue && replicateLevel(queue->getSettings())) {
- QPID_LOG(debug, "Deleting replicated queue " << name);
+ QPID_LOG(debug, "HA: Deleting queue from event: " << name);
broker.deleteQueue(
name,
values[USER].asString(),
@@ -311,6 +309,7 @@ void WiringReplicator::doEventExchangeDe
string name = values[EXNAME].asString();
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
+ QPID_LOG(debug, "HA: Creating exchange from event " << name);
if (!broker.createExchange(
name,
values[EXTYPE].asString(),
@@ -331,7 +330,7 @@ void WiringReplicator::doEventExchangeDe
try {
boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
if (exchange && replicateLevel(exchange->getArgs())) {
- QPID_LOG(debug, "Deleting replicated exchange " << name);
+ QPID_LOG(debug, "HA: Deleting exchange:" << name);
broker.deleteExchange(
name,
values[USER].asString(),
@@ -358,10 +357,8 @@ void WiringReplicator::doEventBind(Varia
}
void WiringReplicator::doResponseQueue(Variant::Map& values) {
- QPID_LOG(critical, "FIXME doResponseQueue " << values);
// FIXME aconway 2011-11-22: more flexible ways & defaults to indicate
replication
Variant::Map argsMap(values[ARGUMENTS].asMap());
- QPID_LOG(critical, "FIXME doResponseQueue replevel " <<
replicateLevel(argsMap));
if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
@@ -429,10 +426,8 @@ void WiringReplicator::doResponseBind(Va
try {
std::string exName = getRefName(EXCHANGE_REF_PREFIX,
values[EXCHANGE_REF]);
std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
- QPID_LOG(critical, "FIXME doResponseBind " << qName << " to " <<
exName);
boost::shared_ptr<Exchange> exchange =
broker.getExchanges().get(exName);
boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
- QPID_LOG(critical, "FIXME doResponseBind ptrs " << queue.get() << " to
" << exchange.get());
// FIXME aconway 2011-11-24: more flexible configuration for binding
replication.
// Automatically replicate exchange if queue and exchange are
replicated
@@ -451,9 +446,7 @@ void WiringReplicator::doResponseBind(Va
}
void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>&
queue) {
- QPID_LOG(critical, "FIXME startQueueReplicator " << queue->getName() << "
" << queue->getSettings());
if (replicateLevel(queue->getSettings()) == RL_ALL) {
- QPID_LOG(critical, "FIXME startQueueReplicator starting");
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue,
link));
broker.getExchanges().registerExchange(qr);
}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1233649&r1=1233648&r2=1233649&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Thu Jan 19
23:04:14 2012
@@ -43,9 +43,6 @@ class ShortTests(BrokerTest):
cmd="qpid-route route add %s %s qpid.node-cloner x"%(backup, primary)
self.assertEqual(0, os.system(cmd))
- def setup_replication(self, primary, backup, queue):
- self.assertEqual(0,os.system("qpid-route --ack 1 queue add %s %s
qpid.replicator-%s %s"%(backup, primary, queue, queue)))
-
# FIXME aconway 2011-11-15: work around async replication.
def wait(self, session, address):
def check():
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]