Author: aconway
Date: Wed Jan  6 17:01:50 2010
New Revision: 896538

URL: http://svn.apache.org/viewvc?rev=896538&view=rev
Log:
    Added config-seq counter to track config changes since cluster init.

    Config-seq is recorded persistently to help identify best store when
    recovering from total failure.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
    qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/xml/cluster.xml
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=896538&r1=896537&r2=896538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan  6 17:01:50 2010
@@ -186,10 +186,12 @@
     void updateRequest(const std::string& url) { cluster.updateRequest(member, 
url, l); }
 
     void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
-                       uint8_t storeState, const Uuid& shutdownId)
+                       uint8_t storeState, const Uuid& shutdownId,
+                       const framing::SequenceNumber& configSeq)
     {
-        cluster.initialStatus(member, version, active, clusterId, 
-                              framing::cluster::StoreState(storeState), 
shutdownId, l);
+        cluster.initialStatus(
+            member, version, active, clusterId,
+            framing::cluster::StoreState(storeState), shutdownId, configSeq, 
l);
     }
     void ready(const std::string& url) { cluster.ready(member, url, l); }
     void configChange(const std::string& current) { 
cluster.configChange(member, current, l); }
@@ -521,28 +523,17 @@
 struct AddrList {
     const cpg_address* addrs;
     int count;
-    const char *prefix, *suffix;
-    AddrList(const cpg_address* a, int n, const char* p="", const char* s="")
-        : addrs(a), count(n), prefix(p), suffix(s) {}
+    const char *prefix;
+    AddrList(const cpg_address* a, int n, const char* p="")
+        : addrs(a), count(n), prefix(p) {}
 };
 
 ostream& operator<<(ostream& o, const AddrList& a) {
     if (!a.count) return o;
     o << a.prefix;
-    for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
-        const char* reasonString;
-        switch (p->reason) {
-          case CPG_REASON_JOIN: reasonString =  "(joined) "; break;
-          case CPG_REASON_LEAVE: reasonString =  "(left) "; break;
-          case CPG_REASON_NODEDOWN: reasonString =  "(node-down) "; break;
-          case CPG_REASON_NODEUP: reasonString =  "(node-up) "; break;
-          case CPG_REASON_PROCDOWN: reasonString =  "(process-down) "; break;
-          default: reasonString = " ";
-        }
-        qpid::cluster::MemberId member(*p);
-        o << member << reasonString;
-    }
-    return o << a.suffix;
+    for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p)
+        o << qpid::cluster::MemberId(*p) << " ";
+    return o;
 }
 
 void Cluster::configChange ( 
@@ -599,6 +590,8 @@
         }
         else {                  // I can go ready.
             discarding = false;
+            map.resetConfigSeq(); // Start from config-seq = 0
+            store.setConfigSeq(map.getConfigSeq());
             setReady(l);
             memberUpdate(l);
             mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), 
myUrl.str()), self);
@@ -618,6 +611,8 @@
         return;
     }
     bool memberChange = map.configChange(config);
+    QPID_LOG(debug, "Config sequence " << map.getConfigSeq());
+    store.setConfigSeq(map.getConfigSeq());
 
     // Update initital status for new members joining.
     initMap.configChange(config);
@@ -625,7 +620,7 @@
         mcast.mcastControl(
             ClusterInitialStatusBody(
                 ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, 
-                store.getState(), store.getShutdownId()
+                store.getState(), store.getShutdownId(), store.getConfigSeq()
             ),
             self);
     }
@@ -671,6 +666,7 @@
                             const framing::Uuid& id, 
                             framing::cluster::StoreState store,
                             const framing::Uuid& shutdownId,
+                            const framing::SequenceNumber& configSeq,
                             Lock& l)
 {
     if (version != CLUSTER_VERSION) {
@@ -681,7 +677,8 @@
     }
     initMap.received(
         member,
-        ClusterInitialStatusBody(ProtocolVersion(), version, active, id, 
store, shutdownId)
+        ClusterInitialStatusBody(ProtocolVersion(), version, active, id,
+                                 store, shutdownId, configSeq)
     );
     if (initMap.transitionToComplete()) initMapCompleted(l);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=896538&r1=896537&r2=896538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Jan  6 17:01:50 2010
@@ -154,6 +154,7 @@
                        const framing::Uuid& clusterId,
                        framing::cluster::StoreState,
                        const framing::Uuid& shutdownId,
+                       const framing::SequenceNumber& configSeq,
                        Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& current, Lock& l);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=896538&r1=896537&r2=896538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Wed Jan  6 17:01:50 2010
