This is an automated email from the ASF dual-hosted git repository.
fangmin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 36bca12 ZOOKEEPER-3319: Add metrics for follower and observer
36bca12 is described below
commit 36bca12d5e70f96de4701aa2fd41979ff8128a1f
Author: Jie Huang <[email protected]>
AuthorDate: Thu May 2 15:44:08 2019 -0700
ZOOKEEPER-3319: Add metrics for follower and observer
Author: Jie Huang <[email protected]>
Reviewers: [email protected], [email protected]
Closes #856 from jhuan31/ZOOKEEPER-3319
---
.../java/org/apache/zookeeper/server/Request.java | 36 +++++++++
.../org/apache/zookeeper/server/ServerMetrics.java | 12 +++
.../apache/zookeeper/server/quorum/Follower.java | 17 +++-
.../server/quorum/FollowerZooKeeperServer.java | 2 +
.../apache/zookeeper/server/quorum/Observer.java | 3 +
.../server/quorum/SendAckRequestProcessor.java | 3 +
.../server/quorum/LearnerMetricsTest.java | 92 ++++++++++++++++++++++
7 files changed, 163 insertions(+), 2 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index aef11bf..c8f7c74 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -26,6 +26,8 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.metrics.Summary;
+import org.apache.zookeeper.metrics.SummarySet;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.txn.TxnHeader;
@@ -311,4 +313,38 @@ public class Request {
public KeeperException getException() {
return e;
}
+
+ public void logLatency(Summary metric) {
+ logLatency(metric, Time.currentWallTime());
+ }
+
+ public void logLatency(Summary metric, long currentTime){
+ if (hdr != null) {
+ /* Request header is created by leader. If there is clock drift
+ * latency might be negative. Headers use wall time, not
+ * CLOCK_MONOTONIC.
+ */
+ long latency = currentTime - hdr.getTime();
+ if (latency > 0) {
+ metric.add(latency);
+ }
+ }
+ }
+
+ public void logLatency(SummarySet metric, String key, long currentTime) {
+ if (hdr != null) {
+ /* Request header is created by leader. If there is clock drift
+ * latency might be negative. Headers use wall time, not
+ * CLOCK_MONOTONIC.
+ */
+ long latency = currentTime - hdr.getTime();
+ if (latency > 0) {
+ metric.add(key, latency);
+ }
+ }
+ }
+
+ public void logLatency(SummarySet metric, String key) {
+ logLatency(metric, key, Time.currentWallTime());
+ }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 3c089de..6ff2474 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -186,6 +186,12 @@ public final class ServerMetrics {
READ_FINAL_PROC_TIME =
metricsContext.getSummary("read_final_proc_time_ms", DetailLevel.ADVANCED);
WRITE_FINAL_PROC_TIME =
metricsContext.getSummary("write_final_proc_time_ms", DetailLevel.ADVANCED);
+ PROPOSAL_LATENCY = metricsContext.getSummary("proposal_latency",
DetailLevel.ADVANCED);
+ PROPOSAL_ACK_CREATION_LATENCY =
metricsContext.getSummary("proposal_ack_creation_latency",
DetailLevel.ADVANCED);
+ COMMIT_PROPAGATION_LATENCY =
metricsContext.getSummary("commit_propagation_latency", DetailLevel.ADVANCED);
+ LEARNER_PROPOSAL_RECEIVED_COUNT =
metricsContext.getCounter("learner_proposal_received_count");
+ LEARNER_COMMIT_RECEIVED_COUNT =
metricsContext.getCounter("learner_commit_received_count");
+
}
/**
@@ -257,6 +263,12 @@ public final class ServerMetrics {
public final Summary PREP_PROCESS_TIME;
public final Summary CLOSE_SESSION_PREP_TIME;
+ public final Summary PROPOSAL_LATENCY;
+ public final Summary PROPOSAL_ACK_CREATION_LATENCY;
+ public final Summary COMMIT_PROPAGATION_LATENCY;
+ public final Counter LEARNER_PROPOSAL_RECEIVED_COUNT;
+ public final Counter LEARNER_COMMIT_RECEIVED_COUNT;
+
/**
* Fired watcher stats.
*/
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
index ea1dab6..719734f 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
@@ -141,7 +141,8 @@ public class Follower extends Learner{
case Leader.PING:
ping(qp);
break;
- case Leader.PROPOSAL:
+ case Leader.PROPOSAL:
+ ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
@@ -159,12 +160,24 @@ public class Follower extends Learner{
}
fzk.logRequest(hdr, txn);
-
+ if (hdr != null) {
+ /*
+ * Request header is created only by the leader, so this is
only set
+ * for quorum packets. If there is a clock drift, the latency
may be
+ * negative. Headers use wall time, not CLOCK_MONOTONIC.
+ */
+ long now = Time.currentWallTime();
+ long latency = now - hdr.getTime();
+ if (latency > 0) {
+ ServerMetrics.getMetrics().PROPOSAL_LATENCY.add(latency);
+ }
+ }
if (om != null) {
om.proposalReceived(qp);
}
break;
case Leader.COMMIT:
+ ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
fzk.commit(qp.getZxid());
if (om != null) {
om.proposalCommitted(qp.getZxid());
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
index 610e965..e153286 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
@@ -33,6 +33,7 @@ import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.txn.TxnHeader;
import javax.management.JMException;
@@ -113,6 +114,7 @@ public class FollowerZooKeeperServer extends
LearnerZooKeeperServer {
System.exit(ExitCode.UNMATCHED_TXN_COMMIT.getValue());
}
Request request = pendingTxns.remove();
+
request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
commitProcessor.commit(request);
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
index b688e03..5e086b7 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.jute.Record;
import org.apache.zookeeper.server.ObserverBean;
import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.SerializeUtils;
@@ -166,9 +167,11 @@ public class Observer extends Learner{
((ObserverZooKeeperServer)zk).sync();
break;
case Leader.INFORM:
+ ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
Request request = new Request (hdr.getClientId(), hdr.getCxid(),
hdr.getType(), hdr, txn, 0);
+
request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk;
obs.commitRequest(request);
break;
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
index 6985342..c41f685 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server.quorum;
import java.io.Flushable;
import java.io.IOException;
+import org.apache.zookeeper.server.ServerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +43,8 @@ public class SendAckRequestProcessor implements
RequestProcessor, Flushable {
QuorumPacket qp = new QuorumPacket(Leader.ACK,
si.getHdr().getZxid(), null,
null);
try {
+
si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
+
learner.writePacket(qp, false);
} catch (IOException e) {
LOG.warn("Closing connection to leader, exception during
packet send", e);
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
new file mode 100644
index 0000000..8895f1f
--- /dev/null
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *uuuuu
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "/RequuuAS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+
+public class LearnerMetricsTest extends QuorumPeerTestBase {
+
+ @Test
+ public void testLearnerMetricsTest() throws Exception {
+ ServerMetrics.getMetrics().resetAll();
+ ClientBase.setupTestEnv();
+
+ final int SERVER_COUNT = 6; // 5 participants, 1 observer
+ final String path = "/zk-testLeanerMetrics";
+ final byte[] data = new byte[512];
+ final int clientPorts[] = new int[SERVER_COUNT];
+ StringBuilder sb = new StringBuilder();
+ int observer = 0 ;
+ clientPorts[observer] = PortAssignment.unique();
+
sb.append("server."+observer+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+":observer\n");
+ for(int i = 1; i < SERVER_COUNT; i++) {
+ clientPorts[i] = PortAssignment.unique();
+
sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n");
+ }
+
+ // start the participants
+ String quorumCfgSection = sb.toString();
+ QuorumPeerTestBase.MainThread mt[] = new
QuorumPeerTestBase.MainThread[SERVER_COUNT];
+ for(int i = 1; i < SERVER_COUNT; i++) {
+ mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts[i],
quorumCfgSection);
+ mt[i].start();
+ }
+
+ // start the observer
+ Map<String, String> observerConfig = new HashMap<>();
+ observerConfig.put("peerType", "observer");
+ mt[observer] = new QuorumPeerTestBase.MainThread(observer,
clientPorts[observer], quorumCfgSection, observerConfig);
+ mt[observer].start();
+
+ ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[1],
ClientBase.CONNECTION_TIMEOUT, this);
+
+ waitForOne(zk, ZooKeeper.States.CONNECTED);
+
+ // send one create request
+ zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+
+ Thread.sleep(200);
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ // there are 4 followers, each received two proposals, one for leader
election, one for the create request
+ Assert.assertEquals(8L, values.get("learner_proposal_received_count"));
+ Assert.assertEquals(8L, values.get("cnt_proposal_latency"));
+ Assert.assertThat((long)values.get("min_proposal_latency"),
greaterThan(0L));
+ Assert.assertEquals(8L,
values.get("cnt_proposal_ack_creation_latency"));
+
Assert.assertThat((long)values.get("min_proposal_ack_creation_latency"),
greaterThan(0L));
+
+ // there are five learners, each received two commits, one for leader
election, one for the create request
+ Assert.assertEquals(10L, values.get("learner_commit_received_count"));
+ Assert.assertEquals(10L, values.get("cnt_commit_propagation_latency"));
+ Assert.assertThat((long)values.get("min_commit_propagation_latency"),
greaterThan(0L));
+ }
+}