Author: aconway
Date: Tue Oct  2 18:41:41 2012
New Revision: 1393089

URL: http://svn.apache.org/viewvc?rev=1393089&view=rev
Log:
QPID-4285: HA backups continuously disconnect / re-sync after attempting to 
replicate a deleted queue. (Based on patch by Jason Dillama)

This does not directly tackle the origin of the problem, but it extends Jason's
patch since it addresses something we had to fix anyway: "leaking" queues and
exchanges. It does two things:

1. Enable hideDeletedError on all subscription objects used by HA.
This suppresses the troublesome exception, replacing it with a harmless no-op.

2. Delete queues/exchanges missing from responses (based on Jason's patch).
This fixes the possible "leak" of queues and exchanges when an object replicated
to a backup is deleted from the new primary before the backup connects.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Oct  2 18:41:41 2012
@@ -370,7 +370,7 @@ bool Queue::acquire(const QueueCursor& p
 
 bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
 {
-    checkNotDeleted(c);
+    if (!checkNotDeleted(c)) return false;
     QueueListeners::NotificationSet set;
     while (true) {
         //TODO: reduce lock scope
@@ -1443,11 +1443,11 @@ QueueListeners& Queue::getListeners() { 
 Messages& Queue::getMessages() { return *messages; }
 const Messages& Queue::getMessages() const { return *messages; }
 
-void Queue::checkNotDeleted(const Consumer::shared_ptr& c)
+bool Queue::checkNotDeleted(const Consumer::shared_ptr& c)
 {
-    if (deleted && !c->hideDeletedError()) {
+    if (deleted && !c->hideDeletedError())
         throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has 
been deleted."));
-    }
+    return !deleted;
 }
 
 void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Oct  2 18:41:41 2012
@@ -188,7 +188,7 @@ class Queue : public boost::enable_share
     int getEventMode();
     void dequeueFromStore(boost::intrusive_ptr<PersistableMessage>);
     void abandoned(const Message& message);
-    void checkNotDeleted(const Consumer::shared_ptr&);
+    bool checkNotDeleted(const Consumer::shared_ptr&);
     void notifyDeleted();
     uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, 
SubscriptionType);
     virtual bool checkDepth(const QueueDepth& increment, const Message&);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Oct  2 18:41:41 
2012
@@ -33,6 +33,7 @@
 #include "qpid/broker/SessionHandler.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "qmf/org/apache/qpid/broker/EventBind.h"
 #include "qmf/org/apache/qpid/broker/EventUnbind.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
@@ -58,8 +59,9 @@ using qmf::org::apache::qpid::broker::Ev
 using qmf::org::apache::qpid::broker::EventQueueDelete;
 using qmf::org::apache::qpid::broker::EventSubscribe;
 using qmf::org::apache::qpid::ha::EventMembersUpdate;
+using qpid::broker::amqp_0_10::MessageTransfer;
 using namespace framing;
-using std::string;
+using namespace std;
 using std::ostream;
 using types::Variant;
 using namespace broker;
@@ -177,8 +179,11 @@ BrokerReplicator::BrokerReplicator(HaBro
       logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
       haBroker(hb), broker(hb.getBroker()), link(l),
       initialized(false),
-      alternates(hb.getBroker().getExchanges())
-{}
+      alternates(hb.getBroker().getExchanges()),
+      cleaner(*this)
+{
+    getArgs().setString(QPID_REPLICATE, printable(NONE).str());
+}
 
 void BrokerReplicator::initialize() {
     // Can't do this in the constructor because we need a shared_ptr to this.
@@ -223,6 +228,10 @@ void BrokerReplicator::initializeBridge(
              << " status:" << printable(haBroker.getStatus()));
     initialized = true;
 
+    // Scan for existing replicated queues and exchanges. Any that have not 
been seen by
+    // the time all reponses are received will be cleaned up.
+    cleaner.start();
+
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& 
args(bridge.getArgs());
 
@@ -254,7 +263,7 @@ void BrokerReplicator::route(Deliverable
     }
     Variant::List list;
     try {
-        if 
(!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage()))
+        if (!MessageTransfer::isQMFv2(msg.getMessage()))
             throw Exception("Unexpected message, not QMF2 event or query 
response.");
         // decode as list
         string content = msg.getMessage().getContent();
@@ -287,13 +296,18 @@ void BrokerReplicator::route(Deliverable
                 else if (type == BINDING) doResponseBind(values);
                 else if (type == HA_BROKER) doResponseHaBroker(values);
             }
-            if 
(qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(), 
EXCHANGE)) {
-                // We have received all of the exchange response.
+            if (MessageTransfer::isLastQMFResponse(msg.getMessage(), 
EXCHANGE)) {
+                QPID_LOG(debug, logPrefix << "Initial exchange configuration 
complete.");
+                cleaner.cleanExchanges(); // Clean up exchanges that no longer 
exist in the primary
                 alternates.clear();
             }
+            if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) {
+                QPID_LOG(debug, logPrefix << "Initial queue configuration 
complete.");
+                cleaner.cleanQueues(); // Clean up queues that no longer exist 
in the primary
+            }
         }
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
+        QPID_LOG(critical, logPrefix << "Configuration replication failed: " 
<< e.what()
                  << ": while handling: " << list);
         haBroker.shutdown();
         throw;
@@ -308,14 +322,15 @@ void BrokerReplicator::doEventQueueDecla
     if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, 
argsMap, autoDel, excl)) {
         string name = values[QNAME].asString();
         QueueSettings settings(values[DURABLE].asBool(), 
values[AUTODEL].asBool());
+        QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
+        cleaner.forgetQueue(name);
         framing::FieldTable args;
         qpid::amqp_0_10::translate(argsMap, args);
         // If we already have a queue with this name, replace it.
         // The queue was definitely created on the primary.
         if (broker.getQueues().find(name)) {
             QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << 
name);
-            broker.deleteQueue(name, userId, remoteHost);
-            stopQueueReplicator(name);
+            deleteQueue(name);
         }
         settings.populate(args, settings.storeSettings);
         CreateQueueResult result =
@@ -346,8 +361,8 @@ void BrokerReplicator::doEventQueueDelet
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
     if (queue && 
replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
         QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
-        stopQueueReplicator(name);
-        broker.deleteQueue(name, userId, remoteHost);
+        cleaner.forgetQueue(name);
+        deleteQueue(name);
     }
 }
 
@@ -357,13 +372,14 @@ void BrokerReplicator::doEventExchangeDe
     if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
         string name = values[EXNAME].asString();
         QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
+        cleaner.forgetExchange(name);
         framing::FieldTable args;
         qpid::amqp_0_10::translate(argsMap, args);
         // If we already have a exchange with this name, replace it.
         // The exchange was definitely created on the primary.
         if (broker.getExchanges().find(name)) {
-            broker.deleteExchange(name, userId, remoteHost);
-            QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << 
name);
+            deleteExchange(name);
+            QPID_LOG(warning, logPrefix << "Replaced existing exchange: " << 
name);
         }
         CreateExchangeResult result = createExchange(
             name, values[EXTYPE].asString(), values[DURABLE].asBool(), args,
@@ -377,12 +393,13 @@ void BrokerReplicator::doEventExchangeDe
     string name = values[EXNAME].asString();
     boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
     if (!exchange) {
-        QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: 
" << name);
+        QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << 
name);
     } else if (!replicationTest.replicateLevel(exchange->getArgs())) {
         QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: 
" << name);
     } else {
         QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
-        broker.deleteExchange(name, userId, remoteHost);
+        cleaner.forgetExchange(name);
+        deleteExchange(name);
         replicatedExchanges.erase(name);
     }
 }
@@ -457,6 +474,7 @@ void BrokerReplicator::doResponseQueue(V
             values[EXCLUSIVE].asBool()))
         return;
     string name(values[NAME].asString());
+    cleaner.forgetQueue(name);
     QPID_LOG(debug, logPrefix << "Queue response: " << name);
     if (broker.getQueues().find(name)) { // Already exists
         if (findQueueReplicator(name))
@@ -478,6 +496,7 @@ void BrokerReplicator::doResponseExchang
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
     if (!replicationTest.replicateLevel(argsMap)) return;
     string name = values[NAME].asString();
+    cleaner.forgetExchange(name);
     QPID_LOG(debug, logPrefix << "Exchange response: " << name);
     if (broker.getExchanges().find(name)) {
         if (replicatedExchanges.find(name) != replicatedExchanges.end())
@@ -572,7 +591,7 @@ void BrokerReplicator::startQueueReplica
     }
 }
 
-void BrokerReplicator::stopQueueReplicator(const std::string& name) {
+void BrokerReplicator::deleteQueue(const std::string& name) {
     boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
     if (qr) {
         qr->deactivate();
@@ -580,6 +599,22 @@ void BrokerReplicator::stopQueueReplicat
         // actually be destroyed.
         broker.getExchanges().destroy(qr->getName());
     }
+    qr.reset();
+    try {
+        broker.deleteQueue(name, userId, remoteHost);
+        QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
+    } catch (const framing::NotFoundException&) {
+        QPID_LOG(debug, logPrefix << "Queue not found for deletion: " << name);
+    }
+}
+
+void BrokerReplicator::deleteExchange(const std::string& name) {
+    try {
+        broker.deleteExchange(name, userId, remoteHost);
+        QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
+    } catch (const framing::NotFoundException&) {
+        QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << 
name);
+    }
 }
 
 BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue(
@@ -639,4 +674,45 @@ bool BrokerReplicator::isBound(boost::sh
 string BrokerReplicator::getType() const { return 
QPID_CONFIGURATION_REPLICATOR; }
 
 
+// BrokerReplicator::Cleaner
+
+BrokerReplicator::Cleaner::Cleaner(BrokerReplicator& br) : 
brokerReplicator(br) {}
+
+void BrokerReplicator::Cleaner::start() {
+    queues.clear();
+    exchanges.clear();
+    
brokerReplicator.broker.getExchanges().eachExchange(boost::bind(&Cleaner::addExchange,
 this, _1));
+    
brokerReplicator.broker.getQueues().eachQueue(boost::bind(&Cleaner::addQueue, 
this, _1));
+}
+
+void 
BrokerReplicator::Cleaner::addExchange(boost::shared_ptr<broker::Exchange> ex) {
+    if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *ex))
+        exchanges.insert(ex->getName());
+}
+
+void BrokerReplicator::Cleaner::addQueue(boost::shared_ptr<broker::Queue> q) {
+    if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *q))
+        queues.insert(q->getName());
+}
+
+void BrokerReplicator::Cleaner::forgetExchange(const std::string& name) {
+    exchanges.erase(name);
+}
+
+void BrokerReplicator::Cleaner::forgetQueue(const std::string& name) {
+    queues.erase(name);
+}
+
+void BrokerReplicator::Cleaner::cleanExchanges() {
+    for_each(exchanges.begin(), exchanges.end(),
+             boost::bind(&BrokerReplicator::deleteExchange, &brokerReplicator, 
_1));
+    exchanges.clear();
+}
+
+void BrokerReplicator::Cleaner::cleanQueues() {
+    for_each(queues.begin(), queues.end(),
+             boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, 
_1));
+    queues.clear();
+}
+
 }} // namespace broker

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Tue Oct  2 18:41:41 2012
@@ -31,6 +31,7 @@
 #include "qpid/management/ManagementObject.h"
 #include <boost/shared_ptr.hpp>
 #include <boost/enable_shared_from_this.hpp>