@@ -57,15 +57,17 @@
 
 }
 
-ClusterMap::ClusterMap() : frameSeq(0) {}
+ClusterMap::ClusterMap() : frameSeq(0), configSeq(0) {}
 
-ClusterMap::ClusterMap(const Map& map) : frameSeq(0) {
+ClusterMap::ClusterMap(const Map& map) : frameSeq(0), configSeq(0) {
     transform(map.begin(), map.end(), inserter(alive, alive.begin()), 
bind(&Map::value_type::first, _1));
     members = map;
 }
 
-ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& 
membersFt, framing::SequenceNumber frameSeq_)
-  : frameSeq(frameSeq_)
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& 
membersFt,
+                       framing::SequenceNumber frameSeq_,
+                       framing::SequenceNumber configSeq_)
+    : frameSeq(frameSeq_), configSeq(configSeq_)
 {
     for_each(joinersFt.begin(), joinersFt.end(), bind(&addFieldTableValue, _1, 
ref(joiners), ref(alive)));
     for_each(membersFt.begin(), membersFt.end(), bind(&addFieldTableValue, _1, 
ref(members), ref(alive)));
@@ -81,6 +83,7 @@
     b.getMembers().clear();
     for_each(members.begin(), members.end(), 
bind(&insertFieldTableFromMapValue, ref(b.getMembers()), _1));
     b.setFrameSeq(frameSeq);
+    b.setConfigSeq(configSeq);
 }
 
 Url ClusterMap::getUrl(const Map& map, const  MemberId& id) {
@@ -133,6 +136,7 @@
         else o << "(unknown)";
         o << " ";
     }
+    o << "frameSeq=" << m.getFrameSeq() << " configSeq=" << m.getConfigSeq();
     return o;
 }
 
@@ -153,6 +157,7 @@
 }
 
 bool ClusterMap::configChange(const Set& update) {
+    ++configSeq;
     bool memberChange = false;
     Set removed;
     set_difference(alive.begin(), alive.end(),

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=896538&r1=896537&r2=896538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Wed Jan  6 17:01:50 2010
@@ -49,7 +49,8 @@
 
     ClusterMap();
     ClusterMap(const Map& map);
-    ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& 
members, framing::SequenceNumber frameSeq);
+    ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& 
members,
+               framing::SequenceNumber frameSeq, framing::SequenceNumber 
configSeq);
 
     /** Update from config change.
      *...@return true if member set changed.
@@ -83,8 +84,10 @@
     /*...@return true If this is a new member */ 
     bool ready(const MemberId& id, const Url&);
 
-    framing::SequenceNumber getFrameSeq() { return frameSeq; }
+    framing::SequenceNumber getFrameSeq() const { return frameSeq; }
     framing::SequenceNumber incrementFrameSeq() { return ++frameSeq; }
+    framing::SequenceNumber getConfigSeq() const { return configSeq; }
+    void resetConfigSeq() { configSeq = 0; }
     
     /** Clear out all knowledge of joiners & members, just keep alive set */
     void clearStatus() { joiners.clear(); members.clear(); }
@@ -94,7 +97,7 @@
     
     Map joiners, members;
     Set alive;
-    framing::SequenceNumber frameSeq;
+    framing::SequenceNumber frameSeq, configSeq;
 
   friend std::ostream& operator<<(std::ostream&, const Map&);
   friend std::ostream& operator<<(std::ostream&, const ClusterMap&);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=896538&r1=896537&r2=896538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jan  6 17:01:50 2010
@@ -322,9 +322,12 @@
     output.setSendMax(sendMax);
 }
 
