This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new a2a09ce  ZOOKEEPER-3131: Remove watcher when session closed in NettyServerCnxn
a2a09ce is described below

commit a2a09ce21d9745ad686b612555fb97d1adafc57b
Author: Fangmin Lyu <[email protected]>
AuthorDate: Wed Feb 6 11:18:31 2019 +0100

    ZOOKEEPER-3131: Remove watcher when session closed in NettyServerCnxn
    
    Backport of https://github.com/apache/zookeeper/commit/95557a30edbdfdf4479a1cb142e0d82a4ba6061d
    
    Author: Fangmin Lyu <[email protected]>
    
    Reviewers: [email protected]
    
    Closes #804 from anmolnar/ZOOKEEPER-3131_35
---
 .../org/apache/zookeeper/server/NettyServerCnxn.java   | 18 +++++++++++-------
 .../apache/zookeeper/server/NettyServerCnxnTest.java   |  9 +++++++--
 2 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index ec808a6..948fb3a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -73,7 +73,7 @@ public class NettyServerCnxn extends ServerCnxn {
 
     NettyServerCnxnFactory factory;
     boolean initialized;
-    
+
     NettyServerCnxn(Channel channel, ZooKeeperServer zks, 
NettyServerCnxnFactory factory) {
         this.channel = channel;
         this.closingChannel = false;
@@ -83,11 +83,11 @@ public class NettyServerCnxn extends ServerCnxn {
             this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
         }
     }
-    
+
     @Override
     public void close() {
         closingChannel = true;
-        
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("close called for sessionid:0x"
                     + Long.toHexString(sessionId));
@@ -119,6 +119,10 @@ public class NettyServerCnxn extends ServerCnxn {
             }
         }
 
+        if (zkServer != null) {
+            zkServer.removeCnxn(this);
+        }
+
         if (channel.isOpen()) {
             // Since we don't check on the futures created by write calls to 
the channel complete we need to make sure
             // that all writes have been completed before closing the channel 
or we risk data loss
@@ -174,7 +178,7 @@ public class NettyServerCnxn extends ServerCnxn {
         @Override
         public ChannelFuture getFuture() {return null;}
     };
-    
+
     @Override
     public void sendResponse(ReplyHeader h, Record r, String tag)
             throws IOException {
@@ -226,7 +230,7 @@ public class NettyServerCnxn extends ServerCnxn {
      */
     private class SendBufferWriter extends Writer {
         private StringBuffer sb = new StringBuffer();
-        
+
         /**
          * Check if we are ready to send another chunk.
          * @param force force sending, even if not a full chunk
@@ -415,7 +419,7 @@ public class NettyServerCnxn extends ServerCnxn {
     public void disableRecv() {
         disableRecvNoWait().awaitUninterruptibly();
     }
-    
+
     private ChannelFuture disableRecvNoWait() {
         throttled = true;
         if (LOG.isDebugEnabled()) {
@@ -423,7 +427,7 @@ public class NettyServerCnxn extends ServerCnxn {
         }
         return channel.setReadable(false);
     }
-    
+
     @Override
     public long getOutstandingRequests() {
         return outstandingCount.longValue();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
index 2038d8b..15f993c 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
@@ -56,7 +56,7 @@ public class NettyServerCnxnTest extends ClientBase {
      * servercnxnfactory should remove all channel references to avoid
      * duplicate channel closure. Duplicate closure may result in indefinite
      * hanging due to netty open issue.
-     * 
+     *
      * @see <a href="https://issues.jboss.org/browse/NETTY-412">NETTY-412</a>
      */
     @Test(timeout = 40000)
@@ -66,13 +66,16 @@ public class NettyServerCnxnTest extends ClientBase {
                 serverFactory instanceof NettyServerCnxnFactory);
 
         final ZooKeeper zk = createClient();
+        final ZooKeeperServer zkServer = getServer(serverFactory);
         final String path = "/a";
         try {
             // make sure zkclient works
             zk.create(path, "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT);
+            // set on watch
             Assert.assertNotNull("Didn't create znode:" + path,
-                    zk.exists(path, false));
+                    zk.exists(path, true));
+            Assert.assertEquals(1, 
zkServer.getZKDatabase().getDataTree().getWatchCount());
             Iterable<ServerCnxn> connections = serverFactory.getConnections();
             Assert.assertEquals("Mismatch in number of live connections!", 1,
                     serverFactory.getNumAliveConnections());
@@ -88,6 +91,8 @@ public class NettyServerCnxnTest extends ClientBase {
                     Assert.fail("The number of live connections should be 0");
                 }
             }
+            // make sure the watch is removed when the connection closed
+            Assert.assertEquals(0, 
zkServer.getZKDatabase().getDataTree().getWatchCount());
         } finally {
             zk.close();
         }

Reply via email to