Author: aconway
Date: Wed Jan  6 17:00:57 2010
New Revision: 896536

URL: http://svn.apache.org/viewvc?rev=896536&view=rev
Log:
Don't hold up broker initialization for cluster initialization.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=896536&r1=896535&r2=896536&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan  6 17:00:57 2010
@@ -258,15 +258,14 @@
     // Load my store status before we go into initialization
     if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
         store.load();
+        if (store.getState() == STORE_STATE_DIRTY_STORE)
+            broker.setRecovery(false); // Ditch my current store.
         if (store.getClusterId())
             clusterId = store.getClusterId(); // Use stored ID if there is one.
         QPID_LOG(notice, "Cluster store state: " << store)
     }
 
     cpg.join(name);
-    // Pump the CPG dispatch manually till we get initialized. 
-    while (state == INIT)
-        cpg.dispatchOne();
 }
 
 Cluster::~Cluster() {
@@ -277,18 +276,6 @@
     if (settings.quorum) quorum.start(poller);
     if (myUrl.empty())
         myUrl = 
Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
-    // Cluster constructor will leave us in either READY or JOINER state.
-    switch (state) {
-      case READY:
-        mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 
self);
-        break;
-      case JOINER:
-        mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), 
myUrl.str()), self);
-        break;
-      default:
-        assert(0);
-    }
-    QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " 
cluster " << name);
     broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
     broker.setExpiryPolicy(expiryPolicy);
     dispatcher.start();
@@ -389,23 +376,9 @@
     deliverEvent(e);
 }
 
-void Cluster::deliverEvent(const Event& e) {
-    // During initialization, execute events directly in the same thread.
-    // Once initialized, push to pollable queue to be processed in another 
thread.
-    if (state == INIT)
-        deliveredEvent(e);
-    else
-        deliverEventQueue.push(e);
-}
-
-void Cluster::deliverFrame(const EventFrame& e) {
-    // During initialization, execute events directly in the same thread.
-    // Once initialized, push to pollable queue to be processed in another 
thread.
-    if (state == INIT)
-        deliveredFrame(e);
-    else
-        deliverFrameQueue.push(e);
-}
+void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); }
+
+void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); }
 
 const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
     return  (body && body->getMethod() &&
@@ -621,13 +594,16 @@
             broker.setRecovery(false); // Ditch my current store.
             broker.setClusterUpdatee(true);
             state = JOINER;
+            mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), 
myUrl.str()), self);
+            QPID_LOG(notice, *this << " joining cluster " << name);
         }
         else {                  // I can go ready.
             discarding = false;
             setReady(l);
             memberUpdate(l);
+            mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), 
myUrl.str()), self);
+            QPID_LOG(notice, *this << " joined cluster " << name);
         }
