Author: aconway
Date: Wed Nov 25 18:36:09 2009
New Revision: 884226

URL: http://svn.apache.org/viewvc?rev=884226&view=rev
Log:
Consistency checks for persistent cluster startup.

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h qpid/trunk/qpid/cpp/src/tests/cluster_tests.py qpid/trunk/qpid/cpp/xml/cluster.xml qpid/trunk/qpid/python/qpid/brokertest.py Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=884226&r1=884225&r2=884226&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Nov 25 18:36:09 2009 @@ -175,7 +175,7 @@ * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 835547; +const uint32_t Cluster::CLUSTER_VERSION = 884125; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -202,7 +202,7 @@ cluster.errorCheck(member, type, frameSeq, l); } - void shutdown() { cluster.shutdown(member, l); } + void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; @@ -287,7 +287,7 @@ default: assert(0); } - QPID_LOG(notice, *this << (state == READY ? "joined" : "joining") << " cluster " << name << " with url=" << myUrl); + QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); @@ -601,6 +601,7 @@ // Called on completion of the initial status map. if (state == INIT) { // We have status for all members so we can make join descisions. 
+ initMap.checkConsistent(); elders = initMap.getElders(); QPID_LOG(debug, *this << " elders: " << elders); if (!elders.empty()) { // I'm not the elder, I don't handle links & replication. @@ -611,17 +612,8 @@ else { QPID_LOG(info, this << " active for links."); } - // Check that cluster ID matches persistent store. - Uuid agreedId = initMap.getClusterId(); - if (store.hasStore()) { - Uuid storeId = store.getClusterId(); - if (storeId && storeId != agreedId) - throw Exception( - QPID_MSG("Persistent cluster-id " << storeId - << " doesn't match cluster " << agreedId)); - store.dirty(agreedId); - } - setClusterId(agreedId, l); + setClusterId(initMap.getClusterId(), l); + if (store.hasStore()) store.dirty(clusterId); if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. @@ -822,13 +814,13 @@ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; discarding = false; // ok to set, we're stalled for update. - QPID_LOG(notice, *this << " update complete, starting catch-up, members: " << map); + QPID_LOG(notice, *this << " update complete, starting catch-up."); deliverEventQueue.start(); } else if (updateRetracted) { // Update was retracted, request another update updateRetracted = false; state = JOINER; - QPID_LOG(notice, *this << " update retracted, sending new update request"); + QPID_LOG(notice, *this << " update retracted, sending new update request."); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); deliverEventQueue.start(); } @@ -853,10 +845,9 @@ updateOutDone(l); } -void Cluster ::shutdown(const MemberId& , Lock& l) { +void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) { QPID_LOG(notice, *this << " cluster shut down by administrator."); - // FIXME aconway 2009-11-20: incorrect! Need to pass UUID on shutdown command. 
- if (store.hasStore()) store.clean(Uuid(true)); + if (store.hasStore()) store.clean(Uuid(id)); leave(l); } @@ -885,13 +876,13 @@ } void Cluster::stopClusterNode(Lock& l) { - QPID_LOG(notice, *this << " stopped by admin"); + QPID_LOG(notice, *this << " cluster member stopped by administrator."); leave(l); } void Cluster::stopFullCluster(Lock& ) { QPID_LOG(notice, *this << " shutting down cluster " << name); - mcast.mcastControl(ClusterShutdownBody(), self); + mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self); } void Cluster::memberUpdate(Lock& l) { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=884226&r1=884225&r2=884226&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Nov 25 18:36:09 2009 @@ -160,7 +160,7 @@ void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); - void shutdown(const MemberId&, Lock&); + void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); // Helper functions ConnectionPtr getConnection(const EventFrame&, Lock&); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp?rev=884226&r1=884225&r2=884226&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp Wed Nov 25 18:36:09 2009 @@ -29,6 +29,7 @@ using namespace std; using namespace boost; using namespace framing::cluster; +using namespace framing; InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_) : self(self_), completed(), 
resendNeeded(), size(size_) @@ -106,7 +107,6 @@ } bool InitialStatusMap::isUpdateNeeded() { - // FIXME aconway 2009-11-20: consistency checks isComplete or here? assert(isComplete()); // We need an update if there are any active members. if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true; @@ -145,7 +145,43 @@ if (i != map.end()) return i->second->getClusterId(); // An active member else - return map.begin()->second->getClusterId(); + return map.begin()->second->getClusterId(); // Youngest newcomer in node-id order } +void InitialStatusMap::checkConsistent() { + assert(isComplete()); + bool persistent = (map.begin()->second->getStoreState() != STORE_STATE_NO_STORE); + Uuid clusterId; + for (Map::iterator i = map.begin(); i != map.end(); ++i) { + // Must not mix transient and persistent members. + if (persistent != (i->second->getStoreState() != STORE_STATE_NO_STORE)) + throw Exception("Mixing transient and persistent brokers in a cluster"); + // Members with non-empty stores must have same cluster-id + switch (i->second->getStoreState()) { + case STORE_STATE_NO_STORE: + case STORE_STATE_EMPTY_STORE: + break; + case STORE_STATE_DIRTY_STORE: + case STORE_STATE_CLEAN_STORE: + if (!clusterId) clusterId = i->second->getClusterId(); + assert(clusterId); + if (clusterId != i->second->getClusterId()) + throw Exception("Cluster-id mismatch, brokers belonged to different clusters."); + } + } + // If this is a newly forming cluster, clean stores must have same shutdown-id + if (find_if(map.begin(), map.end(), &isActive) == map.end()) { + Uuid shutdownId; + for (Map::iterator i = map.begin(); i != map.end(); ++i) { + if (i->second->getStoreState() == STORE_STATE_CLEAN_STORE) { + if (!shutdownId) shutdownId = i->second->getShutdownId(); + assert(shutdownId); + if (shutdownId != i->second->getShutdownId()) + throw Exception("Shutdown-id mismatch, brokers were not shut down together."); + } + } + } +} + + }} // namespace qpid::cluster Modified: 
qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h?rev=884226&r1=884225&r2=884226&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h Wed Nov 25 18:36:09 2009 @@ -56,13 +56,14 @@ bool isUpdateNeeded(); /*...@pre isComplete(). @return Cluster-wide cluster ID. */ framing::Uuid getClusterId(); + /*...@pre isComplete(). @throw Exception if there are any inconsistencies. */ + void checkConsistent(); private: typedef std::map<MemberId, boost::optional<Status> > Map; static bool notInitialized(const Map::value_type&); static bool isActive(const Map::value_type&); static bool hasStore(const Map::value_type&); - void check(); Map map; MemberSet firstConfig; MemberId self; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp?rev=884226&r1=884225&r2=884226&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp Wed Nov 25 18:36:09 2009 @@ -85,8 +85,6 @@ } void StoreStatus::clean(const Uuid& shutdownId_) { - assert(clusterId); // FIXME aconway 2009-11-20: throw exception - assert(shutdownId_); state = STORE_STATE_CLEAN_STORE; shutdownId = shutdownId_; save(); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h?rev=884226&r1=884225&r2=884226&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h Wed Nov 25 18:36:09 2009 @@ 
-50,7 +50,6 @@ void save(); bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; } - bool isEmpty() { return state != framing::cluster::STORE_STATE_EMPTY_STORE; } private: framing::cluster::StoreState state; Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=884226&r1=884225&r2=884226&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original) +++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Nov 25 18:36:09 2009 @@ -122,9 +122,9 @@ def test_persistent_restart(self): """Verify persistent cluster shutdown/restart scenarios""" cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) - a = cluster.start("a", expect=EXPECT_EXIT_OK, wait_for_start=False) - b = cluster.start("b", expect=EXPECT_EXIT_OK, wait_for_start=False) - c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait_for_start=True) + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) + c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True) a.send_message("q", Message("1", durable=True)) # Kill & restart one member. 
c.kill() @@ -135,30 +135,30 @@ # Shut down the entire cluster cleanly and bring it back up a.send_message("q", Message("3", durable=True)) qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()]) - a = cluster.start("a", wait_for_start=False) - b = cluster.start("b", wait_for_start=False) - c = cluster.start("c", wait_for_start=True) + a = cluster.start("a", wait=False) + b = cluster.start("b", wait=False) + c = cluster.start("c", wait=True) self.assertEqual(a.get_message("q").content, "3") def test_persistent_partial_failure(self): # Kill 2 members, shut down the last cleanly then restart # Ensure we use the clean database cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) - a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait_for_start=False) - b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait_for_start=False) - c = cluster.start("c", expect=EXPECT_EXIT_OK, wait_for_start=True) + a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False) + c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True) a.send_message("q", Message("4", durable=True)) a.kill() b.kill() self.assertEqual(c.get_message("q").content, "4") c.send_message("q", Message("clean", durable=True)) qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()]) - a = cluster.start("a", wait_for_start=False) - b = cluster.start("b", wait_for_start=False) - c = cluster.start("c", wait_for_start=True) + a = cluster.start("a", wait=False) + b = cluster.start("b", wait=False) + c = cluster.start("c", wait=True) self.assertEqual(a.get_message("q").content, "clean") - def test_wrong_store_uuid(self): + def test_wrong_cluster_id(self): # Start a cluster1 broker, then try to restart in cluster2 cluster1 = self.cluster(0, args=self.args()) a = cluster1.start("a", expect=EXPECT_EXIT_OK) @@ -168,4 +168,25 @@ a = cluster2.start("a", expect=EXPECT_EXIT_FAIL) self.fail("Expected exception") except: pass - + + def 
test_wrong_shutdown_id(self): + # Start 2 members and shut down. + cluster = self.cluster(0, args=self.args()+["--cluster-size=2"]) + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) + self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()])) + self.assertEqual(a.wait(), 0) + self.assertEqual(b.wait(), 0) + + # Restart with a different member and shut down. + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False) + self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()])) + self.assertEqual(a.wait(), 0) + self.assertEqual(c.wait(), 0) + + # Mix members from both shutdown events, they should fail + a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False) + + Modified: qpid/trunk/qpid/cpp/xml/cluster.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=884226&r1=884225&r2=884226&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/xml/cluster.xml (original) +++ qpid/trunk/qpid/cpp/xml/cluster.xml Wed Nov 25 18:36:09 2009 @@ -92,9 +92,11 @@ <field name="type" type="error-type"/> <field name="frame-seq" type="sequence-no"/> </control> - - <control name="shutdown" code="0x20" label="Shut down entire cluster"/> + <!-- Shut down the entire cluster --> + <control name="shutdown" code="0x20"> + <field name="shutdown-id" type="uuid"/> + </control> </class> Modified: qpid/trunk/qpid/python/qpid/brokertest.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=884226&r1=884225&r2=884226&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/brokertest.py (original) +++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Nov 25 18:36:09 2009 @@ -215,7 +215,7 @@ 
_cluster_count = 0 - def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait_for_start=True): + def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True): self.test = test self._brokers=[] self.name = "cluster%d" % Cluster._cluster_count @@ -225,17 +225,17 @@ self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ] assert BrokerTest.cluster_lib self.args += [ "--load-module", BrokerTest.cluster_lib ] - self.start_n(count, expect=expect, wait_for_start=wait_for_start) + self.start_n(count, expect=expect, wait=wait) - def start(self, name=None, expect=EXPECT_RUNNING, wait_for_start=True): + def start(self, name=None, expect=EXPECT_RUNNING, wait=True): """Add a broker to the cluster. Returns the index of the new broker.""" if not name: name="%s-%d" % (self.name, len(self._brokers)) log.debug("Cluster %s starting member %s" % (self.name, name)) - self._brokers.append(self.test.broker(self.args, name, expect, wait_for_start)) + self._brokers.append(self.test.broker(self.args, name, expect, wait)) return self._brokers[-1] - def start_n(self, count, expect=EXPECT_RUNNING, wait_for_start=True): - for i in range(count): self.start(expect=expect, wait_for_start=wait_for_start) + def start_n(self, count, expect=EXPECT_RUNNING, wait=True): + for i in range(count): self.start(expect=expect, wait=wait) # Behave like a list of brokers. def __len__(self): return len(self._brokers) @@ -275,8 +275,6 @@ except Exception, e: err.append(str(e)) if err: raise Exception("Unexpected process status:\n "+"\n ".join(err)) - # FIXME aconway 2009-11-06: check for core files of exited processes. 
- def cleanup_stop(self, stopable): """Call thing.stop at end of test""" self.stopem.append(stopable) @@ -288,17 +286,21 @@ self.cleanup_stop(p) return p - def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait_for_start=True): + def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait=True): """Create and return a broker ready for use""" b = Broker(self, args=args, name=name, expect=expect) - if (wait_for_start): b.connect().close() + if (wait): b.connect().close() return b - def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait_for_start=True): + def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True): """Create and return a cluster ready for use""" - cluster = Cluster(self, count, args, expect=expect, wait_for_start=wait_for_start) + cluster = Cluster(self, count, args, expect=expect, wait=wait) return cluster + def wait(): + """Wait for all brokers in the cluster to be ready""" + for b in _brokers: b.connect().close() + class RethrownException(Exception): """Captures the original stack trace to be thrown later""" def __init__(self, e, msg=""): --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org