Hello,
I am new to ZooKeeper and have created a simple testing tool that runs a
user-configurable number of QuorumPeer instances in the same JVM to
simulate a ZooKeeper cluster.
My aim is to use this tool to test the behaviour of my application
under conditions such as session timeout and network partitioning of
the ZooKeeper client from the cluster. The tool is based on the
QuorumBase class from the ZooKeeper test packages.
The tool successfully starts the required number of ZK instances (say 3),
but when those instances are shut down by calling QuorumPeer.shutdown()
on each instance, I notice that the LearnerHandler sendPackets() threads
do not shut down. My analysis of what is happening follows.
The call to QuorumPeer.shutdown() calls Leader.shutdown(), which iterates
over its LearnerHandler instances and calls LearnerHandler.shutdown() on each.
I have copied the relevant code from LearnerHandler and annotated it with
the reason why the sendPackets() thread does not exit.
//---------------------------------------------------------------------------------------------
public void shutdown() {
    try {
        if (sock != null && !sock.isClosed()) {
            // ******** 1. This causes LearnerHandler.run() to terminate
            // ******** at point 4.
            sock.close();
        }
    } catch (IOException e) {
        LOG.warn("Ignoring unexpected exception during socket close", e);
    }
    // ******** 2. This sets the LearnerHandler thread interrupt flag,
    // ******** which causes the failure at point 6.
    this.interrupt();
    leader.removeLearnerHandler(this);
}
//---------------------------------------------------------------------------------------------
public void run() {
    try {
        . . .
        // ******** 3. The thread that calls sendPackets(). sendPackets()
        // ******** contains a loop that relies on the proposalOfDeath at
        // ******** point 6 in order to exit the loop. Also, perhaps this
        // ******** should be a daemon thread, to ensure that it does not
        // ******** prevent an application from shutting down if it does
        // ******** not itself shut down.
        // Start sending packets
        new Thread() {
            public void run() {
                Thread.currentThread().setName(
                        "Sender-" + sock.getRemoteSocketAddress());
                try {
                    sendPackets();
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
            }
        }.start();
        . . .
        while (true) {
            qp = new QuorumPacket();
            // ******** 4. The sock.close() from 1 causes an IOException
            // ******** here, which is handled at 5.
            ia.readRecord(qp, "packet");
            ...
        }
    } catch (IOException e) {
        // ******** 5. Handling the IOException caused by the socket
        // ******** closing. The if() condition actually evaluates to
        // ******** false because the socket has been closed in the
        // ******** shutdown() method. Execution falls through to the
        // ******** finally clause below, which contains 6.
        if (sock != null && !sock.isClosed()) {
            LOG.error("Unexpected exception causing shutdown while sock "
                    + "still open", e);
            //close the socket to make sure the
            //other side can see it being close
            try {
                sock.close();
            } catch (IOException ie) {
                // do nothing
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Unexpected exception causing shutdown", e);
    } finally {
        LOG.warn("******* GOODBYE "
                + (sock != null ? sock.getRemoteSocketAddress() : "<null>")
                + " ********");
        // Send the packet of death
        try {
            // ******** 6. The call to queuedPackets.put() exits with an
            // ******** InterruptedException without putting the
            // ******** proposalOfDeath on the queue, because the thread's
            // ******** interrupt flag has been set at point 2. (The
            // ******** exception comes from
            // ******** java.util.concurrent.locks.ReentrantLock, which is
            // ******** used in java.util.concurrent.LinkedBlockingQueue.)
            // ******** The sendPackets() thread at 3 never receives the
            // ******** proposalOfDeath and remains running after shutdown.
            queuedPackets.put(proposalOfDeath);
        } catch (InterruptedException e) {
            LOG.warn("Ignoring unexpected exception", e);
        }
        shutdown();
    }
}
//---------------------------------------------------------------------------------------------
There is also a race condition that makes the failure intermittent: if
6 executes before the interrupt() call at 2, then the proposalOfDeath
will be enqueued and will cause sendPackets() to exit as required.
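The behaviour at point 6 can be reproduced outside ZooKeeper. The following is a minimal sketch using only JDK classes; the class name InterruptedPutDemo and the String standing in for the proposalOfDeath packet are made up for illustration. It shows that LinkedBlockingQueue.put() throws InterruptedException when the calling thread's interrupt flag is already set, even though the queue has room.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of ZooKeeper.
public class InterruptedPutDemo {

    /**
     * Tries to put one item on an empty, unbounded queue.
     * Returns true if the item was enqueued, false if put() threw
     * InterruptedException instead.
     */
    static boolean putWithInterruptFlag(boolean interruptFirst) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        if (interruptFirst) {
            // Same effect as this.interrupt() at point 2 when it is the
            // current thread that later calls put().
            Thread.currentThread().interrupt();
        }
        try {
            queue.put("proposalOfDeath"); // stand-in for the real packet
            return queue.size() == 1;
        } catch (InterruptedException e) {
            return false; // nothing was enqueued
        } finally {
            Thread.interrupted(); // clear the flag so the caller is unaffected
        }
    }

    public static void main(String[] args) {
        System.out.println("flag clear: enqueued = " + putWithInterruptFlag(false));
        System.out.println("flag set:   enqueued = " + putWithInterruptFlag(true));
    }
}
```

With the flag clear the item is enqueued; with the flag set it is not, which matches what I see at point 6.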
If someone could change the shutdown process and provide me with a
patched JAR I will be happy to retest it.
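As one possible direction (only a sketch under my own assumptions, not a tested ZooKeeper patch), the packet of death could be enqueued with offer() rather than put(), since LinkedBlockingQueue.offer(E) does not acquire its lock interruptibly, and the sender could be made a daemon thread. The class PoisonPillSketch, the String sentinel and the thread name below are hypothetical stand-ins for the LearnerHandler code.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, not part of ZooKeeper.
public class PoisonPillSketch {

    // Stand-in for the proposalOfDeath QuorumPacket; compared by identity.
    static final String PROPOSAL_OF_DEATH = new String("proposalOfDeath");

    /**
     * Starts a sender loop, then enqueues the sentinel from a thread whose
     * interrupt flag is set. Returns true if the sentinel was accepted and
     * the sender thread exited.
     */
    static boolean senderExitsDespiteInterrupt() throws InterruptedException {
        LinkedBlockingQueue<String> queuedPackets = new LinkedBlockingQueue<>();

        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    String p = queuedPackets.take();
                    if (p == PROPOSAL_OF_DEATH) {
                        break; // sentinel received, leave the loop
                    }
                    // a real sender would write p to the socket here
                }
            } catch (InterruptedException e) {
                // treat interruption as a request to stop
            }
        }, "Sender-sketch");
        sender.setDaemon(true); // cannot keep the JVM alive on its own
        sender.start();

        Thread.currentThread().interrupt();          // simulate point 2
        boolean offered = queuedPackets.offer(PROPOSAL_OF_DEATH);
        Thread.interrupted();                        // clear flag before join()
        sender.join(5000);
        return offered && !sender.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("sender exited: " + senderExitsDespiteInterrupt());
    }
}
```

Whether this is the right change for LearnerHandler itself I cannot say; reordering shutdown() so that the packet of death is queued before interrupt() is called might be another option.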
Regards
Mauri