+#include <set>
 
 namespace qpid {
 
@@ -81,6 +82,33 @@ class BrokerReplicator : public broker::
     typedef std::pair<boost::shared_ptr<broker::Queue>, bool> 
CreateQueueResult;
     typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> 
CreateExchangeResult;
 
+    /** Keep track of queues and exchanges that need to be cleaned up. */
+    class Cleaner {
+      public:
+        Cleaner(BrokerReplicator&);
+
+        /** Scan for existing queues and exchanges. */
+        void start();
+
+        // Forget a queue/exchange that does not need cleaning
+        void forgetExchange(const std::string& name);
+        void forgetQueue(const std::string& name);
+
+        void cleanExchanges();
+        void cleanQueues();
+
+      private:
+        typedef std::set<std::string> Names;
+
+        // add a queue/exchange that may need cleaning.
+        void addExchange(boost::shared_ptr<broker::Exchange>);
+        void addQueue(boost::shared_ptr<broker::Queue>);
+
+        BrokerReplicator& brokerReplicator;
+        Names queues, exchanges;
+    };
+  friend class Cleaner;
+
     void initializeBridge(broker::Bridge&, broker::SessionHandler&);
 
     void doEventQueueDeclare(types::Variant::Map& values);
@@ -98,7 +126,6 @@ class BrokerReplicator : public broker::
 
     QueueReplicatorPtr findQueueReplicator(const std::string& qname);
     void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
-    void stopQueueReplicator(const std::string& name);
 
     CreateQueueResult createQueue(
         const std::string& name,
@@ -114,6 +141,9 @@ class BrokerReplicator : public broker::
         const qpid::framing::FieldTable& args,
         const std::string& alternateExchange);
 
+    void deleteQueue(const std::string& name);
+    void deleteExchange(const std::string& name);
+
     std::string logPrefix;
     std::string userId, remoteHost;
     ReplicationTest replicationTest;
@@ -125,6 +155,7 @@ class BrokerReplicator : public broker::
     qpid::Address primary;
     typedef std::set<std::string> StringSet;
     StringSet replicatedExchanges; // exchanges that have been replicated.
+    Cleaner cleaner;
 };
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Tue Oct  2 18:41:41 2012
@@ -70,6 +70,7 @@ QueueReplicator::QueueReplicator(HaBroke
     args.setString(QPID_REPLICATE, printable(NONE).str());
     Uuid uuid(true);
     bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
+    getArgs().setString(QPID_REPLICATE, printable(NONE).str());
 }
 
 // This must be separate from the constructor so we can call shared_from_this.
