Author: aconway
Date: Wed Nov 18 17:26:43 2009
New Revision: 881839

URL: http://svn.apache.org/viewvc?rev=881839&view=rev
Log:
Added cluster option --cluster-size; removed the temporary --cluster-check-errors option.

--cluster-size=N means that during start-up the cluster waits until it has at
least N members before accepting any clients. The default is N=1.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
    qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=881839&r1=881838&r2=881839&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Nov 18 17:26:43 2009
@@ -208,7 +208,7 @@
     decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
     discarding(true),
     state(INIT),
-    initMap(self),
+    initMap(self, settings.size),
     lastSize(0),
     lastBroker(false),
     updateRetracted(false),
@@ -403,8 +403,7 @@
                  << ": " << msg);
         leave(l);
     }
-    else if (settings.checkErrors)
-        error.error(connection, type, map.getFrameSeq(), map.getMembers(), 
msg);
+    error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
 }
 
 // Handler for deliverFrameQueue.
@@ -423,7 +422,7 @@
         deliverEventQueue.start(); 
     }
     // Process each frame through the error checker.
-    if (settings.checkErrors && error.isUnresolved()) {
+    if (error.isUnresolved()) {
         error.delivered(e);
         while (error.canProcess())  // There is a frame ready to process.
             processFrame(error.getNext(), l);
@@ -874,7 +873,7 @@
     };
     assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
     o << "cluster(" << cluster.self << " " << STATE[cluster.state];
-    if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << 
"/error";
+    if (cluster.error.isUnresolved()) o << "/error";
     return o << ")";;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=881839&r1=881838&r2=881839&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Nov 18 17:26:43 2009
@@ -71,9 +71,8 @@
 #if HAVE_LIBCMAN_H
             ("cluster-cman", optValue(settings.quorum), "Integrate with 
Cluster Manager (CMAN) cluster.")
 #endif
+            ("cluster-size", optValue(settings.size, "N"), "Wait for N cluster 
members before allowing clients to connect.")
             ("cluster-read-max", optValue(settings.readMax,"N"), 
"Experimental: flow-control limit  reads per connection. 0=no limit.")
-            // TODO aconway 2009-05-20: temporary, remove
-            ("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), 
"Enable/disable cluster error checks. Normally should be enabled.")
             ;
     }
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h?rev=881839&r1=881838&r2=881839&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h Wed Nov 18 17:26:43 2009
@@ -34,10 +34,9 @@
     bool quorum;
     size_t readMax;
     std::string username, password, mechanism;
-    bool checkErrors;
+    size_t size;
 
-    ClusterSettings() : quorum(false), readMax(10),
-                        checkErrors(true) // TODO aconway 2009-05-20: remove 
this option.
+    ClusterSettings() : quorum(false), readMax(10), size(1)
     {}
   
     Url getUrl(uint16_t port) const {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp?rev=881839&r1=881838&r2=881839&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp Wed Nov 18 17:26:43 2009
@@ -28,8 +28,8 @@
 namespace qpid {
 namespace cluster {
 
-InitialStatusMap::InitialStatusMap(const MemberId& self_)
-    : self(self_), completed(), resendNeeded()
+InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_)
+    : self(self_), completed(), resendNeeded(), size(size_)
 {}
 
 void InitialStatusMap::configChange(const MemberSet& members) {
@@ -83,7 +83,8 @@
 }
 
 bool InitialStatusMap::isComplete() {
-    return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == 
map.end();
+    return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == 
map.end()
+        && (map.size() >= size);
 }
 
 bool InitialStatusMap::transitionToComplete() {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h?rev=881839&r1=881838&r2=881839&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h Wed Nov 18 17:26:43 2009
@@ -37,7 +37,7 @@
   public:
     typedef framing::ClusterInitialStatusBody Status;
 
-    InitialStatusMap(const MemberId& self);
+    InitialStatusMap(const MemberId& self, size_t size);
     /** Process a config change. @return true if we need to re-send our status 
*/
     void configChange(const MemberSet& newConfig);
     /** @return true if we need to re-send status */
@@ -71,6 +71,7 @@
     MemberSet firstConfig;
     MemberId self;
     bool completed, resendNeeded;
+    size_t size;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp?rev=881839&r1=881838&r2=881839&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp Wed Nov 18 17:26:43 2009
@@ -40,7 +40,7 @@
 
 QPID_AUTO_TEST_CASE(testFirstInCluster) {
     // Single member is first in cluster.
-    InitialStatusMap map(MemberId(0));
+    InitialStatusMap map(MemberId(0), 1);
     Uuid id(true);
     BOOST_CHECK(!map.isComplete());
     MemberSet members = list_of(MemberId(0));
@@ -56,7 +56,7 @@
 
 QPID_AUTO_TEST_CASE(testJoinExistingCluster) {
     // Single member 0 joins existing cluster 1,2
-    InitialStatusMap map(MemberId(0));
+    InitialStatusMap map(MemberId(0), 1);
     Uuid id(true);
     MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2));
     map.configChange(members);
@@ -79,7 +79,7 @@
 
 QPID_AUTO_TEST_CASE(testMultipleFirstInCluster) {
     // Multiple members 0,1,2 join at same time.
-    InitialStatusMap map(MemberId(1)); // self is 1
+    InitialStatusMap map(MemberId(1), 1); // self is 1
     Uuid id(true);
     MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2));
     map.configChange(members);
@@ -99,7 +99,7 @@
 
 QPID_AUTO_TEST_CASE(testMultipleJoinExisting) {
     // Multiple members 1,2,3 join existing cluster containing 0.
-    InitialStatusMap map(MemberId(2)); // self is 2
+    InitialStatusMap map(MemberId(2), 1); // self is 2
     Uuid id(true);
     MemberSet members = 
list_of(MemberId(0))(MemberId(1))(MemberId(2))(MemberId(3));
     map.configChange(members);
@@ -119,7 +119,7 @@
 
 QPID_AUTO_TEST_CASE(testMembersLeave) {
     // Test that map completes if members leave rather than send status.
-    InitialStatusMap map(MemberId(0));
+    InitialStatusMap map(MemberId(0), 1);
     Uuid id(true);
     map.configChange(list_of(MemberId(0))(MemberId(1))(MemberId(2)));
     map.received(MemberId(0), newcomerStatus());
@@ -134,7 +134,7 @@
 
 QPID_AUTO_TEST_CASE(testInteveningConfig) {
     // Multiple config changes arrives before we complete the map.
-    InitialStatusMap map(MemberId(0));
+    InitialStatusMap map(MemberId(0), 1);
     Uuid id(true);
 
     map.configChange(list_of<MemberId>(0)(1));
@@ -159,6 +159,20 @@
     BOOST_CHECK_EQUAL(map.getClusterId(), id);
 }
 
+QPID_AUTO_TEST_CASE(testInitialSize) {
+    InitialStatusMap map(MemberId(0), 3);
+    map.configChange(list_of<MemberId>(0)(1));
+    map.received(MemberId(0), newcomerStatus());
+    map.received(MemberId(1), newcomerStatus());
+    BOOST_CHECK(!map.isComplete());
+
+    map.configChange(list_of<MemberId>(0)(1)(2));
+    map.received(MemberId(0), newcomerStatus());
+    map.received(MemberId(1), newcomerStatus());
+    map.received(MemberId(2), newcomerStatus());
+    BOOST_CHECK(map.isComplete());
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=881839&r1=881838&r2=881839&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Nov 18 17:26:43 2009
@@ -26,13 +26,8 @@
 from threading import Thread
 
 
-class ClusterTests(BrokerTest):
-    """Cluster tests with support for testing with a store plugin."""
-
-    def duration(self):
-        d = self.config.defines.get("DURATION")
-        if d: return float(d)*60
-        else: return 3
+class ShortTests(BrokerTest):
+    """Short cluster functionality tests."""
 
     def test_message_replication(self):
         """Test basic cluster message replication."""
@@ -57,6 +52,42 @@
         self.assertEqual("y", m.content)
         s2.connection.close()
 
+    def test_cluster_size(self):
+        """Verify cluster startup waits for N brokers if --cluster-size=N"""
+        class ConnectThread(Thread):
+            def __init__(self, broker):
+                Thread.__init__(self)
+                self.broker=broker
+                self.connected = False
+                self.error = None
+
+            def run(self):
+                try:
+                    self.broker.connect()
+                    self.connected = True
+                except Exception, e: self.error = RethrownException(e)
+
+        cluster = self.cluster(1, args=["--cluster-size=3"], 
wait_for_start=False)
+        c = ConnectThread(cluster[0])
+        c.start()
+        time.sleep(.01)
+        assert not c.connected
+        cluster.start(wait_for_start=False)
+        time.sleep(.01)
+        assert not c.connected
+        cluster.start(wait_for_start=False)
+        c.join(1)
+        assert not c.isAlive()       # Join didn't time out
+        assert c.connected
+        if c.error: raise c.error
+
+class LongTests(BrokerTest):
+    """Tests that can run for a long time if -DDURATION=<minutes> is set"""
+    def duration(self):
+        d = self.config.defines.get("DURATION")
+        if d: return float(d)*60
+        else: return 3                  # Default is to be quick
+
     def test_failover(self):
         """Test fail-over during continuous send-receive with errors"""
 
@@ -84,7 +115,8 @@
         receiver.stop(sender.sent)
         for i in range(i, len(cluster)): cluster[i].kill()
 
-class ClusterStoreTests(BrokerTest):
+
+class StoreTests(BrokerTest):
     """
     Cluster tests that can only be run if there is a store available.
     """

Modified: qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_cluster_tests?rev=881839&r1=881838&r2=881839&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_cluster_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_cluster_tests Wed Nov 18 17:26:43 2009
@@ -45,8 +45,10 @@
 mkdir -p $OUTDIR
 
 # Ignore tests requiring a store by default. 
-TESTS="-i cluster_tests.ClusterStoreTests.* -I $srcdir/cluster_tests.fail $*"
+CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I 
$srcdir/cluster_tests.fail}
+CLUSTER_TESTS=${CLUSTER_TESTS:-$*}
 
-with_ais_group $PYTHON_COMMANDS/qpid-python-test -DOUTDIR=$OUTDIR -m 
cluster_tests $TESTS || exit 1
+set -x
+with_ais_group $PYTHON_COMMANDS/qpid-python-test -DOUTDIR=$OUTDIR -m 
cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
 rm -rf $OUTDIR
 #exit 0

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=881839&r1=881838&r2=881839&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Nov 18 17:26:43 2009
@@ -206,7 +206,7 @@
     _cluster_lib = checkenv("CLUSTER_LIB")
     _cluster_count = 0
 
-    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING):
+    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, 
wait_for_start=True):
         self.test = test
         self._brokers=[]
         self.name = "cluster%d" % Cluster._cluster_count
@@ -215,21 +215,16 @@
         self.args = copy(args)
         self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, 
socket.gethostname(), os.getpid()) ]
         self.args += [ "--load-module", self._cluster_lib ]
-        self.start_n(count, expect=expect)
+        self.start_n(count, expect=expect, wait_for_start=wait_for_start)
 
-    def start(self, name=None, expect=EXPECT_RUNNING):
+    def start(self, name=None, expect=EXPECT_RUNNING, wait_for_start=True):
         """Add a broker to the cluster. Returns the index of the new broker."""
         if not name: name="%s-%d" % (self.name, len(self._brokers))
-        self._brokers.append(self.test.broker(self.args, name, expect))
+        self._brokers.append(self.test.broker(self.args, name, expect, 
wait_for_start))
         return self._brokers[-1]
 
-    def start_n(self, count, expect=EXPECT_RUNNING):
-        for i in range(count): self.start(expect=expect)
-
-    def wait(self):
-        """Wait for all cluster members to be ready"""
-        for b in self._brokers:
-            b.connect().close()
+    def start_n(self, count, expect=EXPECT_RUNNING, wait_for_start=True):
+        for i in range(count): self.start(expect=expect, 
wait_for_start=wait_for_start)
 
     # Behave like a list of brokers.
     def __len__(self): return len(self._brokers)
@@ -280,16 +275,15 @@
         self.cleanup_stop(p)
         return p
 
-    def broker(self, args=[], name=None, expect=EXPECT_RUNNING):
+    def broker(self, args=[], name=None, 
expect=EXPECT_RUNNING,wait_for_start=True):
         """Create and return a broker ready for use"""
         b = Broker(self, args=args, name=name, expect=expect)
-        b.connect().close()
+        if (wait_for_start): b.connect().close()
         return b
 
-    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING):
+    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, 
wait_for_start=True):
         """Create and return a cluster ready for use"""
-        cluster = Cluster(self, count, args, expect=expect)
-        cluster.wait()
+        cluster = Cluster(self, count, args, expect=expect, 
wait_for_start=wait_for_start)
         return cluster
 
 class RethrownException(Exception):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to