Repository: zookeeper
Updated Branches:
  refs/heads/master 0b65e3d4c -> 95557a30e


ZOOKEEPER-3131: Remove watcher when session closed in NettyServerCnxn

Currently, it doesn't remove itself from ZK server when the cnxn is closed, 
which
will leak watchers, close it to make it align with NIO implementation.

Author: Fangmin Lyu <allen...@fb.com>

Reviewers: hanm, anmolnar, nkalmar

Closes #612 from lvfangmin/ZOOKEEPER-3131


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/95557a30
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/95557a30
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/95557a30

Branch: refs/heads/master
Commit: 95557a30edbdfdf4479a1cb142e0d82a4ba6061d
Parents: 0b65e3d
Author: Fangmin Lyu <allen...@fb.com>
Authored: Thu Sep 6 17:34:55 2018 -0700
Committer: Michael Han <h...@apache.org>
Committed: Thu Sep 6 17:34:55 2018 -0700

----------------------------------------------------------------------
 .../apache/zookeeper/server/NettyServerCnxn.java  | 18 +++++++++++-------
 .../zookeeper/server/NettyServerCnxnTest.java     |  9 +++++++--
 2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/95557a30/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java 
b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
index ec808a6..948fb3a 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -73,7 +73,7 @@ public class NettyServerCnxn extends ServerCnxn {
 
     NettyServerCnxnFactory factory;
     boolean initialized;
-    
+
     NettyServerCnxn(Channel channel, ZooKeeperServer zks, 
NettyServerCnxnFactory factory) {
         this.channel = channel;
         this.closingChannel = false;
@@ -83,11 +83,11 @@ public class NettyServerCnxn extends ServerCnxn {
             this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
         }
     }
-    
+
     @Override
     public void close() {
         closingChannel = true;
-        
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("close called for sessionid:0x"
                     + Long.toHexString(sessionId));
@@ -119,6 +119,10 @@ public class NettyServerCnxn extends ServerCnxn {
             }
         }
 
+        if (zkServer != null) {
+            zkServer.removeCnxn(this);
+        }
+
         if (channel.isOpen()) {
             // Since we don't check on the futures created by write calls to 
the channel complete we need to make sure
             // that all writes have been completed before closing the channel 
or we risk data loss
@@ -174,7 +178,7 @@ public class NettyServerCnxn extends ServerCnxn {
         @Override
         public ChannelFuture getFuture() {return null;}
     };
-    
+
     @Override
     public void sendResponse(ReplyHeader h, Record r, String tag)
             throws IOException {
@@ -226,7 +230,7 @@ public class NettyServerCnxn extends ServerCnxn {
      */
     private class SendBufferWriter extends Writer {
         private StringBuffer sb = new StringBuffer();
-        
+
         /**
          * Check if we are ready to send another chunk.
          * @param force force sending, even if not a full chunk
@@ -415,7 +419,7 @@ public class NettyServerCnxn extends ServerCnxn {
     public void disableRecv() {
         disableRecvNoWait().awaitUninterruptibly();
     }
-    
+
     private ChannelFuture disableRecvNoWait() {
         throttled = true;
         if (LOG.isDebugEnabled()) {
@@ -423,7 +427,7 @@ public class NettyServerCnxn extends ServerCnxn {
         }
         return channel.setReadable(false);
     }
-    
+
     @Override
     public long getOutstandingRequests() {
         return outstandingCount.longValue();

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/95557a30/src/java/test/org/apache/zookeeper/server/NettyServerCnxnTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/NettyServerCnxnTest.java 
b/src/java/test/org/apache/zookeeper/server/NettyServerCnxnTest.java
index 2038d8b..15f993c 100644
--- a/src/java/test/org/apache/zookeeper/server/NettyServerCnxnTest.java
+++ b/src/java/test/org/apache/zookeeper/server/NettyServerCnxnTest.java
@@ -56,7 +56,7 @@ public class NettyServerCnxnTest extends ClientBase {
      * servercnxnfactory should remove all channel references to avoid
      * duplicate channel closure. Duplicate closure may result in indefinite
      * hanging due to netty open issue.
-     * 
+     *
      * @see <a href="https://issues.jboss.org/browse/NETTY-412";>NETTY-412</a>
      */
     @Test(timeout = 40000)
@@ -66,13 +66,16 @@ public class NettyServerCnxnTest extends ClientBase {
                 serverFactory instanceof NettyServerCnxnFactory);
 
         final ZooKeeper zk = createClient();
+        final ZooKeeperServer zkServer = getServer(serverFactory);
         final String path = "/a";
         try {
             // make sure zkclient works
             zk.create(path, "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT);
+            // set on watch
             Assert.assertNotNull("Didn't create znode:" + path,
-                    zk.exists(path, false));
+                    zk.exists(path, true));
+            Assert.assertEquals(1, 
zkServer.getZKDatabase().getDataTree().getWatchCount());
             Iterable<ServerCnxn> connections = serverFactory.getConnections();
             Assert.assertEquals("Mismatch in number of live connections!", 1,
                     serverFactory.getNumAliveConnections());
@@ -88,6 +91,8 @@ public class NettyServerCnxnTest extends ClientBase {
                     Assert.fail("The number of live connections should be 0");
                 }
             }
+            // make sure the watch is removed when the connection closed
+            Assert.assertEquals(0, 
zkServer.getZKDatabase().getDataTree().getWatchCount());
         } finally {
             zk.close();
         }

Reply via email to