Repository: zookeeper Updated Branches: refs/heads/master 31501b8ab -> e104175bb
ZOOKEEPER-2786: Flaky test: org.apache.zookeeper.test.ClientTest.testNonExistingOpCode This is the third time this failure has popped up. This time it seems to impact only tests run using NettyServerCnxnFactory (so it affects only 3.5 and master), and I was able to reproduce it with sufficient frequency when running the tests locally. The issue is caused by improper handling of Netty's futures. When we call `channel.write(wrappedBuffer(sendBuffer));` the write is completed asynchronously. The close call `channel.close();` is also asynchronous. So we can run into the case where the close takes effect before a previously issued write has completed, and the response is lost. This patch changes our close call to be a callback that runs on completion of an empty write. This way we are guaranteed that the channel has "drained" before it is closed. My primary concern with this patch is the channel being used while it is closing (between the write of the empty buffer and the execution of the close callback). I have added a `closingChannel` boolean to track that state, which I believe is sufficient. Let me know if anyone finds a situation where that is not the case. 
Author: Abraham Fine <af...@apache.org> Reviewers: Michael Han <h...@apache.org> Closes #327 from afine/ZOOKEEPER-2786_third_times_a_charm (cherry picked from commit 23962f12395ada67e689b8ff57573fc1398a54eb) Signed-off-by: Michael Han <h...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/e104175b Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/e104175b Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/e104175b Branch: refs/heads/master Commit: e104175bb47baeb800354078c015e78bfcb7c953 Parents: 31501b8 Author: Abraham Fine <af...@apache.org> Authored: Wed Aug 9 11:31:06 2017 -0700 Committer: Michael Han <h...@apache.org> Committed: Wed Aug 9 11:31:21 2017 -0700 ---------------------------------------------------------------------- .../org/apache/zookeeper/server/NettyServerCnxn.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e104175b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java index 142e916..9ff12e9 100644 --- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java @@ -47,6 +47,7 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.MessageEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +63,7 @@ public class NettyServerCnxn extends ServerCnxn { int sessionTimeout; AtomicLong 
outstandingCount = new AtomicLong(); Certificate[] clientChain; + volatile boolean closingChannel; /** The ZooKeeperServer for this connection. May be null if the server * is not currently serving requests (for example if the server is not @@ -74,6 +76,7 @@ public class NettyServerCnxn extends ServerCnxn { NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) { this.channel = channel; + this.closingChannel = false; this.zkServer = zks; this.factory = factory; if (this.factory.login != null) { @@ -83,6 +86,8 @@ public class NettyServerCnxn extends ServerCnxn { @Override public void close() { + closingChannel = true; + if (LOG.isDebugEnabled()) { LOG.debug("close called for sessionid:0x" + Long.toHexString(sessionId)); @@ -115,7 +120,10 @@ public class NettyServerCnxn extends ServerCnxn { } if (channel.isOpen()) { - channel.close(); + // Since we don't check on the futures created by write calls to the channel complete we need to make sure + // that all writes have been completed before closing the channel or we risk data loss + // See: http://lists.jboss.org/pipermail/netty-users/2009-August/001122.html + channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } } @@ -171,7 +179,7 @@ public class NettyServerCnxn extends ServerCnxn { @Override public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { - if (!channel.isOpen()) { + if (closingChannel || !channel.isOpen()) { return; } ByteArrayOutputStream baos = new ByteArrayOutputStream();