This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new ff47fc3 ZOOKEEPER-3310: Add metrics for prep processor
ff47fc3 is described below
commit ff47fc3dfda3487ca3a5aca2ea16831b2aebc7e2
Author: Jie Huang <[email protected]>
AuthorDate: Wed Apr 17 14:16:17 2019 +0200
ZOOKEEPER-3310: Add metrics for prep processor
Author: Jie Huang <[email protected]>
Reviewers: [email protected]
Closes #855 from jhuan31/ZOOKEEPER-3310 and squashes the following commits:
ab0b81c2b [Jie Huang] move to the new metric framework
8f4d53942 [Jie Huang] remove sleep in tests
9cf1fc064 [Jie Huang] Reconstructed unit test. Add left out metric
b07aa8c24 [Jie Huang] ZOOKEEPER-3310: Add metrics for prep processor
---
.../zookeeper/server/FinalRequestProcessor.java | 1 +
.../zookeeper/server/PrepRequestProcessor.java | 11 +-
.../java/org/apache/zookeeper/server/Request.java | 2 +
.../org/apache/zookeeper/server/ServerMetrics.java | 17 +-
.../server/PrepRequestProcessorMetricsTest.java | 176 +++++++++++++++++++++
5 files changed, 205 insertions(+), 2 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index 8a237ff..2267666 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -126,6 +126,7 @@ public class FinalRequestProcessor implements RequestProcessor {
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = zks.outstandingChanges.remove();
+ ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
if (cr.zxid < zxid) {
LOG.warn("Zxid outstanding " + cr.zxid
+ " is less than current " + zxid);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index d0a1ac9..48c3bfc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -131,6 +131,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
public void run() {
try {
while (true) {
+ ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
Request request = submittedRequests.take();
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
@@ -142,7 +143,9 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
if (Request.requestOfDeath == request) {
break;
}
+ long prepStartTime = Time.currentElapsedTime();
pRequest(request);
+ ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - prepStartTime);
}
} catch (RequestProcessorException e) {
if (e.getCause() instanceof XidRolloverException) {
@@ -183,10 +186,11 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
}
}
- private void addChangeRecord(ChangeRecord c) {
+ protected void addChangeRecord(ChangeRecord c) {
synchronized (zks.outstandingChanges) {
zks.outstandingChanges.add(c);
zks.outstandingChangesForPath.put(c.path, c);
+ ServerMetrics.getMetrics().OUTSTANDING_CHANGES_QUEUED.add(1);
}
}
@@ -588,6 +592,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
// queues up this operation without being the session owner.
// this request is the last of the session so it should be ok
//zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
+ long startTime = Time.currentElapsedTime();
Set<String> es = zks.getZKDatabase()
.getEphemerals(request.sessionId);
synchronized (zks.outstandingChanges) {
@@ -605,6 +610,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
zks.sessionTracker.setSessionClosing(request.sessionId);
}
+ ServerMetrics.getMetrics().CLOSE_SESSION_PREP_TIME.add(Time.currentElapsedTime() - startTime);
break;
case OpCode.check:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
@@ -902,6 +908,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
}
}
request.zxid = zks.getZxid();
+ ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME.add(Time.currentElapsedTime() - request.prepQueueStartTime);
nextProcessor.processRequest(request);
}
@@ -1005,7 +1012,9 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
}
public void processRequest(Request request) {
+ request.prepQueueStartTime = Time.currentElapsedTime();
submittedRequests.add(request);
+ ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
}
public void shutdown() {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 4895d8c..f6c50bc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -78,6 +78,8 @@ public class Request {
public final long createTime = Time.currentElapsedTime();
+ public long prepQueueStartTime= -1;
+
private Object owner;
private KeeperException e;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index ea6514c..cdd57f0 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -99,7 +99,7 @@ public final class ServerMetrics {
/*
- * Number of dead watchers in DeadWatcherListener
+ * Number of dead watchers in DeadWatcherListener
*/
ADD_DEAD_WATCHER_STALL_TIME = metricsContext.getCounter("add_dead_watcher_stall_time");
DEAD_WATCHERS_QUEUED = metricsContext.getCounter("dead_watchers_queued");
@@ -115,6 +115,13 @@ public final class ServerMetrics {
ENSEMBLE_AUTH_SKIP = metricsContext.getCounter("ensemble_auth_skip");
+ PREP_PROCESSOR_QUEUE_TIME = metricsContext.getSummary("prep_processor_queue_time_ms", DetailLevel.ADVANCED);
+ PREP_PROCESSOR_QUEUE_SIZE = metricsContext.getSummary("prep_processor_queue_size", DetailLevel.BASIC);
+ PREP_PROCESSOR_QUEUED = metricsContext.getCounter("prep_processor_request_queued");
+ OUTSTANDING_CHANGES_QUEUED = metricsContext.getCounter("outstanding_changes_queued");
+ OUTSTANDING_CHANGES_REMOVED = metricsContext.getCounter("outstanding_changes_removed");
+ PREP_PROCESS_TIME = metricsContext.getSummary("prep_process_time", DetailLevel.BASIC);
+ CLOSE_SESSION_PREP_TIME = metricsContext.getSummary("close_session_prep_time", DetailLevel.ADVANCED);
}
/**
@@ -169,6 +176,14 @@ public final class ServerMetrics {
public final SummarySet READ_PER_NAMESPACE;
public final Counter BYTES_RECEIVED_COUNT;
+ public final Summary PREP_PROCESSOR_QUEUE_TIME;
+ public final Summary PREP_PROCESSOR_QUEUE_SIZE;
+ public final Counter PREP_PROCESSOR_QUEUED;
+ public final Counter OUTSTANDING_CHANGES_QUEUED;
+ public final Counter OUTSTANDING_CHANGES_REMOVED;
+ public final Summary PREP_PROCESS_TIME;
+ public final Summary CLOSE_SESSION_PREP_TIME;
+
/**
* Fired watcher stats.
*/
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
new file mode 100644
index 0000000..019d0e7
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+
+public class PrepRequestProcessorMetricsTest extends ZKTestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(PrepRequestProcessorMetricsTest.class);
+
+ ZooKeeperServer zks;
+ RequestProcessor nextProcessor;
+
+ @Before
+ public void setup() {
+ zks = spy(new ZooKeeperServer());
+ zks.sessionTracker = mock(SessionTracker.class);
+
+ ZKDatabase db = mock(ZKDatabase.class);
+ when(zks.getZKDatabase()).thenReturn(db);
+
+ DataNode node = new DataNode(new byte[1], null, mock(StatPersisted.class));
+ when(db.getNode(anyString())).thenReturn(node);
+
+ Set<String> ephemerals = new HashSet<>();
+ ephemerals.add("/crystalmountain");
+ ephemerals.add("/stevenspass");
+ when(db.getEphemerals(anyLong())).thenReturn(ephemerals);
+
+ nextProcessor = mock(RequestProcessor.class);
+ ServerMetrics.getMetrics().resetAll();
+ }
+
+ private Request createRequest(Record record, int opCode) throws IOException {
+ // encoding
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+ record.serialize(boa, "request");
+ baos.close();
+ return new Request(null, 1l, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), null);
+ }
+
+ private Request createRequest(String path, int opCode) throws IOException {
+ Record record;
+ switch (opCode) {
+ case ZooDefs.OpCode.setData:
+ record = new SetDataRequest(path, new byte[0], -1);
+ break;
+ case ZooDefs.OpCode.delete:
+ record = new DeleteRequest(path, -1);
+ break;
+ default:
+ record = new DeleteRequest(path, -1);
+ break;
+ }
+
+ return createRequest(record, opCode);
+ }
+
+ private Request createRequest(long sessionId, int opCode) {
+ return new Request(null, sessionId, 0, opCode, null, null);
+ }
+
+ @Test
+ public void testPrepRequestProcessorMetrics() throws Exception {
+ CountDownLatch threeRequests = new CountDownLatch(3);
+ doAnswer(invocationOnMock -> {
+ threeRequests.countDown();
+ return null;}).when(nextProcessor).processRequest(any(Request.class));
+
+ PrepRequestProcessor prepRequestProcessor = new PrepRequestProcessor(zks, nextProcessor);
+ PrepRequestProcessor.skipACL = true;
+
+ //setData will generate one change
+ prepRequestProcessor.processRequest(createRequest("/foo", ZooDefs.OpCode.setData));
+ //delete will generate two changes, one for itself, one for its parent
+ prepRequestProcessor.processRequest(createRequest("/foo/bar", ZooDefs.OpCode.delete));
+ //mocking two ephemeral nodes exists for this session so two changes
+ prepRequestProcessor.processRequest(createRequest(2, ZooDefs.OpCode.closeSession));
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(3L, values.get("prep_processor_request_queued"));
+
+ prepRequestProcessor.start();
+
+ threeRequests.await(500, TimeUnit.MILLISECONDS);
+
+ values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(3L, values.get("max_prep_processor_queue_size"));
+
+ Assert.assertThat((long)values.get("min_prep_processor_queue_time_ms"), greaterThan(0l));
+ Assert.assertEquals(3L, values.get("cnt_prep_processor_queue_time_ms"));
+
+ Assert.assertEquals(3L, values.get("cnt_prep_process_time"));
+ Assert.assertThat((long)values.get("max_prep_process_time"), greaterThan(0l));
+
+ Assert.assertEquals(1L, values.get("cnt_close_session_prep_time"));
+ Assert.assertThat((long)values.get("max_close_session_prep_time"), greaterThanOrEqualTo(0L));
+
+ Assert.assertEquals(5L, values.get("outstanding_changes_queued"));
+ }
+
+ private class SimpleWatcher implements Watcher {
+ CountDownLatch created;
+ public SimpleWatcher(CountDownLatch latch) {
+ this.created = latch;
+ }
+ @Override
+ public void process(WatchedEvent e) {
+ created.countDown();
+ }
+ }
+
+ @Test
+ public void testOutstandingChangesRemoved() throws Exception {
+ // this metric is currently recorded in FinalRequestProcessor but it is tightly related to the Prep metrics
+ QuorumUtil util = new QuorumUtil(1);
+ util.startAll();
+
+ ServerMetrics.getMetrics().resetAll();
+
+ ZooKeeper zk = ClientBase.createZKClient(util.getConnString());
+ zk.create("/test", new byte[50], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ CountDownLatch created = new CountDownLatch(1);
+ zk.exists("/test", new SimpleWatcher(created));
+ created.await(200, TimeUnit.MILLISECONDS);
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ Assert.assertThat((long)values.get("outstanding_changes_removed"), greaterThan(0L));
+
+ util.shutdownAll();
+ }
+}