Author: aconway
Date: Fri Jun 22 19:28:42 2012
New Revision: 1353001

URL: http://svn.apache.org/viewvc?rev=1353001&view=rev
Log:
NO-JIRA: Simplify locking and remove member-update callback in HA code.

Get rid of the separate Membership lock and put HaBroker in control of
membership changes. Removes a potential deadlock which could explain
some observed failures in long-running failover tests.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Fri Jun 22 19:28:42 2012
@@ -83,6 +83,8 @@ void Backup::initialize(const Url& broke
         false,                  // durable
         settings.mechanism, settings.username, settings.password,
         false);                 // amq.failover
+
+    sys::Mutex::ScopedLock l(lock);
     link = result.first;
     link->setUrl(url);
     replicator.reset(new BrokerReplicator(haBroker, link));
@@ -93,7 +95,6 @@ void Backup::initialize(const Url& broke
 Backup::~Backup() {
     if (link) link->close();
     if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
-    replicator.reset();
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Jun 22 19:28:42 2012
@@ -546,7 +546,7 @@ void BrokerReplicator::doResponseHaBroke
         if (mine != primary)
             throw Exception(QPID_MSG("Replicate default on backup (" << mine
                                      << ") does not match primary (" <<  primary << ")"));
-        haBroker.getMembership().assign(values[MEMBERS].asList());
+        haBroker.setMembership(values[MEMBERS].asList());
     } catch (const std::exception& e) {
         QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what());
         haBroker.shutdown();

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Jun 22 19:28:42 2012
@@ -66,7 +66,7 @@ HaBroker::HaBroker(broker::Broker& b, co
       brokerInfo(broker.getSystem()->getNodeName(),
                  // TODO aconway 2012-05-24: other transports?
                  broker.getPort(broker::Broker::TCP_TRANSPORT), systemId),
-      membership(systemId, boost::bind(&HaBroker::membershipUpdate, this, _1)),
+      membership(systemId),
       replicationTest(s.replicateDefault.get())
 {
     // Set up the management object.
@@ -271,17 +271,53 @@ void HaBroker::statusChanged(Mutex::Scop
     setLinkProperties(l);
 }
 
-void HaBroker::membershipUpdate(const Variant::List& brokers) {
-    // FIXME aconway 2012-06-12: nasty callback in callback, clean up.
-    BrokerInfo info;
-    if (getStatus() == CATCHUP && getMembership().get(systemId, info) && info.getStatus() == READY)
-        setStatus(READY);
-
-    // No lock, only calls thread-safe objects.
+void HaBroker::membershipUpdated(const Variant::List& brokers) {
+    // No lock, these are thread-safe.
     mgmtObject->set_members(brokers);
     broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
 }
 
+void HaBroker::setMembership(const Variant::List& brokers) {
+    Mutex::ScopedLock l(lock);
+    membership.assign(brokers);
+    BrokerInfo info;
+    // Check if my own status has been updated to READY
+    if (getStatus() == CATCHUP &&
+        membership.get(systemId, info) && info.getStatus() == READY)
+        setStatus(READY, l);
+    membershipUpdated(brokers);
+}
+
+void HaBroker::resetMembership(const BrokerInfo& b) {
+    Variant::List members;
+    {
+        Mutex::ScopedLock l(lock);
+        membership.reset(b);
+        members = membership.asList();
+    }
+    membershipUpdated(members);
+}
+
+void HaBroker::addBroker(const BrokerInfo& b) {
+    Variant::List members;
+    {
+        Mutex::ScopedLock l(lock);
+        membership.add(b);
+        members = membership.asList();
+    }
+    membershipUpdated(members);
+}
+
+void HaBroker::removeBroker(const Uuid& id) {
+    Variant::List members;
+    {
+        Mutex::ScopedLock l(lock);
+        membership.remove(id);
+        members = membership.asList();
+    }
+    membershipUpdated(members);
+}
+
 void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
     framing::FieldTable linkProperties = broker.getLinkClientProperties();
     if (isBackup(status)) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Fri Jun 22 19:28:42 2012
@@ -87,8 +87,11 @@ class HaBroker : public management::Mana
     boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
 
     const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
-    Membership& getMembership() { return membership; }
-    void membershipUpdate(const types::Variant::List&);
+
+    void setMembership(const types::Variant::List&); // Set membership from list.
+    void resetMembership(const BrokerInfo& b); // Reset to contain just one member.
+    void addBroker(const BrokerInfo& b);       // Add a broker to the membership.
+    void removeBroker(const types::Uuid& id);  // Remove a broker from membership.
 
   private:
     void setClientUrl(const Url&, sys::Mutex::ScopedLock&);
@@ -105,6 +108,8 @@ class HaBroker : public management::Mana
 
     std::vector<Url> getKnownBrokers() const;
 
+    void membershipUpdated(const types::Variant::List&);
+
     std::string logPrefix;
     broker::Broker& broker;
     types::Uuid systemId;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp Fri Jun 22 19:28:42 2012
@@ -28,70 +28,42 @@ namespace ha {
 
 
 void Membership::reset(const BrokerInfo& b) {
-    sys::Mutex::ScopedLock l(lock);
     brokers.clear();
     brokers[b.getSystemId()] = b;
-    update(l);
 }
 
 void Membership::add(const BrokerInfo& b) {
-    sys::Mutex::ScopedLock l(lock);
     brokers[b.getSystemId()] = b;
-    update(l);
 }
 
 
 void Membership::remove(const types::Uuid& id) {
-    sys::Mutex::ScopedLock l(lock);
     BrokerInfo::Map::iterator i = brokers.find(id);
     if (i != brokers.end()) {
         brokers.erase(i);
-        update(l);
-    }
+        }
 }
 
 bool Membership::contains(const types::Uuid& id) {
-    sys::Mutex::ScopedLock l(lock);
     return brokers.find(id) != brokers.end();
 }
 
 void Membership::assign(const types::Variant::List& list) {
-    sys::Mutex::ScopedLock l(lock);
     brokers.clear();
     for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
         BrokerInfo b(i->asMap());
         brokers[b.getSystemId()] = b;
     }
-    update(l);
 }
 
 types::Variant::List Membership::asList() const {
-    sys::Mutex::ScopedLock l(lock);
-    return asList(l);
-}
-
-types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const {
     types::Variant::List list;
     for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
         list.push_back(i->second.asMap());
     return list;
 }
 
-void Membership::update(sys::Mutex::ScopedLock& l) {
-    if (updateCallback) {
-        types::Variant::List list = asList(l);
-        sys::Mutex::ScopedUnlock u(lock);
-        updateCallback(list);
-    }
-    QPID_LOG(debug, " HA: Membership update: " << brokers);
-}
-
 BrokerInfo::Set Membership::otherBackups() const {
-    sys::Mutex::ScopedLock l(lock);
-    return otherBackups(l);
-}
-
-BrokerInfo::Set Membership::otherBackups(sys::Mutex::ScopedLock&) const {
     BrokerInfo::Set result;
     for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
         if (isBackup(i->second.getStatus()) && i->second.getSystemId() != self)
@@ -100,7 +72,6 @@ BrokerInfo::Set Membership::otherBackups
 }
 
 bool Membership::get(const types::Uuid& id, BrokerInfo& result) {
-    sys::Mutex::ScopedLock l(lock);
     BrokerInfo::Map::iterator i = brokers.find(id);
     if (i == brokers.end()) return false;
     result = i->second;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h Fri Jun 22 19:28:42 2012
@@ -36,14 +36,12 @@ namespace ha {
 
 /**
  * Keep track of the brokers in the membership.
- * THREAD SAFE: updated in arbitrary connection threads.
+ * THREAD UNSAFE: caller must serialize
  */
 class Membership
 {
   public:
-    typedef boost::function<void (const types::Variant::List&) > UpdateCallback;
-    Membership(const types::Uuid& self_, UpdateCallback updateFn)
-        : self(self_), updateCallback(updateFn) {}
+    Membership(const types::Uuid& self_) : self(self_) {}
 
     void reset(const BrokerInfo& b); ///< Reset to contain just one member.
     void add(const BrokerInfo& b);
@@ -58,14 +56,8 @@ class Membership
     bool get(const types::Uuid& id, BrokerInfo& result);
 
   private:
-    BrokerInfo::Set otherBackups(sys::Mutex::ScopedLock&) const;
-    types::Variant::List asList(sys::Mutex::ScopedLock&) const;
-    void update(sys::Mutex::ScopedLock&);
-
-    mutable sys::Mutex lock;
     types::Uuid self;
     BrokerInfo::Map brokers;
-    UpdateCallback updateCallback;
 };
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Fri Jun 22 19:28:42 2012
@@ -114,7 +114,7 @@ void Primary::checkReady(BackupMap::iter
     if (i != backups.end() && i->second->isReady()) {
         BrokerInfo info = i->second->getBrokerInfo();
         info.setStatus(READY);
-        haBroker.getMembership().add(info);
+        haBroker.addBroker(info);
         initialBackups.erase(i->second);
         checkReady(l);
     }
@@ -160,7 +160,7 @@ void Primary::opened(broker::Connection&
         else {
             QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
         }
-        haBroker.getMembership().add(info);
+        haBroker.addBroker(info);
     }
 }
 
@@ -168,7 +168,7 @@ void Primary::closed(broker::Connection&
     Mutex::ScopedLock l(lock);
     BrokerInfo info;
     if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
-        haBroker.getMembership().remove(info.getSystemId());
+        haBroker.removeBroker(info.getSystemId());
         QPID_LOG(debug, logPrefix << "Backup disconnected: " << info);
     }
     // NOTE: we do not modify backups here, we only add to the known backups set



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to