Repository: zookeeper Updated Branches: refs/heads/master cb9f303bd -> 1ce2ca810
ZOOKEEPER-3163: Use session map in the Netty to improve close session performance This is a refactor to make the Netty able to use the same closeSession logic in NIOServerCnxn, which is more efficient with the sessionMap. Rely on the existing tests for the refactor work here. Author: Fangmin Lyu <allen...@fb.com> Reviewers: Tamás Pénzes, Enrico Olivelli, maoling, Michael Han Closes #665 from lvfangmin/ZOOKEEPER-3163 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/1ce2ca81 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/1ce2ca81 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/1ce2ca81 Branch: refs/heads/master Commit: 1ce2ca8107438d283581d18d064a25bd6b74adf7 Parents: cb9f303 Author: Fangmin Lyu <allen...@fb.com> Authored: Tue Oct 23 19:03:01 2018 -0700 Committer: Michael Han <h...@apache.org> Committed: Tue Oct 23 19:03:01 2018 -0700 ---------------------------------------------------------------------- .../zookeeper/server/NIOServerCnxnFactory.java | 22 +----------- .../zookeeper/server/NettyServerCnxn.java | 3 ++ .../server/NettyServerCnxnFactory.java | 18 ---------- .../zookeeper/server/ServerCnxnFactory.java | 38 ++++++++++++++++---- 4 files changed, 36 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/1ce2ca81/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java index d1c4137..4300d10 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java @@ -604,9 +604,6 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory { return directBufferBytes > 0 ? directBuffer.get() : null; } - // sessionMap is used by closeSession() - private final ConcurrentHashMap<Long, NIOServerCnxn> sessionMap = - new ConcurrentHashMap<Long, NIOServerCnxn>(); // ipMap is used to limit connections per IP private final ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>> ipMap = new ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>>( ); @@ -787,10 +784,7 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory { } cnxnExpiryQueue.remove(cnxn); - long sessionId = cnxn.getSessionId(); - if (sessionId != 0) { - sessionMap.remove(sessionId); - } + removeCnxnFromSessionMap(cnxn); InetAddress addr = cnxn.getSocketAddress(); if (addr != null) { @@ -922,20 +916,6 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory { } } - public void addSession(long sessionId, NIOServerCnxn cnxn) { - sessionMap.put(sessionId, cnxn); - } - - @Override - public boolean closeSession(long sessionId) { - NIOServerCnxn cnxn = sessionMap.remove(sessionId); - if (cnxn != null) { - cnxn.close(); - return true; - } - return false; - } - @Override public void join() throws InterruptedException { if (acceptThread != null) { http://git-wip-us.apache.org/repos/asf/zookeeper/blob/1ce2ca81/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index 0b27724..ab3fd09 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -112,6 +112,8 @@ public class NettyServerCnxn extends ServerCnxn { + Long.toHexString(sessionId)); } + factory.removeCnxnFromSessionMap(this); + synchronized (factory.ipMap) { Set<NettyServerCnxn> s = factory.ipMap.get(((InetSocketAddress)channel @@ -198,6 +200,7 @@ public class NettyServerCnxn extends ServerCnxn { @Override public void setSessionId(long sessionId) { this.sessionId = sessionId; + factory.addSession(sessionId, this); } @Override http://git-wip-us.apache.org/repos/asf/zookeeper/blob/1ce2ca81/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java index 547d291..d3abf38 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java @@ -416,24 +416,6 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { } @Override - public boolean closeSession(long sessionId) { - if (LOG.isDebugEnabled()) { - LOG.debug("closeSession sessionid:0x" + sessionId); - } - for (ServerCnxn cnxn : cnxns) { - if (cnxn.getSessionId() == sessionId) { - try { - cnxn.close(); - } catch (Exception e) { - LOG.warn("exception during session close", e); - } - return true; - } - } - return false; - } - - @Override public void configure(InetSocketAddress addr, int maxClientCnxns, boolean secure) throws IOException { http://git-wip-us.apache.org/repos/asf/zookeeper/blob/1ce2ca81/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxnFactory.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxnFactory.java index dbe47a2..bbe6ece 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxnFactory.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxnFactory.java @@ -54,6 +54,38 @@ public abstract class ServerCnxnFactory { */ static final ByteBuffer closeConn = ByteBuffer.allocate(0); + // sessionMap is used by closeSession() + final ConcurrentHashMap<Long, ServerCnxn> sessionMap = + new ConcurrentHashMap<Long, ServerCnxn>(); + + public void addSession(long sessionId, ServerCnxn cnxn) { + sessionMap.put(sessionId, cnxn); + } + + public void removeCnxnFromSessionMap(ServerCnxn cnxn) { + long sessionId = cnxn.getSessionId(); + if (sessionId != 0) { + sessionMap.remove(sessionId); + } + } + + /** + * @return true if the cnxn that contains the sessionId exists in this ServerCnxnFactory + * and it's closed. Otherwise false. + */ + public boolean closeSession(long sessionId) { + ServerCnxn cnxn = sessionMap.remove(sessionId); + if (cnxn != null) { + try { + cnxn.close(); + } catch (Exception e) { + LOG.warn("exception during session close", e); + } + return true; + } + return false; + } + public abstract int getLocalPort(); public abstract Iterable<ServerCnxn> getConnections(); @@ -66,12 +98,6 @@ public abstract class ServerCnxnFactory { return zkServer; } - /** - * @return true if the cnxn that contains the sessionId exists in this ServerCnxnFactory - * and it's closed. Otherwise false. - */ - public abstract boolean closeSession(long sessionId); - public void configure(InetSocketAddress addr, int maxcc) throws IOException { configure(addr, maxcc, false); }