-void Connection::membership(const FieldTable& joiners, const FieldTable& 
members, const framing::SequenceNumber& frameSeq) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& 
members,
+                            const framing::SequenceNumber& frameSeq,
+                            const framing::SequenceNumber& configSeq)
+{
     QPID_LOG(debug, cluster << " incoming update complete on connection " << 
*this);
-    cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
+    cluster.updateInDone(ClusterMap(joiners, members, frameSeq, configSeq));
     consumerNumbering.clear();
     self.second = 0;        // Mark this as completed update connection.
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=896538&r1=896537&r2=896538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Jan  6 17:01:50 2010
@@ -125,7 +125,9 @@
     
     void shadowReady(uint64_t memberId, uint64_t connectionId, const 
std::string& username, const std::string& fragment, uint32_t sendMax);
 
-    void membership(const framing::FieldTable&, const framing::FieldTable&, 
const framing::SequenceNumber& frameSeq);
+    void membership(const framing::FieldTable&, const framing::FieldTable&,
+                    const framing::SequenceNumber& frameSeq,
+                    const framing::SequenceNumber& configSeq);
 
     void retractOffer();
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp?rev=896538&r1=896537&r2=896538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp Wed Jan  6 
17:01:50 2010
@@ -20,7 +20,9 @@
  */
 #include "InitialStatusMap.h"
 #include "StoreStatus.h"
+#include "qpid/log/Statement.h"
 #include <algorithm>
+#include <vector>
 #include <boost/bind.hpp>
 
 namespace qpid {
@@ -138,7 +140,7 @@
 }
 
 // Get cluster ID from an active member or the youngest newcomer.
-framing::Uuid InitialStatusMap::getClusterId() {
+Uuid InitialStatusMap::getClusterId() {
     assert(isComplete());
     assert(!map.empty());
     Map::iterator i = find_if(map.begin(), map.end(), &isActive);
@@ -166,6 +168,7 @@
     Uuid shutdownId;
 
     for (Map::iterator i = map.begin(); i != map.end(); ++i) {
+        assert(i->second);
         if (i->second->getActive()) ++active;
         switch (i->second->getStoreState()) {
           case STORE_STATE_NO_STORE: ++none; break;
@@ -187,10 +190,11 @@
     // Can't mix transient and persistent members.
     if (none && (clean+dirty+empty))
         throw Exception("Mixing transient and persistent brokers in a 
cluster");
+
     // If there are no active members and there are dirty stores there
     // must be at least one clean store.
     if (!active && dirty && !clean)
-        throw Exception("Cannot recover, no clean store");
+        throw Exception("Cannot recover, no clean store.");
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp?rev=896538&r1=896537&r2=896538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp Wed Jan  6 17:01:50 
2010
@@ -34,7 +34,7 @@
 using std::ostream;
 
 StoreStatus::StoreStatus(const std::string& d)
-    : state(STORE_STATE_NO_STORE), dataDir(d)
+    : state(STORE_STATE_NO_STORE), dataDir(d), configSeq(0)
 {}
 
 namespace {
@@ -42,6 +42,7 @@
 const char* SUBDIR="cluster";
 const char* CLUSTER_ID_FILE="cluster.uuid";
 const char* SHUTDOWN_ID_FILE="shutdown.uuid";
+const char* CONFIG_SEQ_FILE="config.seq";
 
 Uuid loadUuid(const fs::path& path) {
     Uuid ret;
@@ -62,23 +63,39 @@
 
 void StoreStatus::load() {
     fs::path dir = fs::path(dataDir, fs::native)/SUBDIR;
-    create_directory(dir);
-    clusterId = loadUuid(dir/CLUSTER_ID_FILE);
-    shutdownId = loadUuid(dir/SHUTDOWN_ID_FILE);
-
-    if (clusterId && shutdownId) state = STORE_STATE_CLEAN_STORE;
-    else if (clusterId) state = STORE_STATE_DIRTY_STORE;
-    else state = STORE_STATE_EMPTY_STORE;
+    try {
+        create_directory(dir);
+        clusterId = loadUuid(dir/CLUSTER_ID_FILE);
+        shutdownId = loadUuid(dir/SHUTDOWN_ID_FILE);
+        fs::ifstream is(dir/CONFIG_SEQ_FILE);
+        uint32_t n;
+        is >> n;
+        configSeq = framing::SequenceNumber(n);
+        if (clusterId && shutdownId) state = STORE_STATE_CLEAN_STORE;
+        else if (clusterId) state = STORE_STATE_DIRTY_STORE;
+        else state = STORE_STATE_EMPTY_STORE;
+    }
+    catch (const std::exception&e) {
+        throw Exception(QPID_MSG("Cannot load cluster store status: " << 
e.what()));
+    }
 }
 
 void StoreStatus::save() {
     fs::path dir = fs::path(dataDir, fs::native)/SUBDIR;
-    create_directory(dir);
-    saveUuid(dir/CLUSTER_ID_FILE, clusterId);
-    saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
+    try {
+        create_directory(dir);
+        saveUuid(dir/CLUSTER_ID_FILE, clusterId);
+        saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
+        fs::ofstream os(dir/CONFIG_SEQ_FILE);
+        os << configSeq.getValue();
+    }
+    catch (const std::exception&e) {
+        throw Exception(QPID_MSG("Cannot save cluster store status: " << 
e.what()));
+    }
 }
 
 void StoreStatus::dirty(const Uuid& clusterId_) {
+    assert(clusterId_);
     clusterId = clusterId_;
     shutdownId = Uuid();
     state = STORE_STATE_DIRTY_STORE;
@@ -86,22 +103,38 @@
 }
 
 void StoreStatus::clean(const Uuid& shutdownId_) {
+    assert(shutdownId_);
     state = STORE_STATE_CLEAN_STORE;
     shutdownId = shutdownId_;
     save();
 }
 
+void StoreStatus::setConfigSeq(framing::SequenceNumber seq) {
+    configSeq = seq;
+    save();
+}
+
+const char* stateName(StoreState s) {
+    switch (s) {
+      case STORE_STATE_NO_STORE: return "none";
+      case STORE_STATE_EMPTY_STORE: return "empty";
+      case STORE_STATE_DIRTY_STORE: return "dirty";
+      case STORE_STATE_CLEAN_STORE: return "clean";
+    }
+    assert(0);
+    return "unknown";
+}
+
+ostream& operator<<(ostream& o, framing::cluster::StoreState s) { return o << 
stateName(s); }
+
 ostream& operator<<(ostream& o, const StoreStatus& s) {
-    switch (s.getState()) {
-      case STORE_STATE_NO_STORE: o << "no store"; break;
-      case STORE_STATE_EMPTY_STORE: o << "empty store"; break;
-      case STORE_STATE_DIRTY_STORE:
-        o << "dirty store, cluster-id=" << s.getClusterId();
-        break;
-      case STORE_STATE_CLEAN_STORE:
-        o << "clean store, cluster-id=" << s.getClusterId()
+    o << s.getState();
+    if (s.getState() ==  STORE_STATE_DIRTY_STORE)
+        o << " cluster-id=" << s.getClusterId()
+          << " config-sequence=" << s.getConfigSeq();
+    if (s.getState() == STORE_STATE_CLEAN_STORE) {
+        o << " cluster-id=" << s.getClusterId()
           << " shutdown-id=" << s.getShutdownId();
-        break;
     }
     return o;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h?rev=896538&r1=896537&r2=896538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h Wed Jan  6 17:01:50 2010
@@ -23,6 +23,7 @@
  */
 
 #include "qpid/framing/Uuid.h"
+#include "qpid/framing/SequenceNumber.h"
 #include "qpid/framing/enum.h"
 #include <iosfwd>
 
@@ -43,9 +44,11 @@
     framing::cluster::StoreState getState() const { return state; }
     const Uuid& getClusterId() const { return clusterId; }
     const Uuid& getShutdownId() const { return shutdownId; }
+    framing::SequenceNumber getConfigSeq() const { return configSeq; }
 
     void dirty(const Uuid& start); // Start using the store.
     void clean(const Uuid& stop); // Stop using the store.
+    void setConfigSeq(framing::SequenceNumber seq); // Update the config seq 
number.
 
     void load();
     void save();
@@ -56,8 +59,11 @@
     framing::cluster::StoreState state;
     Uuid clusterId, shutdownId;
     std::string dataDir;
+    framing::SequenceNumber configSeq;
 };
 
+const char* stateName(framing::cluster::StoreState);
+std::ostream& operator<<(std::ostream&, framing::cluster::StoreState);
 std::ostream& operator<<(std::ostream&, const StoreStatus&);
 }} // namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp?rev=896538&r1=896537&r2=896538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp Wed Jan  6 17:01:50 2010
@@ -37,15 +37,15 @@
 typedef InitialStatusMap::Status Status;
 
 Status activeStatus(const Uuid& id=Uuid()) {
-    return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, 
Uuid());
+    return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, 
Uuid(), 0);
 }
 
 Status newcomerStatus(const Uuid& id=Uuid()) {
-    return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, 
Uuid());
+    return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, 
Uuid(), 0);
 }
 
 Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid 
stop=Uuid()) {
-    return Status(ProtocolVersion(), 0, active, start, state, stop);
+    return Status(ProtocolVersion(), 0, active, start, state, stop, 0);
 }
 
 QPID_AUTO_TEST_CASE(testFirstInCluster) {
@@ -241,7 +241,6 @@
 }
 
 // FIXME aconway 2009-11-20: consistency tests for mixed stores,
-// tests for manual intervention case.
 
 QPID_AUTO_TEST_SUITE_END()
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=896538&r1=896537&r2=896538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Jan  6 17:01:50 2010
@@ -85,6 +85,29 @@
         os.remove("direct.dump")
         os.remove("updatee.dump")
         
+    def test_config_change_seq(self):
+        """Check that cluster members have the correct config change sequence 
numbers"""
+        cluster = self.cluster(0)
+        cluster.start()
+        cluster.start(expect=EXPECT_EXIT_OK)
+        cluster[1].terminate(); cluster[1].wait()
+        cluster.start()
+
+        update_re = re.compile(r"member update: (.*) frameSeq=[0-9]+ 
configSeq=([0-9]+)")
+        matches = [ update_re.search(file(b.log).read()) for b in cluster ]
+        sequences = [ m.group(2) for m in matches]
+        self.assertEqual(sequences, ["0", "1", "3"])
+
+        # Check that configurations with same seq. number match
+        configs={}
+        for b in cluster:
+            matches = update_re.findall(file(b.log).read())
+            for m in matches:
+                seq=m[1]
+                config=re.sub("\((member|unknown)\)", "", m[0])
+                if not seq in configs: configs[seq] = config
+                else: self.assertEqual(configs[seq], config)
+
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
     def duration(self):
@@ -234,7 +257,7 @@
         msg = re.compile("critical.*no clean store")
         assert a.search_log(msg)
         assert b.search_log(msg)
-        # FIXME aconway 2009-12-03: verify correct store ID in log message
+
         # FIXME aconway 2009-12-03: verify manual restore procedure
 
 

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=896538&r1=896537&r2=896538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Wed Jan  6 17:01:50 2010
@@ -64,6 +64,7 @@
       <field name="cluster-id" type="uuid"/>>
       <field name="store-state" type="store-state"/>
       <field name="shutdown-id" type="uuid"/>
+      <field name="config-seq" type="sequence-no"/>
     </control>
 
     <!-- New member or updater is ready as an active member. -->
@@ -201,6 +202,7 @@
       <field name="joiners" type="map"/> <!-- member-id -> URL -->
       <field name="members" type="map"/> <!-- member-id -> state -->
       <field name="frame-seq" type="sequence-no"/> <!-- frame sequence number 
-->
+      <field name="config-seq" type="sequence-no"/> <!-- config change seq 
no.-->
     </control>
 
     <!-- Updater cannot fulfill an update offer. -->

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=896538&r1=896537&r2=896538&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Jan  6 17:01:50 2010
@@ -248,16 +248,6 @@
 
     def host_port(self): return "%s:%s" % (self.host, self.port())
 
-    def search_log(self, regex):
-        """Search for regular expression in broker log, return match"""
-        return regex.search(file(self.log).read())
-
-    def get_member_id(self):
-        """Search log file for cluster member ID"""
-        match = self.search_log(re.compile(r"cluster\(([0-9.:]*) INIT\)"))
-        if not match: raise Exception("No cluster member-id found in "+log)
-        return match.group(1)
-    
     def ready(self):
         """Wait till broker is ready to serve clients"""
         self.connect().close()
@@ -275,6 +265,7 @@
         # Use unique cluster name
         self.args = copy(args)
         self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, 
socket.gethostname(), os.getpid()) ]
+        self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
         assert BrokerTest.cluster_lib
         self.args += [ "--load-module", BrokerTest.cluster_lib ]
         self.start_n(count, expect=expect, wait=wait)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to