Author: aconway
Date: Tue Oct 2 18:41:41 2012
New Revision: 1393089
URL: http://svn.apache.org/viewvc?rev=1393089&view=rev
Log:
QPID-4285: HA backups continuously disconnect / re-sync after attempting to
replicate a deleted queue. (Based on patch by Jason Dillama)
This does not directly tackle the origin of the problem, but extends Jason's patch
since it addresses something we had to fix anyway: "leaking" queues and exchanges.
It does 2 things:
1. Enable hideDeletedError on all subscription objects used by HA.
This suppresses the troublesome exception, turning it into a harmless no-op.
2. Delete queues/exchanges missing from the initial responses (based on Jason's patch).
This fixes a possible "leak" of queues and exchanges when an object replicated
to a backup is deleted from the new primary before the backup connects.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/trunk/qpid/cpp/src/tests/ha_test.py
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Oct 2 18:41:41 2012
@@ -370,7 +370,7 @@ bool Queue::acquire(const QueueCursor& p
bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
{
- checkNotDeleted(c);
+ if (!checkNotDeleted(c)) return false;
QueueListeners::NotificationSet set;
while (true) {
//TODO: reduce lock scope
@@ -1443,11 +1443,11 @@ QueueListeners& Queue::getListeners() {
Messages& Queue::getMessages() { return *messages; }
const Messages& Queue::getMessages() const { return *messages; }
-void Queue::checkNotDeleted(const Consumer::shared_ptr& c)
+bool Queue::checkNotDeleted(const Consumer::shared_ptr& c)
{
- if (deleted && !c->hideDeletedError()) {
+ if (deleted && !c->hideDeletedError())
throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has
been deleted."));
- }
+ return !deleted;
}
void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Oct 2 18:41:41 2012
@@ -188,7 +188,7 @@ class Queue : public boost::enable_share
int getEventMode();
void dequeueFromStore(boost::intrusive_ptr<PersistableMessage>);
void abandoned(const Message& message);
- void checkNotDeleted(const Consumer::shared_ptr&);
+ bool checkNotDeleted(const Consumer::shared_ptr&);
void notifyDeleted();
uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor,
SubscriptionType);
virtual bool checkDepth(const QueueDepth& increment, const Message&);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Oct 2 18:41:41
2012
@@ -33,6 +33,7 @@
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
@@ -58,8 +59,9 @@ using qmf::org::apache::qpid::broker::Ev
using qmf::org::apache::qpid::broker::EventQueueDelete;
using qmf::org::apache::qpid::broker::EventSubscribe;
using qmf::org::apache::qpid::ha::EventMembersUpdate;
+using qpid::broker::amqp_0_10::MessageTransfer;
using namespace framing;
-using std::string;
+using namespace std;
using std::ostream;
using types::Variant;
using namespace broker;
@@ -177,8 +179,11 @@ BrokerReplicator::BrokerReplicator(HaBro
logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
haBroker(hb), broker(hb.getBroker()), link(l),
initialized(false),
- alternates(hb.getBroker().getExchanges())
-{}
+ alternates(hb.getBroker().getExchanges()),
+ cleaner(*this)
+{
+ getArgs().setString(QPID_REPLICATE, printable(NONE).str());
+}
void BrokerReplicator::initialize() {
// Can't do this in the constructor because we need a shared_ptr to this.
@@ -223,6 +228,10 @@ void BrokerReplicator::initializeBridge(
<< " status:" << printable(haBroker.getStatus()));
initialized = true;
+ // Scan for existing replicated queues and exchanges. Any that have not been seen by
+ // the time all responses are received will be cleaned up.
+ cleaner.start();
+
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge&
args(bridge.getArgs());
@@ -254,7 +263,7 @@ void BrokerReplicator::route(Deliverable
}
Variant::List list;
try {
- if
(!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage()))
+ if (!MessageTransfer::isQMFv2(msg.getMessage()))
throw Exception("Unexpected message, not QMF2 event or query
response.");
// decode as list
string content = msg.getMessage().getContent();
@@ -287,13 +296,18 @@ void BrokerReplicator::route(Deliverable
else if (type == BINDING) doResponseBind(values);
else if (type == HA_BROKER) doResponseHaBroker(values);
}
- if
(qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(),
EXCHANGE)) {
- // We have received all of the exchange response.
+ if (MessageTransfer::isLastQMFResponse(msg.getMessage(),
EXCHANGE)) {
+ QPID_LOG(debug, logPrefix << "Initial exchange configuration
complete.");
+ cleaner.cleanExchanges(); // Clean up exchanges that no longer
exist in the primary
alternates.clear();
}
+ if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) {
+ QPID_LOG(debug, logPrefix << "Initial queue configuration
complete.");
+ cleaner.cleanQueues(); // Clean up queues that no longer exist
in the primary
+ }
}
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
+ QPID_LOG(critical, logPrefix << "Configuration replication failed: "
<< e.what()
<< ": while handling: " << list);
haBroker.shutdown();
throw;
@@ -308,14 +322,15 @@ void BrokerReplicator::doEventQueueDecla
if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION,
argsMap, autoDel, excl)) {
string name = values[QNAME].asString();
QueueSettings settings(values[DURABLE].asBool(),
values[AUTODEL].asBool());
+ QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
+ cleaner.forgetQueue(name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
// If we already have a queue with this name, replace it.
// The queue was definitely created on the primary.
if (broker.getQueues().find(name)) {
QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " <<
name);
- broker.deleteQueue(name, userId, remoteHost);
- stopQueueReplicator(name);
+ deleteQueue(name);
}
settings.populate(args, settings.storeSettings);
CreateQueueResult result =
@@ -346,8 +361,8 @@ void BrokerReplicator::doEventQueueDelet
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (queue &&
replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
- stopQueueReplicator(name);
- broker.deleteQueue(name, userId, remoteHost);
+ cleaner.forgetQueue(name);
+ deleteQueue(name);
}
}
@@ -357,13 +372,14 @@ void BrokerReplicator::doEventExchangeDe
if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
string name = values[EXNAME].asString();
QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
+ cleaner.forgetExchange(name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
// If we already have a exchange with this name, replace it.
// The exchange was definitely created on the primary.
if (broker.getExchanges().find(name)) {
- broker.deleteExchange(name, userId, remoteHost);
- QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " <<
name);
+ deleteExchange(name);
+ QPID_LOG(warning, logPrefix << "Replaced existing exchange: " <<
name);
}
CreateExchangeResult result = createExchange(
name, values[EXTYPE].asString(), values[DURABLE].asBool(), args,
@@ -377,12 +393,13 @@ void BrokerReplicator::doEventExchangeDe
string name = values[EXNAME].asString();
boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
if (!exchange) {
- QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist:
" << name);
+ QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " <<
name);
} else if (!replicationTest.replicateLevel(exchange->getArgs())) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated:
" << name);
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
- broker.deleteExchange(name, userId, remoteHost);
+ cleaner.forgetExchange(name);
+ deleteExchange(name);
replicatedExchanges.erase(name);
}
}
@@ -457,6 +474,7 @@ void BrokerReplicator::doResponseQueue(V
values[EXCLUSIVE].asBool()))
return;
string name(values[NAME].asString());
+ cleaner.forgetQueue(name);
QPID_LOG(debug, logPrefix << "Queue response: " << name);
if (broker.getQueues().find(name)) { // Already exists
if (findQueueReplicator(name))
@@ -478,6 +496,7 @@ void BrokerReplicator::doResponseExchang
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicationTest.replicateLevel(argsMap)) return;
string name = values[NAME].asString();
+ cleaner.forgetExchange(name);
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
if (broker.getExchanges().find(name)) {
if (replicatedExchanges.find(name) != replicatedExchanges.end())
@@ -572,7 +591,7 @@ void BrokerReplicator::startQueueReplica
}
}
-void BrokerReplicator::stopQueueReplicator(const std::string& name) {
+void BrokerReplicator::deleteQueue(const std::string& name) {
boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
if (qr) {
qr->deactivate();
@@ -580,6 +599,22 @@ void BrokerReplicator::stopQueueReplicat
// actually be destroyed.
broker.getExchanges().destroy(qr->getName());
}
+ qr.reset();
+ try {
+ broker.deleteQueue(name, userId, remoteHost);
+ QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
+ } catch (const framing::NotFoundException&) {
+ QPID_LOG(debug, logPrefix << "Queue not found for deletion: " << name);
+ }
+}
+
+void BrokerReplicator::deleteExchange(const std::string& name) {
+ try {
+ broker.deleteExchange(name, userId, remoteHost);
+ QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
+ } catch (const framing::NotFoundException&) {
+ QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " <<
name);
+ }
}
BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue(
@@ -639,4 +674,45 @@ bool BrokerReplicator::isBound(boost::sh
string BrokerReplicator::getType() const { return
QPID_CONFIGURATION_REPLICATOR; }
+// BrokerReplicator::Cleaner
+
+BrokerReplicator::Cleaner::Cleaner(BrokerReplicator& br) :
brokerReplicator(br) {}
+
+void BrokerReplicator::Cleaner::start() {
+ queues.clear();
+ exchanges.clear();
+
brokerReplicator.broker.getExchanges().eachExchange(boost::bind(&Cleaner::addExchange,
this, _1));
+
brokerReplicator.broker.getQueues().eachQueue(boost::bind(&Cleaner::addQueue,
this, _1));
+}
+
+void
BrokerReplicator::Cleaner::addExchange(boost::shared_ptr<broker::Exchange> ex) {
+ if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *ex))
+ exchanges.insert(ex->getName());
+}
+
+void BrokerReplicator::Cleaner::addQueue(boost::shared_ptr<broker::Queue> q) {
+ if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *q))
+ queues.insert(q->getName());
+}
+
+void BrokerReplicator::Cleaner::forgetExchange(const std::string& name) {
+ exchanges.erase(name);
+}
+
+void BrokerReplicator::Cleaner::forgetQueue(const std::string& name) {
+ queues.erase(name);
+}
+
+void BrokerReplicator::Cleaner::cleanExchanges() {
+ for_each(exchanges.begin(), exchanges.end(),
+ boost::bind(&BrokerReplicator::deleteExchange, &brokerReplicator,
_1));
+ exchanges.clear();
+}
+
+void BrokerReplicator::Cleaner::cleanQueues() {
+ for_each(queues.begin(), queues.end(),
+ boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator,
_1));
+ queues.clear();
+}
+
}} // namespace broker
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Tue Oct 2 18:41:41 2012
@@ -31,6 +31,7 @@
#include "qpid/management/ManagementObject.h"
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
+#include <set>
namespace qpid {
@@ -81,6 +82,33 @@ class BrokerReplicator : public broker::
typedef std::pair<boost::shared_ptr<broker::Queue>, bool>
CreateQueueResult;
typedef std::pair<boost::shared_ptr<broker::Exchange>, bool>
CreateExchangeResult;
+ /** Keep track of queues and exchanges that need to be cleaned up. */
+ class Cleaner {
+ public:
+ Cleaner(BrokerReplicator&);
+
+ /** Scan for existing queues and exchanges. */
+ void start();
+
+ // Forget a queue/exchange that does not need cleaning
+ void forgetExchange(const std::string& name);
+ void forgetQueue(const std::string& name);
+
+ void cleanExchanges();
+ void cleanQueues();
+
+ private:
+ typedef std::set<std::string> Names;
+
+ // add a queue/exchange that may need cleaning.
+ void addExchange(boost::shared_ptr<broker::Exchange>);
+ void addQueue(boost::shared_ptr<broker::Queue>);
+
+ BrokerReplicator& brokerReplicator;
+ Names queues, exchanges;
+ };
+ friend class Cleaner;
+
void initializeBridge(broker::Bridge&, broker::SessionHandler&);
void doEventQueueDeclare(types::Variant::Map& values);
@@ -98,7 +126,6 @@ class BrokerReplicator : public broker::
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
- void stopQueueReplicator(const std::string& name);
CreateQueueResult createQueue(
const std::string& name,
@@ -114,6 +141,9 @@ class BrokerReplicator : public broker::
const qpid::framing::FieldTable& args,
const std::string& alternateExchange);
+ void deleteQueue(const std::string& name);
+ void deleteExchange(const std::string& name);
+
std::string logPrefix;
std::string userId, remoteHost;
ReplicationTest replicationTest;
@@ -125,6 +155,7 @@ class BrokerReplicator : public broker::
qpid::Address primary;
typedef std::set<std::string> StringSet;
StringSet replicatedExchanges; // exchanges that have been replicated.
+ Cleaner cleaner;
};
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Tue Oct 2 18:41:41 2012
@@ -70,6 +70,7 @@ QueueReplicator::QueueReplicator(HaBroke
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
+ getArgs().setString(QPID_REPLICATE, printable(NONE).str());
}
// This must be separate from the constructor so we can call shared_from_this.
@@ -123,8 +124,6 @@ void QueueReplicator::initializeBridge(B
SequenceNumber front, back;
queue->getRange(front, back, broker::REPLICATOR);
if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT,
front);
- QPID_LOG(debug, logPrefix << " subscribe with settings " << settings);
-
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
false/*exclusive*/, "", 0, settings);
Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Tue Oct 2 18:41:41 2012
@@ -209,6 +209,7 @@ class HaCluster(object):
def start(self, update_urls=True, args=[]):
"""Start a new broker in the cluster"""
b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
+ b.ready()
self._brokers.append(b)
if update_urls: self.update_urls()
return b
@@ -235,6 +236,7 @@ class HaCluster(object):
self._brokers[i] = HaBroker(
self.test, name=b.name, port=b.port(), brokers_url=self.url,
**self.kwargs)
+ self._brokers[i].ready()
def bounce(self, i, promote_next=True):
"""Stop and restart a broker in a cluster."""
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Oct 2 18:41:41 2012
@@ -624,6 +624,30 @@ acl deny all all
actual = [m.content for m in primary.get_messages("pq", 100)]
self.assertEqual(expect, actual)
+ def test_delete_missing_response(self):
+ """Check that a backup correctly deletes leftover queues and exchanges that are
+ missing from the initial response set."""
+ cluster = HaCluster(self,2)
+ s = cluster[0].connect().session()
+ s.sender("q1;{create:always}")
+ s.sender("q2;{create:always}")
+ s.sender("e1;{create:always, node:{type:topic}}")
+ s.sender("e2;{create:always, node:{type:topic}}")
+ cluster.bounce(0, promote_next=False)
+ # Fake a primary that has deleted some queues and exchanges.
+ s = cluster[0].connect_admin().session()
+ s.sender("q2;{create:always}")
+ s.sender("e2;{create:always, node:{type:topic}}")
+ s.sender("x;{create:always}") # A new queue so we can wait for the
update.
+ cluster[0].promote()
+ # Verify the backup has deleted the missing queues and exchanges
+ cluster[1].wait_status("ready")
+ s = cluster[1].connect_admin().session()
+ cluster[1].wait_backup("x");
+ self.assertRaises(NotFound, s.receiver, ("q1"));
+ self.assertRaises(NotFound, s.receiver, ("e1"));
+
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given
fairshare limit
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]