Repository: zookeeper
Updated Branches:
  refs/heads/branch-3.5 d7c26e33f -> 23962f123

ZOOKEEPER-2786: Flaky test: 

This is the third time this failure has popped up. This time it seemed to only 
impact tests run using NettyServerCnxnFactory (so it only impacts 3.5 and 
master) and I was able to get it to pop up with sufficient frequency when 
running the tests locally.

The issue is caused by improper handling of netty's futures. When we call 
`channel.write(wrappedBuffer(sendBuffer));` the write is completed 
asynchronously. The close call `channel.close();` is also asynchronous, so the 
close can take effect before a pending write has completed.

This patch changes our close call to be a callback for the completion of an 
empty write. This way we are guaranteed that the channel has "drained" before a 
close.

My primary concern with this patch is the channel being used while it is 
closing (between the write of an empty buffer and the execution of the close 
callback). I have added a `closingChannel` boolean to track that, which I 
believe is sufficient. Let me know if anyone finds a situation where that is 
not the case.
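
A minimal, self-contained sketch of the drain-then-close idea described above, 
using only the JDK rather than Netty. `FakeChannel`, `write`, `send`, and the 
single-threaded executor are hypothetical stand-ins chosen for illustration; 
they are not the code in this patch and not Netty's API. The sketch assumes 
that queued operations complete in submission order, so an empty write 
finishing implies every earlier write has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DrainThenClose {

    /** Hypothetical stand-in for an asynchronous channel; this is not Netty's Channel. */
    static final class FakeChannel {
        // One I/O thread, so queued operations complete asynchronously but in FIFO order.
        private final ExecutorService io = Executors.newSingleThreadExecutor();
        private final List<String> delivered = new ArrayList<>();
        private volatile boolean open = true;
        private volatile boolean closing = false;

        /** Queues a write; the returned future completes once the I/O thread has handled it. */
        CompletableFuture<Void> write(String msg) {
            return CompletableFuture.runAsync(() -> {
                if (open && !msg.isEmpty()) {
                    delivered.add(msg);
                }
            }, io);
        }

        /** Mirrors the guard added to sendResponse: refuse new work once close() has begun. */
        boolean send(String msg) {
            if (closing || !open) {
                return false;
            }
            write(msg);
            return true;
        }

        /** Closes only after an empty write has completed, i.e. after all earlier writes. */
        CompletableFuture<Void> close() {
            closing = true;
            return write("").thenRun(() -> {
                open = false;
                io.shutdown();
            });
        }

        boolean isOpen() {
            return open;
        }

        /** Call only after the future returned by close() has completed. */
        List<String> delivered() {
            return new ArrayList<>(delivered);
        }
    }

    public static void main(String[] args) {
        FakeChannel ch = new FakeChannel();
        ch.send("first");
        ch.send("second");
        ch.close().join();                  // wait for the drain-then-close callback
        boolean accepted = ch.send("late"); // rejected: the channel is already closed
        System.out.println(ch.delivered() + " accepted=" + accepted + " open=" + ch.isOpen());
    }
}
```

In this sketch both queued messages are delivered before the channel is marked 
closed, and the `closing` flag plays the role of `closingChannel` by rejecting 
sends issued between the empty write and the close callback.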

Author: Abraham Fine <>

Reviewers: Michael Han <>

Closes #327 from afine/ZOOKEEPER-2786_third_times_a_charm


Branch: refs/heads/branch-3.5
Commit: 23962f12395ada67e689b8ff57573fc1398a54eb
Parents: d7c26e3
Author: Abraham Fine <>
Authored: Wed Aug 9 11:31:06 2017 -0700
Committer: Michael Han <>
Committed: Wed Aug 9 11:31:06 2017 -0700

 .../org/apache/zookeeper/server/    | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/src/java/main/org/apache/zookeeper/server/ 
index 142e916..9ff12e9 100644
--- a/src/java/main/org/apache/zookeeper/server/
+++ b/src/java/main/org/apache/zookeeper/server/
@@ -47,6 +47,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +63,7 @@ public class NettyServerCnxn extends ServerCnxn {
     int sessionTimeout;
     AtomicLong outstandingCount = new AtomicLong();
     Certificate[] clientChain;
+    volatile boolean closingChannel;
     /** The ZooKeeperServer for this connection. May be null if the server
      * is not currently serving requests (for example if the server is not
@@ -74,6 +76,7 @@ public class NettyServerCnxn extends ServerCnxn {
     NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
         this.channel = channel;
+        this.closingChannel = false;
         this.zkServer = zks;
         this.factory = factory;
         if (this.factory.login != null) {
@@ -83,6 +86,8 @@ public class NettyServerCnxn extends ServerCnxn {
     public void close() {
+        closingChannel = true;
         if (LOG.isDebugEnabled()) {
             LOG.debug("close called for sessionid:0x"
                     + Long.toHexString(sessionId));
@@ -115,7 +120,10 @@ public class NettyServerCnxn extends ServerCnxn {
         if (channel.isOpen()) {
-            channel.close();
+            // Since we don't check on the futures created by write calls to the channel complete we need to make sure
+            // that all writes have been completed before closing the channel or we risk data loss
+            // See:
@@ -171,7 +179,7 @@ public class NettyServerCnxn extends ServerCnxn {
     public void sendResponse(ReplyHeader h, Record r, String tag)
             throws IOException {
-        if (!channel.isOpen()) {
+        if (closingChannel || !channel.isOpen()) {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
