Author: aconway
Date: Thu Jul 19 19:16:12 2012
New Revision: 1363486

URL: http://svn.apache.org/viewvc?rev=1363486&view=rev
Log:
QPID-4144 HA broker deadlocks on broker::QueueRegistry lock and ha::Primary lock

While running tests repeatedly, the broker deadlocked (see the attached stack trace).

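The problem call sequences are:
1. QueueRegistry::destroy takes the QueueRegistry lock >
ConfigurationObserver::queueDestroy > ha::Primary::queueDestroy takes the Primary
lock.
2. ConnectionObserver::opened calls Primary::opened, which takes the Primary lock >
RemoteBackup > getQueues().eachQueue, which takes the QueueRegistry lock.

The two sequences acquire the same two locks in opposite order, so they can deadlock.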

This patch breaks the deadlock at both ends: QueueRegistry no longer holds the 
lock across the observer call and Primary does not hold the lock across 
eachQueue.

Modified:
    qpid/branches/0.18/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    qpid/branches/0.18/qpid/cpp/src/qpid/broker/QueueRegistry.h
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.h

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1363486&r1=1363485&r2=1363486&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Thu Jul 19 
19:16:12 2012
@@ -79,18 +79,17 @@ QueueRegistry::declare(const string& dec
     return result;
 }
 
-void QueueRegistry::destroyLH (const string& name) {
-    QueueMap::iterator i = queues.find(name);
-    if (i != queues.end()) {
-        Queue::shared_ptr q = i->second;
-        queues.erase(i);
-        if (broker) broker->getConfigurationObservers().queueDestroy(q);
+void QueueRegistry::destroy(const string& name) {
+    Queue::shared_ptr q; // Queue removed from the map, if any; null if name was not found.
+    {   // Hold the write lock only while updating the map.
+        qpid::sys::RWlock::ScopedWlock locker(lock);
+        QueueMap::iterator i = queues.find(name);
+        if (i != queues.end()) {
+            q = i->second; // Assign the outer q: redeclaring it here would shadow it and skip the notification below.
+            queues.erase(i);
+        }
     }
-}
-
-void QueueRegistry::destroy (const string& name){
-    RWlock::ScopedWlock locker(lock);
-    destroyLH (name);
+    if (broker && q) broker->getConfigurationObservers().queueDestroy(q); // Notify outside the lock to avoid the QPID-4144 lock-order deadlock.
 }
 
 Queue::shared_ptr QueueRegistry::find(const string& name){

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=1363486&r1=1363485&r2=1363486&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/broker/QueueRegistry.h Thu Jul 19 
19:16:12 2012
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,7 +61,7 @@ class QueueRegistry {
     QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> declare(
         const std::string& name,
         bool durable = false,
-        bool autodelete = false, 
+        bool autodelete = false,
         const OwnershipToken* owner = 0,
         boost::shared_ptr<Exchange> alternateExchange = 
boost::shared_ptr<Exchange>(),
         const qpid::framing::FieldTable& args = framing::FieldTable(),
@@ -82,9 +82,8 @@ class QueueRegistry {
     QPID_BROKER_EXTERN void destroy(const std::string& name);
     template <class Test> bool destroyIf(const std::string& name, Test test)
     {
-        qpid::sys::RWlock::ScopedWlock locker(lock);
         if (test()) {
-            destroyLH (name);
+            destroy(name);
             return true;
         } else {
             return false;
@@ -127,13 +126,13 @@ class QueueRegistry {
         for (QueueMap::const_iterator i = queues.begin(); i != queues.end(); 
++i)
             f(i->second);
     }
-       
+
        /**
        * Change queue mode when cluster size drops to 1 node, expands again
        * in practice allows flow queue to disk when last name to be exectuted
        */
        void updateQueueClusterState(bool lastNode);
-    
+
 private:
     typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap;
     QueueMap queues;
@@ -144,12 +143,9 @@ private:
     management::Manageable* parent;
     bool lastNode; //used to set mode on queue declare
     Broker* broker;
-
-    //destroy impl that assumes lock is already held:
-    void destroyLH (const std::string& name);
 };
 
-    
+
 }} // namespace qpid::broker
 
 

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1363486&r1=1363485&r2=1363486&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp Thu Jul 19 19:16:12 2012
@@ -90,13 +90,10 @@ Primary::Primary(HaBroker& hb, const Bro
         QPID_LOG(debug, logPrefix << "Promoted, expected backups: " << expect);
         for (BrokerInfo::Set::const_iterator i = expect.begin(); i != 
expect.end(); ++i) {
             boost::shared_ptr<RemoteBackup> backup(
-                new RemoteBackup(
-                    *i, haBroker.getBroker(), haBroker.getReplicationTest(),
-                    true, // Create queue guards immediately for expected 
backups.
-                    false  // Not yet connected.
-                ));
+                new RemoteBackup(*i, haBroker.getReplicationTest(), false));
             backups[i->getSystemId()] = backup;
             if (!backup->isReady()) expectedBackups.insert(backup);
+            backup->createGuards(hb.getBroker().getQueues());
         }
         // Set timeout for expected brokers to connect and become ready.
         sys::Duration 
timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC));
@@ -181,16 +178,13 @@ void Primary::queueDestroy(const QueuePt
 void Primary::opened(broker::Connection& connection) {
     Mutex::ScopedLock l(lock);
     BrokerInfo info;
+    boost::shared_ptr<RemoteBackup> backup;
     if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
         BackupMap::iterator i = backups.find(info.getSystemId());
         if (i == backups.end()) {
+            backup.reset(new RemoteBackup(info, haBroker.getReplicationTest(), 
true));
+            backups[info.getSystemId()] = backup;
             QPID_LOG(debug, logPrefix << "New backup connected: " << info);
-            backups[info.getSystemId()].reset(
-                new RemoteBackup(
-                    info, haBroker.getBroker(), haBroker.getReplicationTest(),
-                    false, // Lazy-create guards for new backups, creating now 
deadlocks
-                    true // Backup is connected
-                ));
         }
         else {
             QPID_LOG(debug, logPrefix << "Known backup connected: " << info);

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1363486&r1=1363485&r2=1363486&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Thu Jul 19 
19:16:12 2012
@@ -30,14 +30,15 @@ namespace ha {
 
 using sys::Mutex;
 
-RemoteBackup::RemoteBackup(
-    const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool 
cg, bool con) :
+RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool 
con) :
     logPrefix("Primary remote backup "+info.getLogId()+": "),
-    brokerInfo(info), replicationTest(rt),
-    createGuards(cg), connected(con)
+    brokerInfo(info), replicationTest(rt), connected(con)
+{}
+
+void RemoteBackup::createGuards(broker::QueueRegistry& queues)
 {
     QPID_LOG(debug, logPrefix << "Guarding queues for backup broker.");
-    broker.getQueues().eachQueue(boost::bind(&RemoteBackup::initialQueue, 
this, _1));
+    queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1));
 }
 
 RemoteBackup::~RemoteBackup() { cancel(); }
@@ -56,14 +57,12 @@ void RemoteBackup::initialQueue(const Qu
 }
 
 RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) {
-    if (!createGuards) return RemoteBackup::GuardPtr();
     GuardMap::iterator i = guards.find(q);
-    if (i == guards.end()) {
-        assert(0);
-        throw Exception(logPrefix+": Cannot find queue guard: "+q->getName());
+    GuardPtr guard;
+    if (i != guards.end()) {
+        guard = i->second;
+        guards.erase(i);
     }
-    GuardPtr guard = i->second;
-    guards.erase(i);
     return guard;
 }
 
@@ -89,7 +88,7 @@ void RemoteBackup::ready(const QueuePtr&
 
 // Called via ConfigurationObserver and from initialQueue
 void RemoteBackup::queueCreate(const QueuePtr& q) {
-    if (createGuards && replicationTest.isReplicated(ALL, *q))
+    if (replicationTest.isReplicated(ALL, *q))
         guards[q].reset(new QueueGuard(*q, brokerInfo));
 }
 

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1363486&r1=1363485&r2=1363486&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.h Thu Jul 19 19:16:12 
2012
@@ -31,8 +31,8 @@
 namespace qpid {
 
 namespace broker {
-class Broker;
 class Queue;
+class QueueRegistry;
 }
 
 namespace ha {
@@ -51,11 +51,15 @@ class RemoteBackup
     typedef boost::shared_ptr<QueueGuard> GuardPtr;
     typedef boost::shared_ptr<broker::Queue> QueuePtr;
 
-    /** Note: isReady() can be true after construction */
-    RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt,
-                 bool createGuards, bool connected);
+    /** Note: isReady() can be true after construction
+     *@param connected true if the backup is already connected.
+     */
+    RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected);
     ~RemoteBackup();
 
+    /** Create initial guards for all the replicated queues in the registry. */
+    void createGuards(broker::QueueRegistry&);
+
     /** Return guard associated with a queue. Used to create 
ReplicatingSubscription. */
     GuardPtr guard(const QueuePtr&);
 
@@ -85,15 +89,15 @@ class RemoteBackup
     typedef std::map<QueuePtr, GuardPtr> GuardMap;
     typedef std::set<QueuePtr> QueueSet;
 
+    /** Add queue to guard as an initial queue */
+    void initialQueue(const QueuePtr&);
+
     std::string logPrefix;
     BrokerInfo brokerInfo;
     ReplicationTest replicationTest;
     GuardMap guards;
     QueueSet initialQueues;
-    bool createGuards;
     bool connected;
-
-    void initialQueue(const QueuePtr&);
 };
 
 }} // namespace qpid::ha



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to