-        QPID_LOG(debug, *this << "Initialization complete");
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=896536&r1=896535&r2=896536&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Wed Jan  6 17:00:57 
2010
@@ -55,15 +55,8 @@
 void Multicaster::mcast(const Event& e) {
     {
         sys::Mutex::ScopedLock l(lock);
-        if (!ready) {
-            if (e.isConnection()) holdingQueue.push_back(e);
-            else {
-                iovec iov = e.toIovec();
-                // FIXME aconway 2009-11-23: configurable retry --cluster-retry
-                if (!cpg.mcast(&iov, 1))
-                    throw Exception("CPG flow control error during 
initialization");
-                QPID_LOG(trace, "MCAST (direct) " << e);
-            }
+        if (!ready && e.isConnection()) {
+            holdingQueue.push_back(e);
             return;
         }
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=896536&r1=896535&r2=896536&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Wed Jan  6 17:00:57 
2010
@@ -55,7 +55,7 @@
             return i;
         }
         catch (const std::exception& e) {
-            QPID_LOG(error, message << ": " << e.what());
+            QPID_LOG(critical, message << ": " << e.what());
             this->stop();
             error();
             return values.end();

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=896536&r1=896535&r2=896536&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Jan  6 17:00:57 2010
@@ -193,7 +193,8 @@
         a.terminate()
         cluster2 = self.cluster(1, args=self.args())
         try:
-            a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
+            a = cluster2.start("a", expect=EXPECT_EXIT_OK)
+            a.ready()
             self.fail("Expected exception")
         except: pass
 
@@ -214,8 +215,10 @@
         self.assertEqual(c.wait(), 0)
 
         # Mix members from both shutdown events, they should fail
-        a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
-        b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
+        a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+        b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+        self.assertRaises(Exception, lambda: a.ready())
+        self.assertRaises(Exception, lambda: b.ready())
 
     def test_total_failure(self):
         # Verify we abort with sutiable error message if no clean stores.
@@ -224,10 +227,14 @@
         b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True)
         a.kill()
         b.kill()
-        a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
-        b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
-        assert a.wait() != 0
-        assert b.wait() != 0
+        a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+        b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+        self.assertRaises(Exception, lambda: a.ready())
+        self.assertRaises(Exception, lambda: b.ready())
         msg = re.compile("critical.*no clean store")
-        assert msg.search(file(a.log).read())
-        assert msg.search(file(b.log).read())
+        assert a.search_log(msg)
+        assert b.search_log(msg)
+        # FIXME aconway 2009-12-03: verify correct store ID in log message
+        # FIXME aconway 2009-12-03: verify manual restore procedure
+
+

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=896536&r1=896535&r2=896536&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Jan  6 17:00:57 2010
@@ -20,7 +20,7 @@
 # Support library for tests that start multiple brokers, e.g. cluster
 # or federation
 
-import os, signal, string, tempfile, popen2, socket, threading, time, imp
+import os, signal, string, tempfile, popen2, socket, threading, time, imp, re
 import qpid, traceback
 from qpid import connection, messaging, util
 from qpid.compat import format_exc
@@ -163,6 +163,13 @@
     "A broker process. Takes care of start, stop and logging."
     _broker_count = 0
 
+    def find_log(self):
+        self.log = "%s.log" % self.name
+        i = 1
+        while (os.path.exists(self.log)):
+            self.log = "%s-%d.log" % (self.name, i)
+            i += 1
+
     def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING):
         """Start a broker daemon. name determines the data-dir and log
         file names."""
@@ -174,7 +181,7 @@
         else:
             self.name = "broker%d" % Broker._broker_count
             Broker._broker_count += 1
-        self.log = self.name+".log"
+        self.find_log()
         cmd += ["--log-to-file", self.log, "--log-prefix", self.name]
         cmd += ["--log-to-stderr=no"] 
         self.datadir = self.name
@@ -182,7 +189,7 @@
         Popen.__init__(self, cmd, expect)
         test.cleanup_stop(self)
         self.host = "localhost"
-        log.debug("Started broker %s (%s)" % (self.name, self.pname))
+        log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, 
self.log))
 
     def port(self):
         # Read port from broker process stdout if not already read.
@@ -240,8 +247,21 @@
         return m
 
     def host_port(self): return "%s:%s" % (self.host, self.port())
+
+    def search_log(self, regex):
+        """Search for regular expression in broker log, return match"""
+        return regex.search(file(self.log).read())
+
+    def get_member_id(self):
+        """Search log file for cluster member ID"""
+        match = self.search_log(re.compile(r"cluster\(([0-9.:]*) INIT\)"))
+        if not match: raise Exception("No cluster member-id found in "+log)
+        return match.group(1)
     
-        
+    def ready(self):
+        """Wait till broker is ready to serve clients"""
+        self.connect().close()
+
 class Cluster:
     """A cluster of brokers in a test."""
 
@@ -333,7 +353,7 @@
     def wait():
         """Wait for all brokers in the cluster to be ready"""
         for b in _brokers: b.connect().close()
-        
+
 class RethrownException(Exception):
     """Captures the original stack trace to be thrown later""" 
     def __init__(self, e, msg=""):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to