Hello,

I am new to ZooKeeper and have created a simple testing tool that runs a user-configurable number of QuorumPeer instances in the same JVM to simulate a ZooKeeper cluster. My aim is to use this tool to test the behaviour of my application under conditions such as session timeout and network partitioning of the ZooKeeper client from the cluster. The tool is based on the QuorumBase class from the ZooKeeper test packages.

The tool successfully starts the required number of ZK instances (say 3), but when those instances are shut down by calling QuorumPeer.shutdown() on each one, I notice that the LearnerHandler sendPackets() threads do not shut down. My analysis of what is happening follows.
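
One way to confirm the symptom from a test harness is to list the live threads whose names start with "Sender-", the prefix LearnerHandler.run() gives its sender thread. A minimal sketch, where the class LingeringSenderCheck and the fake sender thread are hypothetical stand-ins, not part of ZooKeeper:

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

// Hypothetical test-harness helper (not a ZooKeeper API): after every QuorumPeer
// has been shut down, report live threads still named "Sender-...".
// A non-empty result indicates the leaked sendPackets() threads.
final class LingeringSenderCheck {

    static List<String> liveSenderThreads() {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .map(Thread::getName)
                .filter(name -> name.startsWith("Sender-"))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a leaked sender: blocks on take(), like a sendPackets()
        // loop that never receives its proposalOfDeath.
        LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        Thread fake = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Sender-/127.0.0.1:12345");
        fake.setDaemon(true); // so this demo itself can always exit
        fake.start();

        System.out.println("Lingering sender threads: " + liveSenderThreads());
        fake.interrupt(); // clean up the stand-in thread
        fake.join();
        System.out.println("After cleanup: " + liveSenderThreads());
    }
}
```

In a real harness the check would run after all QuorumPeer.shutdown() calls have returned, failing the test if the list is not empty.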

The call to QuorumPeer.shutdown() calls Leader.shutdown(), which iterates over its LearnerHandler instances and calls LearnerHandler.shutdown() on each. I have copied the relevant code from LearnerHandler and annotated it to show why the sendPackets() thread does not exit.

//---------------------------------------------------------------------------------------------

    public void shutdown() {
        try {
            if (sock != null && !sock.isClosed()) {
                // ***** 1. This causes LearnerHandler.run() to terminate at point 4.
                sock.close();
            }
        } catch (IOException e) {
            LOG.warn("Ignoring unexpected exception during socket close", e);
        }
        // ***** 2. This sets the LearnerHandler thread's interrupt flag, which
        // *****    causes the failure at point 6.
        this.interrupt();
        leader.removeLearnerHandler(this);
    }

//---------------------------------------------------------------------------------------------

    public void run() {
        try {
            . . .
            // ***** 3. The thread that calls sendPackets(). sendPackets() contains a loop
            // *****    that relies on the proposalOfDeath at point 6 in order to exit.
            // *****    Also, perhaps this should be a daemon thread, so that it cannot
            // *****    prevent an application from exiting if it does not itself shut down.
            // Start sending packets
            new Thread() {
                public void run() {
                    Thread.currentThread().setName(
                            "Sender-" + sock.getRemoteSocketAddress());
                    try {
                        sendPackets();
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected interruption", e);
                    }
                }
            }.start();
            . . .

            while (true) {
                qp = new QuorumPacket();
                // ***** 4. The sock.close() at point 1 causes an IOException here,
                // *****    which is handled at point 5.
                ia.readRecord(qp, "packet");
                ...
            }
        } catch (IOException e) {
            // ***** 5. Handling the IOException caused by closing the socket. The if()
            // *****    condition evaluates to false because the socket was already
            // *****    closed in shutdown(), so execution falls through to the finally
            // *****    clause below, which contains point 6.
            if (sock != null && !sock.isClosed()) {
                LOG.error("Unexpected exception causing shutdown while sock "
                        + "still open", e);
                //close the socket to make sure the
                //other side can see it being closed
                try {
                    sock.close();
                } catch(IOException ie) {
                    // do nothing
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Unexpected exception causing shutdown", e);
        } finally {
            LOG.warn("******* GOODBYE "
                    + (sock != null ? sock.getRemoteSocketAddress() : "<null>")
                    + " ********");
            // Send the packet of death
            try {
                // ***** 6. queuedPackets.put() throws an InterruptedException without
                // *****    putting proposalOfDeath on the queue, because the thread's
                // *****    interrupt flag was set at point 2. (The exception comes from
                // *****    java.util.concurrent.locks.ReentrantLock, which is used in
                // *****    java.util.concurrent.LinkedBlockingQueue.) The sendPackets()
                // *****    thread at point 3 never receives the proposalOfDeath and
                // *****    keeps running after shutdown.
                queuedPackets.put(proposalOfDeath);
            } catch (InterruptedException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
            shutdown();
        }
    }

//---------------------------------------------------------------------------------------------
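
The behaviour at point 6 can be reproduced without ZooKeeper. A minimal sketch, where the class InterruptedPutDemo and the DEATH marker are hypothetical stand-ins for LearnerHandler and proposalOfDeath:

```java
import java.util.concurrent.LinkedBlockingQueue;

// Reproduction of point 6 outside ZooKeeper: if the calling thread's interrupt
// flag is already set (as LearnerHandler.shutdown() does at point 2),
// LinkedBlockingQueue.put() throws InterruptedException and the element is
// NOT enqueued.
final class InterruptedPutDemo {

    static final Object DEATH = new Object(); // stand-in for proposalOfDeath

    /** Returns true if the death marker reached the queue. */
    static boolean putWithInterruptFlagSet(LinkedBlockingQueue<Object> queue) {
        Thread.currentThread().interrupt(); // simulate point 2
        try {
            queue.put(DEATH);               // simulate point 6
            return true;
        } catch (InterruptedException e) {
            // put() acquires its lock interruptibly, so it fails immediately;
            // the interrupt flag is cleared when the exception is thrown.
            return false;
        }
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        boolean delivered = putWithInterruptFlagSet(queue);
        System.out.println("delivered=" + delivered + ", queue size=" + queue.size());
    }
}
```

On a standard JDK this reports delivered=false and an empty queue, matching the analysis above: a consumer blocked in take() would wait forever.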

There is also a race condition, which makes the failure intermittent: if point 6 executes before the interrupt() call at point 2, the proposalOfDeath is enqueued and sendPackets() exits as required.
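
One possible workaround, sketched here under the assumption that queuedPackets is an unbounded LinkedBlockingQueue (this is not the ZooKeeper project's fix): enqueue the death marker with offer(), which takes the queue's lock non-interruptibly, and make the sender a daemon thread as suggested at point 3. The class PoisonPillSketch and its methods are hypothetical stand-ins for LearnerHandler.run() and sendPackets():

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a workaround, not ZooKeeper code. The death marker is
// enqueued with offer(), which does not throw InterruptedException, so it is
// delivered even if shutdown() has already interrupted this thread.
final class PoisonPillSketch {

    static final Object DEATH = new Object(); // stand-in for proposalOfDeath

    // Stand-in for the sender thread: loop until the death marker arrives.
    static Thread startSender(LinkedBlockingQueue<Object> queue) {
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    Object p = queue.take();
                    if (p == DEATH) {
                        return; // clean exit
                    }
                    // (real code would write p to the socket here)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Sender-sketch");
        sender.setDaemon(true); // point 3: a leak cannot keep the JVM alive
        sender.start();
        return sender;
    }

    // Stand-in for the finally block at point 6.
    static boolean enqueueDeath(LinkedBlockingQueue<Object> queue) {
        // For an unbounded queue offer() only fails when full, i.e. never in
        // practice; the caller's interrupt flag is left untouched.
        return queue.offer(DEATH);
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        Thread sender = startSender(queue);

        Thread.currentThread().interrupt();      // simulate point 2 running first
        boolean delivered = enqueueDeath(queue);
        boolean flagKept = Thread.interrupted(); // read and clear before join()

        sender.join(TimeUnit.SECONDS.toMillis(5));
        System.out.println("delivered=" + delivered
                + ", interruptPreserved=" + flagKept
                + ", senderAlive=" + sender.isAlive());
    }
}
```

An alternative with the same effect is to save the result of Thread.interrupted() before calling put() and call Thread.currentThread().interrupt() again afterwards. Either way, the outcome no longer depends on whether point 2 runs before point 6.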

If someone could change the shutdown process and provide me with a patched JAR, I would be happy to retest it.

Regards
Mauri
