Author: aconway
Date: Wed Dec 9 16:58:51 2009
New Revision: 888874
URL: http://svn.apache.org/viewvc?rev=888874&view=rev
Log:
QPID-2253 - Cluster node shuts down with inconsistent error.
Add a missing memberUpdate on the transition to CATCHUP mode.
The inconsistent error occurred because the newly updated member
did not have its membership updated and so missed a failover
update message that the existing members sent to a new client.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/java/testkit/bin/qpid-python-testkit
qpid/trunk/qpid/python/qpid/brokertest.py
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=888874&r1=888873&r2=888874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Dec 9 16:58:51 2009
@@ -600,6 +600,7 @@
void Cluster::initMapCompleted(Lock& l) {
// Called on completion of the initial status map.
+ QPID_LOG(debug, *this << " initial status map complete. ");
if (state == INIT) {
// We have status for all members so we can make join descisions.
initMap.checkConsistent();
@@ -705,10 +706,7 @@
member,
ClusterInitialStatusBody(ProtocolVersion(), version, active, id,
store, shutdownId)
);
- if (initMap.transitionToComplete()) {
- QPID_LOG(debug, *this << " initial status map complete. ");
- initMapCompleted(l);
- }
+ if (initMap.transitionToComplete()) initMapCompleted(l);
}
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
@@ -808,10 +806,11 @@
checkUpdateIn(l);
}
-void Cluster::checkUpdateIn(Lock&) {
+void Cluster::checkUpdateIn(Lock& l) {
if (state != UPDATEE) return; // Wait till we reach the stall point.
if (updatedMap) { // We're up to date
map = *updatedMap;
+ memberUpdate(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()),
self);
state = CATCHUP;
discarding = false; // ok to set, we're stalled for update.
Modified: qpid/trunk/qpid/java/testkit/bin/qpid-python-testkit
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/bin/qpid-python-testkit?rev=888874&r1=888873&r2=888874&view=diff
==============================================================================
--- qpid/trunk/qpid/java/testkit/bin/qpid-python-testkit (original)
+++ qpid/trunk/qpid/java/testkit/bin/qpid-python-testkit Wed Dec 9 16:58:51 2009
@@ -25,10 +25,6 @@
. ./setenv.sh
export PYTHONPATH=../:$PYTHONPATH
-
-if [ -d $OUTDIR ]; then
- rm -rf $OUTDIR
-fi
-
-$PYTHON_DIR/qpid-python-test -DOUTDIR=$OUTDIR -m testkit
+rm -rf $OUTDIR
+$PYTHON_DIR/qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@"
Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=888874&r1=888873&r2=888874&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Dec 9 16:58:51 2009
@@ -89,6 +89,7 @@
self.stdout = ExceptionWrapper(self.fromchild, msg)
self.stderr = ExceptionWrapper(self.childerr, msg)
self.dump(self.cmd_str(), "cmd")
+ log.debug("Started process %s" % self.pname)
def dump(self, str, ext):
name = "%s.%s" % (self.pname, ext)
@@ -107,7 +108,7 @@
try:
self.kill()
except:
- self.unexpected("Exit code %d" % self.wait())
+ self.unexpected("expected running, exit code %d" % self.wait())
else:
# Give the process some time to exit.
delay = 0.1
@@ -393,13 +394,11 @@
self.condition.release()
def stop(self):
- log.debug("NumberedSender.stop")
self.condition.acquire()
self.stopped = True
self.condition.notify()
self.condition.release()
self.join()
- log.debug("NumberedSender.stop - joined")
if self.error: raise self.error
class NumberedReceiver(Thread):
@@ -437,18 +436,14 @@
if self.sender:
self.sender.notify_received(self.received)
except Exception, e:
- log.debug("NumberedReceiver.run exception %s" % (e)) # FIXME aconway 2009-12-02:
self.error = RethrownException(e, self.receiver.pname)
def stop(self, count):
"""Returns when received >= count"""
- log.debug("NumberedReceiver.stop") # FIXME aconway 2009-12-02:
self.lock.acquire()
- log.debug("NumberedReceiver.stop at %d, received=%d" % (count, self.received))
self.stopat = count
self.lock.release()
self.join()
- log.debug("NumberedReceiver.stop - joined")
if self.error: raise self.error
class ErrorGenerator(StoppableThread):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]