Author: aconway
Date: Thu Jan 19 23:04:33 2012
New Revision: 1233651
URL: http://svn.apache.org/viewvc?rev=1233651&view=rev
Log:
QPID-3603: Integrate ReplicatingSubscription into the HA code.
HaBroker registers the ConsumerFactory, QueueReplicator sets
appropriate arguments in consume command.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/management-schema.xml
qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1233651&r1=1233650&r2=1233651&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu
Jan 19 23:04:33 2012
@@ -118,7 +118,7 @@ void SemanticState::consume(const string
const ConsumerFactories::Factories& cf(
session.getBroker().getConsumerFactories().get());
ConsumerImpl::shared_ptr c;
- for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i !=
cf.end(); !c)
+ for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i !=
cf.end() && !c; ++i)
c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive,
tag,
resumeId, resumeTtl, arguments);
if (!c) // Create plain consumer
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1233651&r1=1233650&r2=1233651&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp Thu Jan 19
23:04:33 2012
@@ -43,8 +43,8 @@ using types::Variant;
using std::string;
Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
- // FIXME aconway 2011-11-24: identifying the primary. Only has 1 address.
- if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack
to identify primary.
+ // FIXME aconway 2011-11-24: identifying the primary.
+ if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary
hack to identify primary.
Url url(s.brokerUrl);
QPID_LOG(info, "HA: Acting as backup to " << url);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
@@ -59,12 +59,6 @@ Backup::Backup(broker::Broker& b, const
link = result.first;
boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
broker.getExchanges().registerExchange(wr);
-
- // FIXME aconway 2011-11-25: using ReplicatingSubscription hangs the
tests
- // The tests pass with a plain subscription if we dont add the factory.
-// broker.getConsumerFactories().add(
-// boost::shared_ptr<ReplicatingSubscription::Factory>(
-// new ReplicatingSubscription::Factory()));
}
}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h?rev=1233651&r1=1233650&r2=1233651&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h Thu Jan 19 23:04:33
2012
@@ -37,6 +37,9 @@ class Settings;
/**
* State associated with a backup broker. Manages connections to primary.
+ *
+ * THREAD SAFE: trivially because currently it only has a constructor.
+ * May need locking as the functionality grows.
*/
class Backup
{
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1233651&r1=1233650&r2=1233651&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp Thu Jan 19
23:04:33 2012
@@ -21,6 +21,7 @@
#include "Backup.h"
#include "HaBroker.h"
#include "Settings.h"
+#include "ReplicatingSubscription.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
#include "qpid/management/ManagementAgent.h"
@@ -61,6 +62,10 @@ HaBroker::HaBroker(broker::Broker& b, co
QPID_LOG(notice, "HA: broker initialized, client-url=" << clientUrl
<< ", broker-url=" << brokerUrl);
backup.reset(new Backup(broker, s));
+ // Register a factory for replicating subscriptions.
+ broker.getConsumerFactories().add(
+ boost::shared_ptr<ReplicatingSubscription::Factory>(
+ new ReplicatingSubscription::Factory()));
}
HaBroker::~HaBroker() {}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1233651&r1=1233650&r2=1233651&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Jan
19 23:04:33 2012
@@ -20,6 +20,7 @@
*/
#include "QueueReplicator.h"
+#include "ReplicatingSubscription.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -63,14 +64,25 @@ QueueReplicator::QueueReplicator(boost::
QueueReplicator::~QueueReplicator() {}
+// NB: This is called back in a broker connection thread when the
+// bridge is created.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler&
sessionHandler) {
+ // No lock needed, no mutable member variables are used.
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge&
args(bridge.getArgs());
- peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1,
0, false, "", 0, framing::FieldTable());
+ framing::FieldTable settings;
+ // FIXME aconway 2011-11-28: string constants.
+ settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+ // FIXME aconway 2011-11-28: inconsistent use of _ vs. -
+ settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER,
queue->getPosition());
+ qpid::framing::SequenceNumber oldest;
+ if (queue->getOldest(oldest))
+ settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER,
oldest);
+
+ peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1,
0, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " <<
args.i_dest);
-
}
@@ -117,39 +129,13 @@ void QueueReplicator::route(Deliverable&
}
}
-bool QueueReplicator::isReplicatingLink(const std::string& name)
-{
- return name.find(REPLICATOR) == 0;
-}
-
-bool QueueReplicator::initReplicationSettings(const std::string& target,
QueueRegistry& queues, qpid::framing::FieldTable& settings)
-{
- if (isReplicatingLink(target)) {
- std::string queueName = target.substr(REPLICATOR.size());
- boost::shared_ptr<Queue> queue = queues.find(queueName);
- if (queue) {
- settings.setInt("qpid.replicating-subscription", 1);
- settings.setInt("qpid.high_sequence_number", queue->getPosition());
- qpid::framing::SequenceNumber oldest;
- if (queue->getOldest(oldest)) {
- settings.setInt("qpid.low_sequence_number", oldest);
- }
- }
- return true;
- } else {
- return false;
- }
-}
-
bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const
qpid::framing::FieldTable*) { return false; }
bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&,
const qpid::framing::FieldTable*) { return false; }
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string*
const, const qpid::framing::FieldTable* const) { return false; }
-const std::string QueueReplicator::typeName("queue-replicator");
+// FIXME aconway 2011-11-28: rationalise string constants.
+static const std::string TYPE_NAME("qpid.queue-replicator");
-std::string QueueReplicator::getType() const
-{
- return typeName;
-}
+std::string QueueReplicator::getType() const { return TYPE_NAME; }
}} // namespace qpid::broker
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1233651&r1=1233650&r2=1233651&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h Thu Jan 19
23:04:33 2012
@@ -38,7 +38,13 @@ class Deliverable;
namespace ha {
/**
- * Dummy exchange for processing replication messages
+ * Exchange created on a backup broker to replicate a queue on the primary.
+ *
+ * Puts replicated messages on the local queue, handles dequeue events.
+ * Creates a ReplicatingSubscription on the primary by passing special
+ * arguments to the consume command.
+ *
+ * THREAD SAFE.
*/
class QueueReplicator : public broker::Exchange
{
@@ -50,12 +56,11 @@ class QueueReplicator : public broker::E
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const
framing::FieldTable*);
void route(broker::Deliverable&, const std::string&, const
framing::FieldTable*);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const,
const framing::FieldTable* const);
- static bool isReplicatingLink(const std::string&);
- static bool initReplicationSettings(const std::string&,
broker::QueueRegistry&, framing::FieldTable&);
- static const std::string typeName;
+
private:
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler&
sessionHandler);
+ sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
framing::SequenceNumber current;
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1233651&r1=1233650&r2=1233651&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
Thu Jan 19 23:04:33 2012
@@ -30,9 +30,16 @@ namespace ha {
using namespace framing;
using namespace broker;
+using namespace std;
-const std::string DOLLAR("$");
-const std::string INTERNAL("_internal");
+// FIXME aconway 2011-11-28: review all argument names, prefixes etc.
+// Do we want a common HA prefix?
+const string
ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
+const string
ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_sequence_number");
+const string
ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number");
+
+const string DOLLAR("$");
+const string INTERNAL("_internal");
class ReplicationStateInitialiser
{
@@ -61,7 +68,7 @@ class ReplicationStateInitialiser
const qpid::framing::SequenceNumber end;
};
-std::string mask(const std::string& in)
+string mask(const string& in)
{
return DOLLAR + in + INTERNAL;
}
@@ -69,29 +76,30 @@ std::string mask(const std::string& in)
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
SemanticState* _parent,
- const std::string& _name,
+ const string& _name,
Queue::shared_ptr _queue,
bool ack,
bool _acquire,
bool _exclusive,
- const std::string& _tag,
- const std::string& _resumeId,
+ const string& _tag,
+ const string& _resumeId,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
) {
+
return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
new ReplicatingSubscription(_parent, _name, _queue, ack, _acquire,
_exclusive, _tag, _resumeId, _resumeTtl, _arguments));
}
ReplicatingSubscription::ReplicatingSubscription(
SemanticState* _parent,
- const std::string& _name,
+ const string& _name,
Queue::shared_ptr _queue,
bool ack,
bool _acquire,
bool _exclusive,
- const std::string& _tag,
- const std::string& _resumeId,
+ const string& _tag,
+ const string& _resumeId,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag,
_resumeId, _resumeTtl, _arguments),
@@ -158,7 +166,7 @@ void ReplicatingSubscription::enqueued(c
void ReplicatingSubscription::generateDequeueEvent()
{
- std::string buf(range.encodedSize(),'\0');
+ string buf(range.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
range.encode(buffer);
range.clear();
@@ -166,7 +174,7 @@ void ReplicatingSubscription::generateDe
//generate event message
boost::intrusive_ptr<Message> event = new Message();
- AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0,
0)));
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody()));
content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1233651&r1=1233650&r2=1233651&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
Thu Jan 19 23:04:33 2012
@@ -38,7 +38,12 @@ class OwnershipToken;
namespace ha {
/**
- * Subscriber to a remote queue that replicates to a local queue.
+ * A subscription that represents a backup replicating a queue.
+ *
+ * Runs on the primary. Delays completion of messages till the backup
+ * has acknowledged, informs backup of locally dequeued messages.
+ *
+ * THREAD UNSAFE: used only in broker connection thread.
*/
class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
public broker::QueueObserver
@@ -53,6 +58,11 @@ class ReplicatingSubscription : public b
const framing::FieldTable& arguments);
};
+ // Argument names for consume command.
+ static const std::string QPID_REPLICATING_SUBSCRIPTION;
+ static const std::string QPID_HIGH_SEQUENCE_NUMBER;
+ static const std::string QPID_LOW_SEQUENCE_NUMBER;
+
ReplicatingSubscription(broker::SemanticState* parent,
const std::string& name,
boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const
std::string& tag,
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1233651&r1=1233650&r2=1233651&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Thu Jan
19 23:04:33 2012
@@ -446,6 +446,7 @@ void WiringReplicator::doResponseBind(Va
}
void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>&
queue) {
+ // FIXME aconway 2011-11-28: also need to remove these when queue is
destroyed.
if (replicateLevel(queue->getSettings()) == RL_ALL) {
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue,
link));
broker.getExchanges().registerExchange(qr);
@@ -456,11 +457,6 @@ bool WiringReplicator::bind(boost::share
bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const
framing::FieldTable*) { return false; }
bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const,
const framing::FieldTable* const) { return false; }
-const string WiringReplicator::typeName(QPID_WIRING_REPLICATOR);
-
-string WiringReplicator::getType() const
-{
- return typeName;
-}
+string WiringReplicator::getType() const { return QPID_WIRING_REPLICATOR; }
}} // namespace broker
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h?rev=1233651&r1=1233650&r2=1233651&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.h Thu Jan
19 23:04:33 2012
@@ -38,8 +38,15 @@ class SessionHandler;
namespace ha {
/**
- * Pseudo-exchange for recreating local queues and/or exchanges on
- * receipt of QMF events indicating their creation on another node
+ * Replicate wiring on a backup broker.
+ *
+ * Implemented as an exchange that subscribes to receive QMF
+ * configuration events from the primary. It configures local queues,
+ * exchanges and bindings to replicate the primary.
+ * It also creates QueueReplicators for newly replicated queues.
+ *
+ * THREAD SAFE: Has no mutable state.
+ *
*/
class WiringReplicator : public broker::Exchange
{
@@ -54,8 +61,6 @@ class WiringReplicator : public broker::
void route(broker::Deliverable&, const std::string&, const
framing::FieldTable*);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const,
const framing::FieldTable* const);
- static const std::string typeName;
-
private:
void initializeBridge(broker::Bridge&, broker::SessionHandler&);
void doEventQueueDeclare(types::Variant::Map& values);
@@ -66,8 +71,6 @@ class WiringReplicator : public broker::
void doResponseQueue(types::Variant::Map& values);
void doResponseExchange(types::Variant::Map& values);
void doResponseBind(types::Variant::Map& values);
-
- private:
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
broker::Broker& broker;
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/management-schema.xml
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/management-schema.xml?rev=1233651&r1=1233650&r2=1233651&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/management-schema.xml
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/management-schema.xml Thu
Jan 19 23:04:33 2012
@@ -21,7 +21,7 @@
<!-- Monitor and control HA status of a broker. -->
<class name="HaBroker">
- <property name="status" type="sstr" desc="HA statu: PRIMARY, BACKUP,
SOLO"/>
+ <property name="status" type="sstr" desc="HA status: PRIMARY, BACKUP,
SOLO"/>
<method name="setStatus" desc="Set HA status: PRIMARY, BACKUP, SOLO">
<arg name="status" type="sstr" dir="I"/>
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1233651&r1=1233650&r2=1233651&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Thu Jan 19
23:04:33 2012
@@ -89,7 +89,7 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
# Create config, send messages before starting the backup, to test
catch-up replication.
- primary = self.ha_broker(name="primary")
+ primary = self.ha_broker(name="primary", broker_url="primary") # Temp
hack to identify primary
p = primary.connect().session()
setup(p, "1")
# Start the backup
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]