This is an automated email from the ASF dual-hosted git repository.
hanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 02c203f ZOOKEEPER-3421: Better insight into Observer connections
02c203f is described below
commit 02c203fa486ac24553d9584768fe97960b6dec24
Author: Brian Nixon <[email protected]>
AuthorDate: Fri Jun 28 15:33:38 2019 -0700
ZOOKEEPER-3421: Better insight into Observer connections
Author: Brian Nixon <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Michael Han
<[email protected]>
Closes #978 from enixon/observer-cmds-and-metrics
---
.../org/apache/zookeeper/server/ServerMetrics.java | 10 ++++
.../apache/zookeeper/server/admin/Commands.java | 64 ++++++++++++++++++++++
.../apache/zookeeper/server/quorum/Follower.java | 20 ++++++-
.../org/apache/zookeeper/server/quorum/Leader.java | 18 ++++++
.../zookeeper/server/quorum/LearnerHandler.java | 56 +++++++++++++++++++
.../zookeeper/server/quorum/ObserverMaster.java | 16 ++++++
.../zookeeper/server/admin/CommandsTest.java | 12 ++++
7 files changed, 195 insertions(+), 1 deletion(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 62e98bf..366770b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -179,6 +179,11 @@ public final class ServerMetrics {
COMMIT_PROCESS_TIME = metricsContext.getSummary("commit_process_time",
DetailLevel.BASIC);
+ /**
+ * Observer Master processing metrics.
+ */
+ OM_PROPOSAL_PROCESS_TIME =
metricsContext.getSummary("om_proposal_process_time_ms", DetailLevel.ADVANCED);
+ OM_COMMIT_PROCESS_TIME =
metricsContext.getSummary("om_commit_process_time_ms", DetailLevel.ADVANCED);
/**
* Time spent by the final processor. This is tracked in the commit
processor.
@@ -382,6 +387,11 @@ public final class ServerMetrics {
public final Summary COMMIT_PROCESS_TIME;
+ /**
+ * Observer Master processing metrics.
+ */
+ public final Summary OM_PROPOSAL_PROCESS_TIME;
+ public final Summary OM_COMMIT_PROCESS_TIME;
/**
* Time spent by the final processor. This is tracked in the commit
processor.
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
index 1c381bf..428910f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
@@ -38,6 +38,8 @@ import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.persistence.SnapshotInfo;
+import org.apache.zookeeper.server.quorum.Follower;
+import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
import org.apache.zookeeper.server.quorum.QuorumPeer;
@@ -126,11 +128,13 @@ public class Commands {
registerCommand(new LastSnapshotCommand());
registerCommand(new LeaderCommand());
registerCommand(new MonitorCommand());
+ registerCommand(new ObserverCnxnStatResetCommand());
registerCommand(new RuokCommand());
registerCommand(new SetTraceMaskCommand());
registerCommand(new SrvrCommand());
registerCommand(new StatCommand());
registerCommand(new StatResetCommand());
+ registerCommand(new SyncedObserverConsCommand());
registerCommand(new WatchCommand());
registerCommand(new WatchesByPathCommand());
registerCommand(new WatchSummaryCommand());
@@ -382,6 +386,28 @@ public class Commands {
}}
/**
+ * Reset all observer connection statistics.
+ */
+ public static class ObserverCnxnStatResetCommand extends CommandBase {
+     /** Registers the command under the names "observer_connection_stat_reset" and "orst". */
+     public ObserverCnxnStatResetCommand() {
+         super(Arrays.asList("observer_connection_stat_reset", "orst"));
+     }
+
+     /**
+      * Resets observer connection statistics on the leader or follower behind
+      * {@code zkServer}; any other server type is left untouched.
+      *
+      * @param zkServer the server the command is run against
+      * @param kwargs command arguments (not read by this command)
+      * @return the response produced by {@code initializeResponse()}, with nothing added
+      */
+     @Override
+     public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+         final CommandResponse result = initializeResponse();
+         if (zkServer instanceof LeaderZooKeeperServer) {
+             ((LeaderZooKeeperServer) zkServer).getLeader().resetObserverConnectionStats();
+             return result;
+         }
+         if (zkServer instanceof FollowerZooKeeperServer) {
+             ((FollowerZooKeeperServer) zkServer).getFollower().resetObserverConnectionStats();
+         }
+         return result;
+     }
+ }
+
+ /**
* No-op command, check if the server is running
*/
public static class RuokCommand extends CommandBase {
@@ -497,6 +523,44 @@ public class Commands {
}
/**
+ * Information on observer connections to server. Returned Map contains:
+ * - "synced_observers": Integer (leader/follower only)
+ * - "observers": collection of observer learner handler info objects (leader/follower only)
+ * @see org.apache.zookeeper.server.quorum.LearnerHandler#getLearnerHandlerInfo()
+ */
+ public static class SyncedObserverConsCommand extends CommandBase {
+     /** Registers the command under the names "observers" and "obsr". */
+     public SyncedObserverConsCommand() {
+         super(Arrays.asList("observers", "obsr"));
+     }
+
+     /**
+      * Reports the observers attached to the leader or follower behind {@code zkServer}.
+      *
+      * <p>The response always carries "synced_observers" and "observers". They fall back
+      * to {@code 0} and an empty set when the server is neither a leader nor a follower,
+      * or when the follower reports a {@code null} observer count.
+      *
+      * @param zkServer the server the command is run against
+      * @param kwargs command arguments (not read by this command)
+      * @return the populated response
+      */
+     @Override
+     public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+         final CommandResponse result = initializeResponse();
+
+         // A null count after the branches below means "nothing to report".
+         Integer observerCount = null;
+         Iterable<Map<String, Object>> observerInfo = null;
+
+         if (zkServer instanceof LeaderZooKeeperServer) {
+             final Leader quorumLeader = ((LeaderZooKeeperServer) zkServer).getLeader();
+             observerCount = quorumLeader.getObservingLearners().size();
+             observerInfo = quorumLeader.getObservingLearnersInfo();
+         } else if (zkServer instanceof FollowerZooKeeperServer) {
+             final Follower quorumFollower = ((FollowerZooKeeperServer) zkServer).getFollower();
+             observerCount = quorumFollower.getSyncedObserverSize();
+             if (observerCount != null) {
+                 observerInfo = quorumFollower.getSyncedObserversInfo();
+             }
+         }
+
+         if (observerCount == null) {
+             observerCount = 0;
+             observerInfo = Collections.emptySet();
+         }
+
+         result.put("synced_observers", observerCount);
+         result.put("observers", observerInfo);
+         return result;
+     }
+ }
+
+ /**
* Watch information aggregated by session. Returned Map contains:
* - "session_id_to_watched_paths": Map<Long, Set<String>>
session ID -> watched paths
* @see DataTree#getWatches()
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
index 1d809d2..16d0384 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
@@ -19,8 +19,9 @@
package org.apache.zookeeper.server.quorum;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
import org.apache.jute.Record;
import org.apache.zookeeper.ZooDefs.OpCode;
@@ -174,14 +175,18 @@ public class Follower extends Learner{
}
}
if (om != null) {
+ final long startTime = Time.currentElapsedTime();
om.proposalReceived(qp);
+
ServerMetrics.getMetrics().OM_PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime()
- startTime);
}
break;
case Leader.COMMIT:
ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
fzk.commit(qp.getZxid());
if (om != null) {
+ final long startTime = Time.currentElapsedTime();
om.proposalCommitted(qp.getZxid());
+
ServerMetrics.getMetrics().OM_COMMIT_PROCESS_TIME.add(Time.currentElapsedTime()
- startTime);
}
break;
@@ -251,6 +256,19 @@ public class Follower extends Learner{
return om == null ? null : om.getNumActiveObservers();
}
+ /**
+  * Returns the info maps of the observers reported active by {@code om}.
+  *
+  * @return {@code om.getActiveObservers()}, or an empty set when {@code om} is
+  *         {@code null} or reports no active observers
+  */
+ public Iterable<Map<String, Object>> getSyncedObserversInfo() {
+     if (om == null || om.getNumActiveObservers() <= 0) {
+         return Collections.emptySet();
+     }
+     return om.getActiveObservers();
+ }
+
+ /**
+  * Asks {@code om} to reset its observer connection statistics; does nothing when
+  * {@code om} is {@code null} or reports no active observers.
+  */
+ public void resetObserverConnectionStats() {
+     if (om == null || om.getNumActiveObservers() <= 0) {
+         return;
+     }
+     om.resetObserverConnectionStats();
+ }
+
@Override
public void shutdown() {
LOG.info("shutdown called", new Exception("shutdown Follower"));
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index 7a0a444..052438b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -204,6 +204,24 @@ public class Leader implements LearnerMaster {
}
}
+ /**
+  * Collects {@code getLearnerHandlerInfo()} from every handler in
+  * {@code observingLearners}, iterating while holding that collection's monitor.
+  *
+  * @return a newly built set with one info map per observing learner
+  */
+ public Iterable<Map<String, Object>> getObservingLearnersInfo() {
+     final Set<Map<String, Object>> snapshot = new HashSet<>();
+     synchronized (observingLearners) {
+         for (final LearnerHandler observer : observingLearners) {
+             snapshot.add(observer.getLearnerHandlerInfo());
+         }
+     }
+     return snapshot;
+ }
+
+ /**
+  * Calls {@code resetObserverConnectionStats()} on every handler in
+  * {@code observingLearners}, iterating while holding that collection's monitor.
+  */
+ public void resetObserverConnectionStats() {
+     synchronized (observingLearners) {
+         for (final LearnerHandler observer : observingLearners) {
+             observer.resetObserverConnectionStats();
+         }
+     }
+ }
+
// Pending sync requests. Must access under 'this' lock.
private final Map<Long,List<LearnerSyncRequest>> pendingSyncs =
new HashMap<Long,List<LearnerSyncRequest>>();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 09e7d5f..a3d5a4d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -23,13 +23,18 @@ import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
+import java.lang.System;
import java.net.Socket;
import java.nio.ByteBuffer;
+import java.util.Date;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -99,6 +104,24 @@ public class LearnerHandler extends ZooKeeperThread {
*/
final LinkedBlockingQueue<QuorumPacket> queuedPackets =
new LinkedBlockingQueue<QuorumPacket>();
+ private final AtomicLong queuedPacketsSize = new AtomicLong();
+
+ protected final AtomicLong packetsReceived = new AtomicLong();
+ protected final AtomicLong packetsSent = new AtomicLong();
+
+ protected final AtomicLong requestsReceived = new AtomicLong();
+
+ protected volatile long lastZxid = -1;
+
+ /**
+  * Returns the value currently held in {@code lastZxid}; the field starts at -1.
+  *
+  * <p>No lock is taken. {@code lastZxid} is declared {@code volatile}, and a read of a
+  * volatile {@code long} is atomic and sees the most recent write, so acquiring the
+  * handler's monitor for this single read added contention without adding any guarantee.
+  *
+  * @return the last recorded zxid, or -1 if none is recorded
+  */
+ public long getLastZxid() {
+     return lastZxid;
+ }
+
+ protected final Date established = new Date();
+
+ /**
+  * Returns a copy of the {@code established} timestamp, so callers cannot mutate the
+  * handler's own {@link Date} instance.
+  *
+  * @return a new {@link Date} holding the same instant as {@code established}
+  */
+ public Date getEstablished() {
+     return new Date(established.getTime());
+ }
/**
* Marker packets would be added to quorum packet queue after every
@@ -297,6 +320,7 @@ public class LearnerHandler extends ZooKeeperThread {
continue;
}
+ queuedPacketsSize.addAndGet(-packetSize(p));
if (p == proposalOfDeath) {
// Packet of death!
break;
@@ -310,7 +334,13 @@ public class LearnerHandler extends ZooKeeperThread {
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
}
+
+ // Log the zxid of the last request, if it is a valid zxid.
+ if (p.getZxid() > 0) {
+ lastZxid = p.getZxid();
+ }
oa.writeRecord(p, "packet");
+ packetsSent.incrementAndGet();
} catch (IOException e) {
if (!sock.isClosed()) {
LOG.warn("Unexpected exception at " + this, e);
@@ -602,6 +632,7 @@ public class LearnerHandler extends ZooKeeperThread {
}
tickOfNextAckDeadline =
learnerMaster.getTickOfNextAckDeadline();
+ packetsReceived.incrementAndGet();
ByteBuffer bb;
long sessionId;
@@ -647,6 +678,7 @@ public class LearnerHandler extends ZooKeeperThread {
}
si.setOwner(this);
learnerMaster.submitLearnerRequest(si);
+ requestsReceived.incrementAndGet();
break;
default:
LOG.warn("unexpected quorum packet, type: {}",
packetToString(qp));
@@ -1025,6 +1057,7 @@ public class LearnerHandler extends ZooKeeperThread {
packetCounter.getAndIncrement() % markerPacketInterval == 0) {
queuedPackets.add(new MarkerQuorumPacket(System.nanoTime()));
}
+ queuedPacketsSize.addAndGet(packetSize(p));
}
static long packetSize(QuorumPacket p) {
@@ -1041,6 +1074,29 @@ public class LearnerHandler extends ZooKeeperThread {
return isAlive() && learnerMaster.getCurrentTick() <=
tickOfNextAckDeadline;
}
+ /**
+  * Builds a snapshot of this handler's connection details and counters.
+  *
+  * <p>Keys, in insertion order: "remote_socket_address", "sid", "established",
+  * "queued_packets", "queued_packets_size", "packets_received", "packets_sent",
+  * "requests", "last_zxid".
+  *
+  * @return a new insertion-ordered map holding the values read at call time
+  */
+ public synchronized Map<String, Object> getLearnerHandlerInfo() {
+     final Map<String, Object> snapshot = new LinkedHashMap<>(9);
+
+     // Identity of the connection.
+     snapshot.put("remote_socket_address", getRemoteAddress());
+     snapshot.put("sid", getSid());
+     snapshot.put("established", getEstablished());
+
+     // Outbound queue: entry count and tracked size.
+     snapshot.put("queued_packets", queuedPackets.size());
+     snapshot.put("queued_packets_size", queuedPacketsSize.get());
+
+     // Traffic counters and last recorded zxid.
+     snapshot.put("packets_received", packetsReceived.get());
+     snapshot.put("packets_sent", packetsSent.get());
+     snapshot.put("requests", requestsReceived.get());
+     snapshot.put("last_zxid", getLastZxid());
+
+     return snapshot;
+ }
+
+ /**
+  * Zeroes the received/sent packet counters and the request counter, and puts
+  * {@code lastZxid} back to -1. {@code queuedPacketsSize} is left untouched.
+  */
+ public synchronized void resetObserverConnectionStats() {
+     // Traffic counters restart from zero.
+     packetsReceived.set(0L);
+     packetsSent.set(0L);
+     requestsReceived.set(0L);
+
+     // -1 is the same value the field is initialised with.
+     lastZxid = -1L;
+ }
+
/**
* For testing, return packet queue
* @return
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java
index 7308f65..368f5ef 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java
@@ -33,6 +33,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -510,6 +512,20 @@ public class ObserverMaster implements LearnerMaster,
Runnable {
return activeObservers.size();
}
+ /**
+  * Collects {@code getLearnerHandlerInfo()} from every handler in {@code activeObservers}.
+  *
+  * @return a newly built set with one info map per active observer
+  */
+ public Iterable<Map<String, Object>> getActiveObservers() {
+     final Set<Map<String, Object>> snapshot = new HashSet<>();
+     for (final LearnerHandler observer : activeObservers) {
+         snapshot.add(observer.getLearnerHandlerInfo());
+     }
+     return snapshot;
+ }
+
+ /** Calls {@code resetObserverConnectionStats()} on every handler in {@code activeObservers}. */
+ public void resetObserverConnectionStats() {
+     for (final LearnerHandler observer : activeObservers) {
+         observer.resetObserverConnectionStats();
+     }
+ }
+
int getPktsSizeLimit() {
return pktsSizeLimit;
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
index be803c0..86b5207 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
@@ -120,6 +120,18 @@ public class CommandsTest extends ClientBase {
}
@Test
+ public void testObservers() throws IOException, InterruptedException {
+     // The two fields handed to testCommand for the "observers" command.
+     final Field syncedObserverCount = new Field("synced_observers", Integer.class);
+     final Field observerDetails = new Field("observers", Iterable.class);
+     testCommand("observers", syncedObserverCount, observerDetails);
+ }
+
+ @Test // exercises the "observer_connection_stat_reset" admin command
+ public void testObserverConnectionStatReset() throws IOException,
InterruptedException {
+ testCommand("observer_connection_stat_reset"); // invoked by its full name; no expected response fields are passed
+ }
+
+ @Test
public void testConnectionStatReset() throws IOException,
InterruptedException {
testCommand("connection_stat_reset");
}