Repository: zookeeper
Updated Branches:
  refs/heads/master 31501b8ab -> e104175bb


ZOOKEEPER-2786: Flaky test: 
org.apache.zookeeper.test.ClientTest.testNonExistingOpCode

This is the third time this failure has popped up. This time it seemed to impact 
only tests run using NettyServerCnxnFactory (so it affects only 3.5 and 
master), and I was able to reproduce it frequently enough when running the 
tests locally.

The issue is caused by improper handling of Netty's futures. When we call 
`channel.write(wrappedBuffer(sendBuffer));`, the write completes 
asynchronously. The close call `channel.close();` is also asynchronous, so the 
channel can be closed before a pending write has actually been flushed.

This patch turns our close call into a callback that runs when an empty write 
completes. This way we are guaranteed that the channel has "drained" all 
previously queued writes before it is closed.

My primary concern with this patch is the channel being used while it is 
closing (between the write of an empty buffer and the execution of the close 
callback). I have added a `closingChannel` boolean to track that, which I 
believe is sufficient. Let me know if anyone finds a situation where that is 
not the case.

Author: Abraham Fine <af...@apache.org>

Reviewers: Michael Han <h...@apache.org>

Closes #327 from afine/ZOOKEEPER-2786_third_times_a_charm

(cherry picked from commit 23962f12395ada67e689b8ff57573fc1398a54eb)
Signed-off-by: Michael Han <h...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/e104175b
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/e104175b
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/e104175b

Branch: refs/heads/master
Commit: e104175bb47baeb800354078c015e78bfcb7c953
Parents: 31501b8
Author: Abraham Fine <af...@apache.org>
Authored: Wed Aug 9 11:31:06 2017 -0700
Committer: Michael Han <h...@apache.org>
Committed: Wed Aug 9 11:31:21 2017 -0700

----------------------------------------------------------------------
 .../org/apache/zookeeper/server/NettyServerCnxn.java    | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e104175b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
index 142e916..9ff12e9 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -47,6 +47,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.MessageEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +63,7 @@ public class NettyServerCnxn extends ServerCnxn {
     int sessionTimeout;
     AtomicLong outstandingCount = new AtomicLong();
     Certificate[] clientChain;
+    volatile boolean closingChannel;
 
     /** The ZooKeeperServer for this connection. May be null if the server
      * is not currently serving requests (for example if the server is not
@@ -74,6 +76,7 @@ public class NettyServerCnxn extends ServerCnxn {
     
     NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
         this.channel = channel;
+        this.closingChannel = false;
         this.zkServer = zks;
         this.factory = factory;
         if (this.factory.login != null) {
@@ -83,6 +86,8 @@ public class NettyServerCnxn extends ServerCnxn {
     
     @Override
     public void close() {
+        closingChannel = true;
+        
         if (LOG.isDebugEnabled()) {
             LOG.debug("close called for sessionid:0x"
                     + Long.toHexString(sessionId));
@@ -115,7 +120,10 @@ public class NettyServerCnxn extends ServerCnxn {
         }
 
         if (channel.isOpen()) {
-            channel.close();
+            // Since we don't check on the futures created by write calls to the channel complete we need to make sure
+            // that all writes have been completed before closing the channel or we risk data loss
+            // See: http://lists.jboss.org/pipermail/netty-users/2009-August/001122.html
+            channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
         }
     }
 
@@ -171,7 +179,7 @@ public class NettyServerCnxn extends ServerCnxn {
     @Override
     public void sendResponse(ReplyHeader h, Record r, String tag)
             throws IOException {
-        if (!channel.isOpen()) {
+        if (closingChannel || !channel.isOpen()) {
             return;
         }
         ByteArrayOutputStream baos = new ByteArrayOutputStream();

Reply via email to