Author: aconway
Date: Fri Jun 22 19:28:42 2012
New Revision: 1353001
URL: http://svn.apache.org/viewvc?rev=1353001&view=rev
Log:
NO-JIRA: Simplify locking and remove member-update callback in HA code.
Get rid of the separate Membership lock and put HaBroker in control of
membership changes. Removes a potential deadlock which could explain
some observed failures in long-running failover tests.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Fri Jun 22 19:28:42 2012
@@ -83,6 +83,8 @@ void Backup::initialize(const Url& broke
false, // durable
settings.mechanism, settings.username, settings.password,
false); // amq.failover
+
+ sys::Mutex::ScopedLock l(lock);
link = result.first;
link->setUrl(url);
replicator.reset(new BrokerReplicator(haBroker, link));
@@ -93,7 +95,6 @@ void Backup::initialize(const Url& broke
Backup::~Backup() {
if (link) link->close();
if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
- replicator.reset();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Jun 22 19:28:42
2012
@@ -546,7 +546,7 @@ void BrokerReplicator::doResponseHaBroke
if (mine != primary)
throw Exception(QPID_MSG("Replicate default on backup (" << mine
<< ") does not match primary (" <<
primary << ")"));
- haBroker.getMembership().assign(values[MEMBERS].asList());
+ haBroker.setMembership(values[MEMBERS].asList());
} catch (const std::exception& e) {
QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " <<
e.what());
haBroker.shutdown();
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Jun 22 19:28:42 2012
@@ -66,7 +66,7 @@ HaBroker::HaBroker(broker::Broker& b, co
brokerInfo(broker.getSystem()->getNodeName(),
// TODO aconway 2012-05-24: other transports?
broker.getPort(broker::Broker::TCP_TRANSPORT), systemId),
- membership(systemId, boost::bind(&HaBroker::membershipUpdate, this, _1)),
+ membership(systemId),
replicationTest(s.replicateDefault.get())
{
// Set up the management object.
@@ -271,17 +271,53 @@ void HaBroker::statusChanged(Mutex::Scop
setLinkProperties(l);
}
-void HaBroker::membershipUpdate(const Variant::List& brokers) {
- // FIXME aconway 2012-06-12: nasty callback in callback, clean up.
- BrokerInfo info;
- if (getStatus() == CATCHUP && getMembership().get(systemId, info) &&
info.getStatus() == READY)
- setStatus(READY);
-
- // No lock, only calls thread-safe objects.
+void HaBroker::membershipUpdated(const Variant::List& brokers) {
+ // No lock, these are thread-safe.
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
}
+void HaBroker::setMembership(const Variant::List& brokers) {
+ Mutex::ScopedLock l(lock);
+ membership.assign(brokers);
+ BrokerInfo info;
+ // Check if my own status has been updated to READY
+ if (getStatus() == CATCHUP &&
+ membership.get(systemId, info) && info.getStatus() == READY)
+ setStatus(READY, l);
+ membershipUpdated(brokers);
+}
+
+void HaBroker::resetMembership(const BrokerInfo& b) {
+ Variant::List members;
+ {
+ Mutex::ScopedLock l(lock);
+ membership.reset(b);
+ members = membership.asList();
+ }
+ membershipUpdated(members);
+}
+
+void HaBroker::addBroker(const BrokerInfo& b) {
+ Variant::List members;
+ {
+ Mutex::ScopedLock l(lock);
+ membership.add(b);
+ members = membership.asList();
+ }
+ membershipUpdated(members);
+}
+
+void HaBroker::removeBroker(const Uuid& id) {
+ Variant::List members;
+ {
+ Mutex::ScopedLock l(lock);
+ membership.remove(id);
+ members = membership.asList();
+ }
+ membershipUpdated(members);
+}
+
void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
framing::FieldTable linkProperties = broker.getLinkClientProperties();
if (isBackup(status)) {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Fri Jun 22 19:28:42 2012
@@ -87,8 +87,11 @@ class HaBroker : public management::Mana
boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
- Membership& getMembership() { return membership; }
- void membershipUpdate(const types::Variant::List&);
+
+ void setMembership(const types::Variant::List&); // Set membership from
list.
+ void resetMembership(const BrokerInfo& b); // Reset to contain just one
member.
+ void addBroker(const BrokerInfo& b); // Add a broker to the
membership.
+ void removeBroker(const types::Uuid& id); // Remove a broker from
membership.
private:
void setClientUrl(const Url&, sys::Mutex::ScopedLock&);
@@ -105,6 +108,8 @@ class HaBroker : public management::Mana
std::vector<Url> getKnownBrokers() const;
+ void membershipUpdated(const types::Variant::List&);
+
std::string logPrefix;
broker::Broker& broker;
types::Uuid systemId;
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp Fri Jun 22 19:28:42 2012
@@ -28,70 +28,42 @@ namespace ha {
void Membership::reset(const BrokerInfo& b) {
- sys::Mutex::ScopedLock l(lock);
brokers.clear();
brokers[b.getSystemId()] = b;
- update(l);
}
void Membership::add(const BrokerInfo& b) {
- sys::Mutex::ScopedLock l(lock);
brokers[b.getSystemId()] = b;
- update(l);
}
void Membership::remove(const types::Uuid& id) {
- sys::Mutex::ScopedLock l(lock);
BrokerInfo::Map::iterator i = brokers.find(id);
if (i != brokers.end()) {
brokers.erase(i);
- update(l);
- }
+ }
}
bool Membership::contains(const types::Uuid& id) {
- sys::Mutex::ScopedLock l(lock);
return brokers.find(id) != brokers.end();
}
void Membership::assign(const types::Variant::List& list) {
- sys::Mutex::ScopedLock l(lock);
brokers.clear();
for (types::Variant::List::const_iterator i = list.begin(); i !=
list.end(); ++i) {
BrokerInfo b(i->asMap());
brokers[b.getSystemId()] = b;
}
- update(l);
}
types::Variant::List Membership::asList() const {
- sys::Mutex::ScopedLock l(lock);
- return asList(l);
-}
-
-types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const {
types::Variant::List list;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i !=
brokers.end(); ++i)
list.push_back(i->second.asMap());
return list;
}
-void Membership::update(sys::Mutex::ScopedLock& l) {
- if (updateCallback) {
- types::Variant::List list = asList(l);
- sys::Mutex::ScopedUnlock u(lock);
- updateCallback(list);
- }
- QPID_LOG(debug, " HA: Membership update: " << brokers);
-}
-
BrokerInfo::Set Membership::otherBackups() const {
- sys::Mutex::ScopedLock l(lock);
- return otherBackups(l);
-}
-
-BrokerInfo::Set Membership::otherBackups(sys::Mutex::ScopedLock&) const {
BrokerInfo::Set result;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i !=
brokers.end(); ++i)
if (isBackup(i->second.getStatus()) && i->second.getSystemId() != self)
@@ -100,7 +72,6 @@ BrokerInfo::Set Membership::otherBackups
}
bool Membership::get(const types::Uuid& id, BrokerInfo& result) {
- sys::Mutex::ScopedLock l(lock);
BrokerInfo::Map::iterator i = brokers.find(id);
if (i == brokers.end()) return false;
result = i->second;
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h Fri Jun 22 19:28:42 2012
@@ -36,14 +36,12 @@ namespace ha {
/**
* Keep track of the brokers in the membership.
- * THREAD SAFE: updated in arbitrary connection threads.
+ * THREAD UNSAFE: caller must serialize
*/
class Membership
{
public:
- typedef boost::function<void (const types::Variant::List&) >
UpdateCallback;
- Membership(const types::Uuid& self_, UpdateCallback updateFn)
- : self(self_), updateCallback(updateFn) {}
+ Membership(const types::Uuid& self_) : self(self_) {}
void reset(const BrokerInfo& b); ///< Reset to contain just one member.
void add(const BrokerInfo& b);
@@ -58,14 +56,8 @@ class Membership
bool get(const types::Uuid& id, BrokerInfo& result);
private:
- BrokerInfo::Set otherBackups(sys::Mutex::ScopedLock&) const;
- types::Variant::List asList(sys::Mutex::ScopedLock&) const;
- void update(sys::Mutex::ScopedLock&);
-
- mutable sys::Mutex lock;
types::Uuid self;
BrokerInfo::Map brokers;
- UpdateCallback updateCallback;
};
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1353001&r1=1353000&r2=1353001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Fri Jun 22 19:28:42 2012
@@ -114,7 +114,7 @@ void Primary::checkReady(BackupMap::iter
if (i != backups.end() && i->second->isReady()) {
BrokerInfo info = i->second->getBrokerInfo();
info.setStatus(READY);
- haBroker.getMembership().add(info);
+ haBroker.addBroker(info);
initialBackups.erase(i->second);
checkReady(l);
}
@@ -160,7 +160,7 @@ void Primary::opened(broker::Connection&
else {
QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
}
- haBroker.getMembership().add(info);
+ haBroker.addBroker(info);
}
}
@@ -168,7 +168,7 @@ void Primary::closed(broker::Connection&
Mutex::ScopedLock l(lock);
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
- haBroker.getMembership().remove(info.getSystemId());
+ haBroker.removeBroker(info.getSystemId());
QPID_LOG(debug, logPrefix << "Backup disconnected: " << info);
}
// NOTE: we do not modify backups here, we only add to the known backups
set
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]