Author: aconway
Date: Tue Oct 2 21:46:50 2012
New Revision: 1393201
URL: http://svn.apache.org/viewvc?rev=1393201&view=rev
Log:
QPID-4285: HA backups continuously disconnect / re-sync after attempting to
replicate a deleted queue
Fixes queues getting into a partially deleted state: previously when a broker
was promoted, it did not clean up its QueueReplicators. The QueueReplicators
keep a shared_ptr to the Queue so this kept Queues in memory after they were
destroyed. It also kept them in QMF, since the management object is
unregistered in the destructor.
This patch cleans up properly on promotion.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h?rev=1393201&r1=1393200&r2=1393201&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h Tue Oct 2
21:46:50 2012
@@ -45,7 +45,7 @@ class AlternateExchangeSetter
/** If altEx is already known, call setter(altEx) now else save for later
*/
void setAlternate(const std::string& altEx, const SetFunction& setter) {
- broker::Exchange::shared_ptr ex = exchanges.find(altEx);
+ boost::shared_ptr<broker::Exchange> ex = exchanges.find(altEx);
if (ex) setter(ex); // Set immediately.
else setters.insert(Setters::value_type(altEx, setter)); // Save for
later.
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1393201&r1=1393200&r2=1393201&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Tue Oct 2 21:46:50 2012
@@ -75,8 +75,13 @@ void Backup::initialize(const Url& broke
}
Backup::~Backup() {
+ QPID_LOG(debug, logPrefix << "Backup shutting down.");
if (link) link->close();
- if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
+ if (replicator.get()) {
+ broker.getExchanges().destroy(replicator->getName());
+ replicator->shutdown();
+ replicator.reset();
+ }
}
// Called via management.
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1393201&r1=1393200&r2=1393201&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Oct 2 21:46:50
2012
@@ -68,7 +68,7 @@ using namespace broker;
namespace {
-const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator");
+const string QPID_CONFIGURATION_REPLICATOR("qpid.broker-replicator");
const string CLASS_NAME("_class_name");
const string EVENT("_event");
@@ -208,7 +208,12 @@ void BrokerReplicator::initialize() {
);
}
-BrokerReplicator::~BrokerReplicator() { }
+BrokerReplicator::~BrokerReplicator() { shutdown(); }
+
+void BrokerReplicator::shutdown() {
+ QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down.");
+ broker.getQueues().eachQueue(boost::bind(&BrokerReplicator::deactivate,
this, _1));
+}
// This is called in the connection IO thread when the bridge is started.
void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler&
sessionHandler) {
@@ -591,7 +596,7 @@ void BrokerReplicator::startQueueReplica
}
}
-void BrokerReplicator::deleteQueue(const std::string& name) {
+void BrokerReplicator::deactivateQueue(const std::string& name) {
boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
if (qr) {
qr->deactivate();
@@ -599,7 +604,14 @@ void BrokerReplicator::deleteQueue(const
// actually be destroyed.
broker.getExchanges().destroy(qr->getName());
}
- qr.reset();
+}
+
+void BrokerReplicator::deactivate(boost::shared_ptr<broker::Queue> q) {
+ deactivateQueue(q->getName());
+}
+
+void BrokerReplicator::deleteQueue(const std::string& name) {
+ deactivateQueue(name);
try {
broker.deleteQueue(name, userId, remoteHost);
QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1393201&r1=1393200&r2=1393201&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Tue Oct 2 21:46:50 2012
@@ -76,6 +76,7 @@ class BrokerReplicator : public broker::
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const
framing::FieldTable*);
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const,
const framing::FieldTable* const);
+ void shutdown();
private:
typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
@@ -141,6 +142,8 @@ class BrokerReplicator : public broker::
const qpid::framing::FieldTable& args,
const std::string& alternateExchange);
+ void deactivateQueue(const std::string& name);
+ void deactivate(boost::shared_ptr<broker::Queue> q);
void deleteQueue(const std::string& name);
void deleteExchange(const std::string& name);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1393201&r1=1393200&r2=1393201&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Tue Oct 2 21:46:50 2012
@@ -44,6 +44,7 @@ namespace qpid {
namespace ha {
using namespace broker;
using namespace framing;
+using namespace std;
const std::string QPID_HA_EVENT_PREFIX("qpid.ha-");
const std::string
QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
@@ -124,13 +125,18 @@ void QueueReplicator::initializeBridge(B
SequenceNumber front, back;
queue->getRange(front, back, broker::REPLICATOR);
if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT,
front);
- peer.getMessage().subscribe(
- args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
- false/*exclusive*/, "", 0, settings);
- // FIXME aconway 2012-05-22: use a finite credit window?
- peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
- peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
-
+ try {
+ peer.getMessage().subscribe(
+ args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
+ false/*exclusive*/, "", 0, settings);
+ // FIXME aconway 2012-05-22: use a finite credit window?
+ peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
+ peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+ }
+ catch(const exception& e) {
+ QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " <<
e.what()));
+ throw;
+ }
qpid::Address primary;
link->getRemoteAddress(primary);
QPID_LOG(info, logPrefix << "Connected to " << primary << "(" <<
bridgeName << ")");
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1393201&r1=1393200&r2=1393201&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Oct 2 21:46:50 2012
@@ -648,6 +648,24 @@ acl deny all all
self.assertRaises(NotFound, s.receiver, ("e1"));
+ def test_auto_delete_qpid_4285(self):
+ """Regression test for QPID-4285: an auto delete queue gets stuck in
+ a partially deleted state and causes replication errors."""
+ cluster = HaCluster(self,2)
+ cluster[1].wait_status("ready")
+ s = cluster[0].connect().session()
+ s.receiver("q;{create:always}")
+ cluster[1].wait_backup("q")
+ cluster.kill(0) # Make the backup take over.
+ s = cluster[1].connect().session()
+ s.receiver("q;{delete:always}").close() # Delete q on new primary
+ try:
+ s.receiver("q")
+ self.fail("Expected NotFound exception") # Should not be avaliable
+ except NotFound: pass
+ assert not cluster[1].agent().getQueue("q") # Should not be in QMF
+
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given
fairshare limit
@@ -660,7 +678,7 @@ def fairshare(msgs, limit, levels):
msgs = postponed
count = 0
last_priority = None
- postponed = []
+ postponed = [ ]
msg = msgs.pop(0)
if last_priority and priority_level(msg.priority, levels) ==
last_priority:
count += 1
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]