This is an automated email from the ASF dual-hosted git repository.
fangmin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new d8db889 ZOOKEEPER-3326: Add session/connection related metrics
d8db889 is described below
commit d8db88914e7af7c11a2588be202d90b925791d31
Author: Jie Huang <[email protected]>
AuthorDate: Mon Apr 22 10:15:22 2019 -0700
ZOOKEEPER-3326: Add session/connection related metrics
Author: Jie Huang <[email protected]>
Reviewers: [email protected], [email protected]
Closes #861 from jhuan31/ZOOKEEPER-3326
---
.../org/apache/zookeeper/server/NIOServerCnxn.java | 2 +
.../zookeeper/server/NIOServerCnxnFactory.java | 1 +
.../apache/zookeeper/server/NettyServerCnxn.java | 1 +
.../org/apache/zookeeper/server/ServerMetrics.java | 18 ++
.../zookeeper/server/SessionTrackerImpl.java | 1 +
.../apache/zookeeper/server/ZooKeeperServer.java | 2 +
.../zookeeper/server/quorum/LearnerHandler.java | 1 +
.../zookeeper/server/ConnectionMetricsTest.java | 239 +++++++++++++++++++++
8 files changed, 265 insertions(+)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index c47267c..c8ee076 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -155,6 +155,7 @@ public class NIOServerCnxn extends ServerCnxn {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) {
+ ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
@@ -314,6 +315,7 @@ public class NIOServerCnxn extends ServerCnxn {
if (k.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
+ ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java
index 23ec3e6..289c7a2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java
@@ -576,6 +576,7 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory {
continue;
}
for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
+ ServerMetrics.getMetrics().SESSIONLESS_CONNECTIONS_EXPIRED.add(1);
conn.close();
}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index 571c88a..50cd674 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -124,6 +124,7 @@ public class NettyServerCnxn extends ServerCnxn {
}
});
} else {
+ ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
channel.eventLoop().execute(this::releaseQueuedBuffer);
}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index cdd57f0..5fefdfc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -122,6 +122,15 @@ public final class ServerMetrics {
OUTSTANDING_CHANGES_REMOVED = metricsContext.getCounter("outstanding_changes_removed");
PREP_PROCESS_TIME = metricsContext.getSummary("prep_process_time", DetailLevel.BASIC);
CLOSE_SESSION_PREP_TIME = metricsContext.getSummary("close_session_prep_time", DetailLevel.ADVANCED);
+
+ REVALIDATE_COUNT = metricsContext.getCounter("revalidate_count");
+ CONNECTION_DROP_COUNT = metricsContext.getCounter("connection_drop_count");
+ CONNECTION_REVALIDATE_COUNT = metricsContext.getCounter("connection_revalidate_count");
+
+ // Expiry queue stats
+ SESSIONLESS_CONNECTIONS_EXPIRED = metricsContext.getCounter("sessionless_connections_expired");
+ STALE_SESSIONS_EXPIRED = metricsContext.getCounter("stale_sessions_expired");
+
}
/**
@@ -167,6 +176,15 @@ public final class ServerMetrics {
public final Counter SNAP_COUNT;
public final Counter COMMIT_COUNT;
public final Counter CONNECTION_REQUEST_COUNT;
+
+ public final Counter REVALIDATE_COUNT;
+ public final Counter CONNECTION_DROP_COUNT;
+ public final Counter CONNECTION_REVALIDATE_COUNT;
+
+ // Expiry queue stats
+ public final Counter SESSIONLESS_CONNECTIONS_EXPIRED;
+ public final Counter STALE_SESSIONS_EXPIRED;
+
// Connection throttling related
public final Summary CONNECTION_TOKEN_DEFICIT;
public final Counter CONNECTION_REJECTED;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java
index 0699620..a984f20 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java
@@ -153,6 +153,7 @@ public class SessionTrackerImpl extends ZooKeeperCriticalThread implements
}
for (SessionImpl s : sessionExpiryQueue.poll()) {
+ ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
setSessionClosing(s.sessionId);
expirer.expire(s);
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 2aec4ec..262df5f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -1180,6 +1180,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
+ ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
+
}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 0f953c8..539f579 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -582,6 +582,7 @@ public class LearnerHandler extends ZooKeeperThread {
}
break;
case Leader.REVALIDATE:
+ ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
learnerMaster.revalidateSession(qp, this);
break;
case Leader.REQUEST:
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ConnectionMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ConnectionMetricsTest.java
new file mode 100644
index 0000000..d6ade36
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ConnectionMetricsTest.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoop;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.zookeeper.server.NIOServerCnxnFactory.ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConnectionMetricsTest extends ZKTestCase {
+ protected static final Logger LOG = LoggerFactory.getLogger(ConnectionMetricsTest.class);
+
+ @Test
+ public void testRevalidateCount() throws Exception {
+ ServerMetrics.getMetrics().resetAll();
+ QuorumUtil util = new QuorumUtil(1); // create a quorum of 3 servers
+ // disable local session to make sure we create a global session
+ util.enableLocalSession(false);
+ util.startAll();
+
+ int follower1 = (int)util.getFollowerQuorumPeers().get(0).getId();
+ int follower2 = (int)util.getFollowerQuorumPeers().get(1).getId();
+ LOG.info("connecting to server: {}", follower1);
+ ClientBase.CountdownWatcher watcher = new ClientBase.CountdownWatcher();
+ // create a connection to follower
+ ZooKeeper zk = new ZooKeeper(util.getConnectionStringForServer(follower1), ClientBase.CONNECTION_TIMEOUT, watcher);
+ watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+ LOG.info("connected");
+
+ // update the connection to allow to connect to the other follower
+ zk.updateServerList(util.getConnectionStringForServer(follower2));
+
+ // follower is shut down and zk should be disconnected
+ util.shutdown(follower1);
+ watcher.waitForDisconnected(ClientBase.CONNECTION_TIMEOUT);
+ LOG.info("disconnected");
+ // should reconnect to another follower, will ask leader to revalidate
+ watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+ LOG.info("reconnected");
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(1L, values.get("connection_revalidate_count"));
+ Assert.assertEquals(1L, values.get("revalidate_count"));
+
+ zk.close();
+ util.shutdownAll();
+ }
+
+
+ private class MockNIOServerCnxn extends NIOServerCnxn {
+ public MockNIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
+ SelectionKey sk, NIOServerCnxnFactory factory,
+ NIOServerCnxnFactory.SelectorThread selectorThread) throws IOException {
+ super(zk, sock, sk, factory, selectorThread);
+ }
+
+ @Override
+ protected boolean isSocketOpen() {
+ return true;
+ }
+ }
+
+ private static class FakeSK extends SelectionKey {
+
+ @Override
+ public SelectableChannel channel() {
+ return null;
+ }
+
+ @Override
+ public Selector selector() {
+ return mock(Selector.class);
+ }
+
+ @Override
+ public boolean isValid() {
+ return true;
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ @Override
+ public int interestOps() {
+ return ops;
+ }
+
+ private int ops = OP_WRITE + OP_READ;
+
+ @Override
+ public SelectionKey interestOps(int ops) {
+ this.ops = ops;
+ return this;
+ }
+
+ @Override
+ public int readyOps() {
+ return ops;
+ }
+
+ }
+
+ private NIOServerCnxn createMockNIOCnxn() throws IOException {
+ InetSocketAddress socketAddr = new InetSocketAddress(80);
+ Socket socket = mock(Socket.class);
+ when(socket.getRemoteSocketAddress()).thenReturn(socketAddr);
+ SocketChannel sock = mock(SocketChannel.class);
+ when(sock.socket()).thenReturn(socket);
+ when(sock.read(any(ByteBuffer.class))).thenReturn(-1);
+
+ return new MockNIOServerCnxn(mock(ZooKeeperServer.class), sock, null, mock(NIOServerCnxnFactory.class), null);
+ }
+
+ @Test
+ public void testNIOConnectionDropCount() throws Exception {
+ ServerMetrics.getMetrics().resetAll();
+
+ NIOServerCnxn cnxn = createMockNIOCnxn();
+ cnxn.doIO(new FakeSK());
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(1L, values.get("connection_drop_count"));
+ }
+
+ @Test
+ public void testNettyConnectionDropCount() throws Exception {
+ InetSocketAddress socketAddr = new InetSocketAddress(80);
+ Channel channel = mock(Channel.class);
+ when(channel.isOpen()).thenReturn(false);
+ when(channel.remoteAddress()).thenReturn(socketAddr);
+ EventLoop eventLoop = mock(EventLoop.class);
+ when(channel.eventLoop()).thenReturn(eventLoop);
+
+ ServerMetrics.getMetrics().resetAll();
+
+ NettyServerCnxnFactory factory = new NettyServerCnxnFactory();
+ NettyServerCnxn cnxn = new NettyServerCnxn(channel, mock(ZooKeeperServer.class), factory);
+
+ // pretend it's connected
+ factory.cnxns.add(cnxn);
+ cnxn.close();
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(1L, values.get("connection_drop_count"));
+ }
+
+ @Test
+ public void testSessionlessConnectionsExpired() throws Exception {
+ ServerCnxnFactory factory = new NIOServerCnxnFactory();
+ factory.configure(new InetSocketAddress(PortAssignment.unique()), 1000);
+ factory.start();
+ int timeout = Integer.getInteger(
+ ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
+
+ ServerMetrics.getMetrics().resetAll();
+ // add two connections w/o touching them so they will expire
+ ((NIOServerCnxnFactory) factory).touchCnxn(createMockNIOCnxn());
+ ((NIOServerCnxnFactory) factory).touchCnxn(createMockNIOCnxn());
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ int sleptTime = 0;
+ while (values.get("sessionless_connections_expired") == null || sleptTime < 2*timeout){
+ Thread.sleep(100);
+ sleptTime += 100;
+ values = MetricsUtils.currentServerMetrics();
+ }
+
+ Assert.assertEquals(2L, values.get("sessionless_connections_expired"));
+
+ factory.shutdown();
+ }
+
+ @Test
+ public void testStaleSessionsExpired() throws Exception {
+ int tickTime = 1000;
+ SessionTrackerImpl tracker = new SessionTrackerImpl(mock(ZooKeeperServer.class),
+ new ConcurrentHashMap<>(), tickTime, 1L, null);
+
+ tracker.sessionsById.put(1L, mock(SessionTrackerImpl.SessionImpl.class));
+ tracker.sessionsById.put(2L, mock(SessionTrackerImpl.SessionImpl.class));
+
+ tracker.touchSession(1L, tickTime);
+ tracker.touchSession(2L, tickTime);
+
+ ServerMetrics.getMetrics().resetAll();
+
+ tracker.start();
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ int sleptTime = 0;
+ while (values.get("stale_sessions_expired") == null || sleptTime < 2*tickTime) {
+ Thread.sleep(100);
+ sleptTime += 100;
+ values = MetricsUtils.currentServerMetrics();
+ }
+
+ Assert.assertEquals(2L, values.get("stale_sessions_expired"));
+
+ tracker.shutdown();
+ }
+}