Repository: zookeeper
Updated Branches:
  refs/heads/branch-3.4 f0ef24a5e -> 4b9fe5ea7
ZOOKEEPER-2044: CancelledKeyException in zookeeper branch-3.4

Patch from Germán Blanco, test case from Flavio.

Author: Michael Han <[email protected]>

Reviewers: Rakesh Radhakrishnan <[email protected]>

Closes #156 from hanm/ZOOKEEPER-2044 and squashes the following commits:

f1f9790 [Michael Han] Use createZKClient to set session timeout value directly.
831e560 [Michael Han] Make test run faster by reducing client cnx timeouts; otherwise client will wait for server until session timeout because the channel has been closed on server side.
736091a [Michael Han] Address Rakesh's code review comments.
a220d95 [Michael Han] ZOOKEEPER-2044:CancelledKeyException in zookeeper 3.4.5. Patch from Germán Blanco, test case from Flavio.

Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/4b9fe5ea
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/4b9fe5ea
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/4b9fe5ea

Branch: refs/heads/branch-3.4
Commit: 4b9fe5ea71d73ba26ca722e3919b8d0afe84ab86
Parents: f0ef24a
Author: Michael Han <[email protected]>
Authored: Wed Feb 1 03:23:42 2017 +0530
Committer: Rakesh Radhakrishnan <[email protected]>
Committed: Wed Feb 1 03:23:42 2017 +0530

----------------------------------------------------------------------
 .../apache/zookeeper/server/NIOServerCnxn.java | 71 +++++++++++---------
 .../zookeeper/server/NIOServerCnxnTest.java | 39 ++++++++++-
 2 files changed, 78 insertions(+), 32 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/4b9fe5ea/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
index 746102d..4ea7fb2 100644
--- 
a/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java @@ -62,7 +62,7 @@ public class NIOServerCnxn extends ServerCnxn { final SocketChannel sock; - private final SelectionKey sk; + protected final SelectionKey sk; boolean initialized; @@ -74,7 +74,7 @@ public class NIOServerCnxn extends ServerCnxn { int sessionTimeout; - private final ZooKeeperServer zkServer; + protected final ZooKeeperServer zkServer; /** * The number of requests that have been submitted but not yet responded to. @@ -144,38 +144,49 @@ public class NIOServerCnxn extends ServerCnxn { public void sendBuffer(ByteBuffer bb) { try { - if (bb != ServerCnxnFactory.closeConn) { - // We check if write interest here because if it is NOT set, - // nothing is queued, so we can try to send the buffer right - // away without waking up the selector - if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) { - try { - sock.write(bb); - } catch (IOException e) { - // we are just doing best effort right now - } - } - // if there is nothing left to send, we are done - if (bb.remaining() == 0) { - packetSent(); - return; + internalSendBuffer(bb); + } catch(Exception e) { + LOG.error("Unexpected Exception: ", e); + } + } + + /** + * This method implements the internals of sendBuffer. We + * have separated it from send buffer to be able to catch + * exceptions when testing. + * + * @param bb Buffer to send. 
+ */ + protected void internalSendBuffer(ByteBuffer bb) { + if (bb != ServerCnxnFactory.closeConn) { + // We check if write interest here because if it is NOT set, + // nothing is queued, so we can try to send the buffer right + // away without waking up the selector + if(sk.isValid() && + ((sk.interestOps() & SelectionKey.OP_WRITE) == 0)) { + try { + sock.write(bb); + } catch (IOException e) { + // we are just doing best effort right now } } + // if there is nothing left to send, we are done + if (bb.remaining() == 0) { + packetSent(); + return; + } + } - synchronized(this.factory){ - sk.selector().wakeup(); - if (LOG.isTraceEnabled()) { - LOG.trace("Add a buffer to outgoingBuffers, sk " + sk - + " is valid: " + sk.isValid()); - } - outgoingBuffers.add(bb); - if (sk.isValid()) { - sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); - } + synchronized(this.factory){ + sk.selector().wakeup(); + if (LOG.isTraceEnabled()) { + LOG.trace("Add a buffer to outgoingBuffers, sk " + sk + + " is valid: " + sk.isValid()); + } + outgoingBuffers.add(bb); + if (sk.isValid()) { + sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); } - - } catch(Exception e) { - LOG.error("Unexpected Exception: ", e); } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/4b9fe5ea/src/java/test/org/apache/zookeeper/server/NIOServerCnxnTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/NIOServerCnxnTest.java b/src/java/test/org/apache/zookeeper/server/NIOServerCnxnTest.java index 5c94ed7..bdee20f 100644 --- a/src/java/test/org/apache/zookeeper/server/NIOServerCnxnTest.java +++ b/src/java/test/org/apache/zookeeper/server/NIOServerCnxnTest.java @@ -18,14 +18,15 @@ package org.apache.zookeeper.server; import java.io.IOException; - -import junit.framework.Assert; +import java.nio.ByteBuffer; +import java.nio.channels.CancelledKeyException; import org.apache.zookeeper.CreateMode; import 
org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.test.ClientBase; +import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,4 +69,38 @@ public class NIOServerCnxnTest extends ClientBase { zk.close(); } } + + /** + * Mock extension of NIOServerCnxn to test for + * CancelledKeyException (ZOOKEEPER-2044). + */ + private static class MockNIOServerCnxn extends NIOServerCnxn { + public MockNIOServerCnxn(NIOServerCnxn cnxn) + throws IOException { + super(cnxn.zkServer, cnxn.sock, cnxn.sk, cnxn.factory); + } + + public void mockSendBuffer(ByteBuffer bb) throws Exception { + super.internalSendBuffer(bb); + } + } + + @Test(timeout = 30000) + public void testValidSelectionKey() throws Exception { + final ZooKeeper zk = createZKClient(hostPort, 3000); + try { + Iterable<ServerCnxn> connections = serverFactory.getConnections(); + for (ServerCnxn serverCnxn : connections) { + MockNIOServerCnxn mock = new MockNIOServerCnxn((NIOServerCnxn) serverCnxn); + // Cancel key + ((NIOServerCnxn) serverCnxn).sock.keyFor(((NIOServerCnxnFactory) serverFactory).selector).cancel();; + mock.mockSendBuffer(ByteBuffer.allocate(8)); + } + } catch (CancelledKeyException e) { + LOG.error("Exception while sending bytes!", e); + Assert.fail(e.toString()); + } finally { + zk.close(); + } + } }
