Author: aconway
Date: Wed Jan 6 17:00:57 2010
New Revision: 896536
URL: http://svn.apache.org/viewvc?rev=896536&view=rev
Log:
Don't hold up broker initialization for cluster initialization.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
qpid/trunk/qpid/python/qpid/brokertest.py
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=896536&r1=896535&r2=896536&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan 6 17:00:57 2010
@@ -258,15 +258,14 @@
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
+ if (store.getState() == STORE_STATE_DIRTY_STORE)
+ broker.setRecovery(false); // Ditch my current store.
if (store.getClusterId())
clusterId = store.getClusterId(); // Use stored ID if there is one.
QPID_LOG(notice, "Cluster store state: " << store)
}
cpg.join(name);
- // Pump the CPG dispatch manually till we get initialized.
- while (state == INIT)
- cpg.dispatchOne();
}
Cluster::~Cluster() {
@@ -277,18 +276,6 @@
if (settings.quorum) quorum.start(poller);
if (myUrl.empty())
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
- // Cluster constructor will leave us in either READY or JOINER state.
- switch (state) {
- case READY:
- mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
- break;
- case JOINER:
- mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
- break;
- default:
- assert(0);
- }
- QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
@@ -389,23 +376,9 @@
deliverEvent(e);
}
-void Cluster::deliverEvent(const Event& e) {
- // During initialization, execute events directly in the same thread.
- // Once initialized, push to pollable queue to be processed in another thread.
- if (state == INIT)
- deliveredEvent(e);
- else
- deliverEventQueue.push(e);
-}
-
-void Cluster::deliverFrame(const EventFrame& e) {
- // During initialization, execute events directly in the same thread.
- // Once initialized, push to pollable queue to be processed in another thread.
- if (state == INIT)
- deliveredFrame(e);
- else
- deliverFrameQueue.push(e);
-}
+void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); }
+
+void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); }
const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
return (body && body->getMethod() &&
@@ -621,13 +594,16 @@
broker.setRecovery(false); // Ditch my current store.
broker.setClusterUpdatee(true);
state = JOINER;
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+ QPID_LOG(notice, *this << " joining cluster " << name);
}
else { // I can go ready.
discarding = false;
setReady(l);
memberUpdate(l);
+ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
+ QPID_LOG(notice, *this << " joined cluster " << name);
}
- QPID_LOG(debug, *this << "Initialization complete");
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=896536&r1=896535&r2=896536&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Wed Jan 6 17:00:57 2010
@@ -55,15 +55,8 @@
void Multicaster::mcast(const Event& e) {
{
sys::Mutex::ScopedLock l(lock);
- if (!ready) {
- if (e.isConnection()) holdingQueue.push_back(e);
- else {
- iovec iov = e.toIovec();
- // FIXME aconway 2009-11-23: configurable retry --cluster-retry
- if (!cpg.mcast(&iov, 1))
- throw Exception("CPG flow control error during initialization");
- QPID_LOG(trace, "MCAST (direct) " << e);
- }
+ if (!ready && e.isConnection()) {
+ holdingQueue.push_back(e);
return;
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=896536&r1=896535&r2=896536&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Wed Jan 6 17:00:57 2010
@@ -55,7 +55,7 @@
return i;
}
catch (const std::exception& e) {
- QPID_LOG(error, message << ": " << e.what());
+ QPID_LOG(critical, message << ": " << e.what());
this->stop();
error();
return values.end();
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=896536&r1=896535&r2=896536&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Jan 6 17:00:57 2010
@@ -193,7 +193,8 @@
a.terminate()
cluster2 = self.cluster(1, args=self.args())
try:
- a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
+ a = cluster2.start("a", expect=EXPECT_EXIT_OK)
+ a.ready()
self.fail("Expected exception")
except: pass
@@ -214,8 +215,10 @@
self.assertEqual(c.wait(), 0)
# Mix members from both shutdown events, they should fail
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
+ a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+ self.assertRaises(Exception, lambda: a.ready())
+ self.assertRaises(Exception, lambda: b.ready())
def test_total_failure(self):
# Verify we abort with sutiable error message if no clean stores.
@@ -224,10 +227,14 @@
b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True)
a.kill()
b.kill()
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
- assert a.wait() != 0
- assert b.wait() != 0
+ a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+ self.assertRaises(Exception, lambda: a.ready())
+ self.assertRaises(Exception, lambda: b.ready())
msg = re.compile("critical.*no clean store")
- assert msg.search(file(a.log).read())
- assert msg.search(file(b.log).read())
+ assert a.search_log(msg)
+ assert b.search_log(msg)
+ # FIXME aconway 2009-12-03: verify correct store ID in log message
+ # FIXME aconway 2009-12-03: verify manual restore procedure
+
+
Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=896536&r1=896535&r2=896536&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Jan 6 17:00:57 2010
@@ -20,7 +20,7 @@
# Support library for tests that start multiple brokers, e.g. cluster
# or federation
-import os, signal, string, tempfile, popen2, socket, threading, time, imp
+import os, signal, string, tempfile, popen2, socket, threading, time, imp, re
import qpid, traceback
from qpid import connection, messaging, util
from qpid.compat import format_exc
@@ -163,6 +163,13 @@
"A broker process. Takes care of start, stop and logging."
_broker_count = 0
+ def find_log(self):
+ self.log = "%s.log" % self.name
+ i = 1
+ while (os.path.exists(self.log)):
+ self.log = "%s-%d.log" % (self.name, i)
+ i += 1
+
def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING):
"""Start a broker daemon. name determines the data-dir and log
file names."""
@@ -174,7 +181,7 @@
else:
self.name = "broker%d" % Broker._broker_count
Broker._broker_count += 1
- self.log = self.name+".log"
+ self.find_log()
cmd += ["--log-to-file", self.log, "--log-prefix", self.name]
cmd += ["--log-to-stderr=no"]
self.datadir = self.name
@@ -182,7 +189,7 @@
Popen.__init__(self, cmd, expect)
test.cleanup_stop(self)
self.host = "localhost"
- log.debug("Started broker %s (%s)" % (self.name, self.pname))
+ log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
def port(self):
# Read port from broker process stdout if not already read.
@@ -240,8 +247,21 @@
return m
def host_port(self): return "%s:%s" % (self.host, self.port())
+
+ def search_log(self, regex):
+ """Search for regular expression in broker log, return match"""
+ return regex.search(file(self.log).read())
+
+ def get_member_id(self):
+ """Search log file for cluster member ID"""
+ match = self.search_log(re.compile(r"cluster\(([0-9.:]*) INIT\)"))
+ if not match: raise Exception("No cluster member-id found in "+log)
+ return match.group(1)
-
+ def ready(self):
+ """Wait till broker is ready to serve clients"""
+ self.connect().close()
+
class Cluster:
"""A cluster of brokers in a test."""
@@ -333,7 +353,7 @@
def wait():
"""Wait for all brokers in the cluster to be ready"""
for b in _brokers: b.connect().close()
-
+
class RethrownException(Exception):
"""Captures the original stack trace to be thrown later"""
def __init__(self, e, msg=""):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]