Repository: zookeeper
Updated Branches:
  refs/heads/branch-3.5 d7c26e33f -> 23962f123


ZOOKEEPER-2786: Flaky test: 
org.apache.zookeeper.test.ClientTest.testNonExistingOpCode

This is the third time this failure has popped up. This time it seemed to only 
impact tests run using NettyServerCnxnFactory (so it only impacts 3.5 and 
master) and I was able to get it to pop up with sufficient frequency when 
running the tests locally.

The issue is caused by improper handling of netty's futures. When we call 
`channel.write(wrappedBuffer(sendBuffer));` the write is completed 
asynchronously. The close call `channel.close();` is also asynchronous. So we 
can run into the case where the close takes effect before a previously issued 
write has completed, and the data from that write is lost.

This patch changes our close call to be a callback that runs on the completion 
of an empty write. Since the empty write is queued behind any earlier writes, 
we are guaranteed that the channel has "drained" before the close.

My primary concern with this patch is the channel being used while it is 
closing (between the write of an empty buffer and the execution of the close 
callback). I have added a `closingChannel` boolean to track that, which I 
believe is sufficient. Let me know if anyone finds a situation where that is 
not the case.

Author: Abraham Fine <af...@apache.org>

Reviewers: Michael Han <h...@apache.org>

Closes #327 from afine/ZOOKEEPER-2786_third_times_a_charm


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/23962f12
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/23962f12
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/23962f12

Branch: refs/heads/branch-3.5
Commit: 23962f12395ada67e689b8ff57573fc1398a54eb
Parents: d7c26e3
Author: Abraham Fine <af...@apache.org>
Authored: Wed Aug 9 11:31:06 2017 -0700
Committer: Michael Han <h...@apache.org>
Committed: Wed Aug 9 11:31:06 2017 -0700

----------------------------------------------------------------------
 .../org/apache/zookeeper/server/NettyServerCnxn.java    | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/23962f12/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java 
b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
index 142e916..9ff12e9 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -47,6 +47,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.MessageEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +63,7 @@ public class NettyServerCnxn extends ServerCnxn {
     int sessionTimeout;
     AtomicLong outstandingCount = new AtomicLong();
     Certificate[] clientChain;
+    volatile boolean closingChannel;
 
     /** The ZooKeeperServer for this connection. May be null if the server
      * is not currently serving requests (for example if the server is not
@@ -74,6 +76,7 @@ public class NettyServerCnxn extends ServerCnxn {
     
     NettyServerCnxn(Channel channel, ZooKeeperServer zks, 
NettyServerCnxnFactory factory) {
         this.channel = channel;
+        this.closingChannel = false;
         this.zkServer = zks;
         this.factory = factory;
         if (this.factory.login != null) {
@@ -83,6 +86,8 @@ public class NettyServerCnxn extends ServerCnxn {
     
     @Override
     public void close() {
+        closingChannel = true;
+        
         if (LOG.isDebugEnabled()) {
             LOG.debug("close called for sessionid:0x"
                     + Long.toHexString(sessionId));
@@ -115,7 +120,10 @@ public class NettyServerCnxn extends ServerCnxn {
         }
 
         if (channel.isOpen()) {
-            channel.close();
+            // Since we don't check on the futures created by write calls to 
the channel complete we need to make sure
+            // that all writes have been completed before closing the channel 
or we risk data loss
+            // See: 
http://lists.jboss.org/pipermail/netty-users/2009-August/001122.html
+            
channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
         }
     }
 
@@ -171,7 +179,7 @@ public class NettyServerCnxn extends ServerCnxn {
     @Override
     public void sendResponse(ReplyHeader h, Record r, String tag)
             throws IOException {
-        if (!channel.isOpen()) {
+        if (closingChannel || !channel.isOpen()) {
             return;
         }
         ByteArrayOutputStream baos = new ByteArrayOutputStream();

Reply via email to