@@ -123,8 +124,6 @@ void QueueReplicator::initializeBridge(B
     SequenceNumber front, back;
     queue->getRange(front, back, broker::REPLICATOR);
     if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT, 
front);
-    QPID_LOG(debug, logPrefix << " subscribe with settings  " << settings);
-
     peer.getMessage().subscribe(
         args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
         false/*exclusive*/, "", 0, settings);

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Tue Oct  2 18:41:41 2012
@@ -209,6 +209,7 @@ class HaCluster(object):
     def start(self, update_urls=True, args=[]):
         """Start a new broker in the cluster"""
         b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
+        b.ready()
         self._brokers.append(b)
         if update_urls: self.update_urls()
         return b
@@ -235,6 +236,7 @@ class HaCluster(object):
         self._brokers[i] = HaBroker(
             self.test, name=b.name, port=b.port(), brokers_url=self.url,
             **self.kwargs)
+        self._brokers[i].ready()
 
     def bounce(self, i, promote_next=True):
         """Stop and restart a broker in a cluster."""

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1393089&r1=1393088&r2=1393089&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Oct  2 18:41:41 2012
@@ -624,6 +624,30 @@ acl deny all all
         actual = [m.content for m in primary.get_messages("pq", 100)]
         self.assertEqual(expect, actual)
 
