This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new a2a09ce ZOOKEEPER-3131: Remove watcher when session closed in
NettyServerCnxn
a2a09ce is described below
commit a2a09ce21d9745ad686b612555fb97d1adafc57b
Author: Fangmin Lyu <[email protected]>
AuthorDate: Wed Feb 6 11:18:31 2019 +0100
ZOOKEEPER-3131: Remove watcher when session closed in NettyServerCnxn
Backport of
https://github.com/apache/zookeeper/commit/95557a30edbdfdf4479a1cb142e0d82a4ba6061d
Author: Fangmin Lyu <[email protected]>
Reviewers: [email protected]
Closes #804 from anmolnar/ZOOKEEPER-3131_35
---
.../org/apache/zookeeper/server/NettyServerCnxn.java | 18 +++++++++++-------
.../apache/zookeeper/server/NettyServerCnxnTest.java | 9 +++++++--
2 files changed, 18 insertions(+), 9 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index ec808a6..948fb3a 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -73,7 +73,7 @@ public class NettyServerCnxn extends ServerCnxn {
NettyServerCnxnFactory factory;
boolean initialized;
-
+
NettyServerCnxn(Channel channel, ZooKeeperServer zks,
NettyServerCnxnFactory factory) {
this.channel = channel;
this.closingChannel = false;
@@ -83,11 +83,11 @@ public class NettyServerCnxn extends ServerCnxn {
this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
}
}
-
+
@Override
public void close() {
closingChannel = true;
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("close called for sessionid:0x"
+ Long.toHexString(sessionId));
@@ -119,6 +119,10 @@ public class NettyServerCnxn extends ServerCnxn {
}
}
+ if (zkServer != null) {
+ zkServer.removeCnxn(this);
+ }
+
if (channel.isOpen()) {
// Since we don't check on the futures created by write calls to
the channel complete we need to make sure
// that all writes have been completed before closing the channel
or we risk data loss
@@ -174,7 +178,7 @@ public class NettyServerCnxn extends ServerCnxn {
@Override
public ChannelFuture getFuture() {return null;}
};
-
+
@Override
public void sendResponse(ReplyHeader h, Record r, String tag)
throws IOException {
@@ -226,7 +230,7 @@ public class NettyServerCnxn extends ServerCnxn {
*/
private class SendBufferWriter extends Writer {
private StringBuffer sb = new StringBuffer();
-
+
/**
* Check if we are ready to send another chunk.
* @param force force sending, even if not a full chunk
@@ -415,7 +419,7 @@ public class NettyServerCnxn extends ServerCnxn {
public void disableRecv() {
disableRecvNoWait().awaitUninterruptibly();
}
-
+
private ChannelFuture disableRecvNoWait() {
throttled = true;
if (LOG.isDebugEnabled()) {
@@ -423,7 +427,7 @@ public class NettyServerCnxn extends ServerCnxn {
}
return channel.setReadable(false);
}
-
+
@Override
public long getOutstandingRequests() {
return outstandingCount.longValue();
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
index 2038d8b..15f993c 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
@@ -56,7 +56,7 @@ public class NettyServerCnxnTest extends ClientBase {
* servercnxnfactory should remove all channel references to avoid
* duplicate channel closure. Duplicate closure may result in indefinite
* hanging due to netty open issue.
- *
+ *
* @see <a href="https://issues.jboss.org/browse/NETTY-412">NETTY-412</a>
*/
@Test(timeout = 40000)
@@ -66,13 +66,16 @@ public class NettyServerCnxnTest extends ClientBase {
serverFactory instanceof NettyServerCnxnFactory);
final ZooKeeper zk = createClient();
+ final ZooKeeperServer zkServer = getServer(serverFactory);
final String path = "/a";
try {
// make sure zkclient works
zk.create(path, "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ // set on watch
Assert.assertNotNull("Didn't create znode:" + path,
- zk.exists(path, false));
+ zk.exists(path, true));
+ Assert.assertEquals(1,
zkServer.getZKDatabase().getDataTree().getWatchCount());
Iterable<ServerCnxn> connections = serverFactory.getConnections();
Assert.assertEquals("Mismatch in number of live connections!", 1,
serverFactory.getNumAliveConnections());
@@ -88,6 +91,8 @@ public class NettyServerCnxnTest extends ClientBase {
Assert.fail("The number of live connections should be 0");
}
}
+ // make sure the watch is removed when the connection closed
+ Assert.assertEquals(0,
zkServer.getZKDatabase().getDataTree().getWatchCount());
} finally {
zk.close();
}