Author: aconway
Date: Fri Nov  9 21:45:06 2012
New Revision: 1407661

URL: http://svn.apache.org/viewvc?rev=1407661&view=rev
Log:
Bug 874118 - HA Deadlock in backup broker after disconnecting from primary.

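The backup broker was calling ExchangeRegistry::eachExchange to clean up
auto-delete queues after disconnecting from the primary, but eachExchange holds
the ExchangeRegistry lock while it runs the callback, hence the deadlock.
Now we use eachExchange only to copy the list of exchanges, and then process
that copy with for_each without holding the lock.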

The deadlock showed up only for auto-delete timeouts of 0, because the queue
schedules non-zero timeouts on a separate timer thread.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1407661&r1=1407660&r2=1407661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Nov  9 21:45:06 2012
@@ -63,6 +63,7 @@ using qmf::org::apache::qpid::ha::EventM
 using qpid::broker::amqp_0_10::MessageTransfer;
 using namespace framing;
 using namespace std;
+using namespace boost;
 using std::ostream;
 using types::Variant;
 using namespace broker;
@@ -790,9 +791,7 @@ bool BrokerReplicator::isBound(boost::sh
 
 string BrokerReplicator::getType() const { return 
QPID_CONFIGURATION_REPLICATOR; }
 
-void BrokerReplicator::autoDeleteCheck(
-    boost::shared_ptr<Exchange> ex, set<string>& result)
-{
+void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) {
     boost::shared_ptr<QueueReplicator> 
qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
     if (!qr) return;
     assert(qr);
@@ -802,8 +801,9 @@ void BrokerReplicator::autoDeleteCheck(
             Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId);
         }
         else {
-            // Mark for immediate deletion.
-            result.insert(qr->getQueue()->getName());
+            // Delete immediately. Don't purge, the primary is gone so we need
+            // to reroute the deleted messages.
+            deleteQueue(qr->getQueue()->getName(), false);
         }
     }
 }
@@ -812,13 +812,12 @@ void BrokerReplicator::disconnected() {
     QPID_LOG(info, logPrefix << "Disconnected");
     connection = 0;
     // Clean up auto-delete queues
-    set<string> deleteQueues;
-    exchanges.eachExchange(boost::bind(&BrokerReplicator::autoDeleteCheck,
-                                           this, _1, 
boost::ref(deleteQueues)));
-    // Don't purge before deleting, the primary is gone so we need to
-    // reroute the deleted messages.
-    for_each(deleteQueues.begin(), deleteQueues.end(),
-             boost::bind(&BrokerReplicator::deleteQueue, this, _1, false));
+    vector<boost::shared_ptr<Exchange> > collect;
+    // Make a copy so we can work outside the ExchangeRegistry lock
+    exchanges.eachExchange(
+        boost::bind(&vector<boost::shared_ptr<Exchange> >::push_back, 
ref(collect), _1));
+    for_each(collect.begin(), collect.end(),
+             boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
 }
 
 }} // namespace broker

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1407661&r1=1407660&r2=1407661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Fri Nov  9 21:45:06 2012
@@ -133,7 +133,7 @@ class BrokerReplicator : public broker::
     void deleteQueue(const std::string& name, bool purge=true);
     void deleteExchange(const std::string& name);
 
-    void autoDeleteCheck(boost::shared_ptr<broker::Exchange>, 
std::set<std::string>&);
+    void autoDeleteCheck(boost::shared_ptr<broker::Exchange>);
     void disconnected();
 
     std::string logPrefix;

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1407661&r1=1407660&r2=1407661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Nov  9 21:45:06 2012
@@ -753,11 +753,16 @@ acl deny all all
 
     def test_auto_delete_timeout(self):
         cluster = HaCluster(self, 2)
-        s = 
cluster[0].connect().session().receiver("q;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
-        cluster[1].wait_queue("q")
+        # Test timeout
+        r1 = 
cluster[0].connect().session().receiver("q1;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+        # Test special case of timeout = 0
+        r0 = 
cluster[0].connect().session().receiver("q0;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':0}}}}")
+        cluster[1].wait_queue("q0")
+        cluster[1].wait_queue("q1")
         cluster[0].kill()
-        cluster[1].wait_queue("q")    # Not timed out yet
-        cluster[1].wait_no_queue("q") # Wait for timeout
+        cluster[1].wait_queue("q1")                       # Not timed out yet
+        cluster[1].wait_no_queue("q1", timeout=2)         # Wait for timeout
+        cluster[1].wait_no_queue("q0", timeout=2)
 
     def test_alt_exchange_dup(self):
         """QPID-4349: if a queue has an alterante exchange and is deleted the
@@ -817,10 +822,13 @@ acl deny all all
                     if class_name(m) == 'queueDeclare' and q_name(m) == qname: 
found = True
             except Empty: pass
             assert(found)
-        verify_qmf_events("q1")
-        cluster[1].wait_status("ready")
-        cluster.kill(0)
-        verify_qmf_events("q2")
+        try:
+            verify_qmf_events("q1")
+            cluster[1].wait_status("ready")
+            l = LogLevel(ERROR) # Hide expected WARNING log messages from 
failover.
+            cluster.kill(0)
+            verify_qmf_events("q2")
+        finally: l.restore()
 
 def fairshare(msgs, limit, levels):
     """



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to