+    def test_delete_missing_response(self):
+        """Check that a backup correctly deletes leftover queues and exchanges 
that are
+        missing from the initial reponse set."""
+        cluster = HaCluster(self,2)
+        s = cluster[0].connect().session()
+        s.sender("q1;{create:always}")
+        s.sender("q2;{create:always}")
+        s.sender("e1;{create:always, node:{type:topic}}")
+        s.sender("e2;{create:always, node:{type:topic}}")
+        cluster.bounce(0, promote_next=False)
+        # Fake a primary that has deleted some queues and exchanges.
+        s = cluster[0].connect_admin().session()
+        s.sender("q2;{create:always}")
+        s.sender("e2;{create:always, node:{type:topic}}")
+        s.sender("x;{create:always}") # A new queue so we can wait for the 
update.
+        cluster[0].promote()
+        # Verify the backup has deleted the missing queues and exchanges
+        cluster[1].wait_status("ready")
+        s = cluster[1].connect_admin().session()
+        cluster[1].wait_backup("x");
+        self.assertRaises(NotFound, s.receiver, ("q1"));
+        self.assertRaises(NotFound, s.receiver, ("e1"));
+
+
 def fairshare(msgs, limit, levels):
     """
     Generator to return prioritised messages in expected order for a given 
fairshare limit



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to