This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new bbc39c4 ZOOKEEPER-3310: Add metrics for prep processor
bbc39c4 is described below
commit bbc39c42bba74add9a625adca3ee52fb1cd02335
Author: Jie Huang <[email protected]>
AuthorDate: Sun Apr 14 16:05:59 2019 +0200
ZOOKEEPER-3310: Add metrics for prep processor
Author: Jie Huang <[email protected]>
Reviewers: [email protected]
Closes #855 from jhuan31/ZOOKEEPER-3310 and squashes the following commits:
5875158b4 [Jie Huang] remove sleep in tests
2033456bb [Jie Huang] Reconstructed unit test. Add left out metric
245663d88 [Jie Huang] ZOOKEEPER-3310: Add metrics for prep processor
---
.../zookeeper/server/FinalRequestProcessor.java | 1 +
.../zookeeper/server/PrepRequestProcessor.java | 11 +-
.../java/org/apache/zookeeper/server/Request.java | 2 +
.../org/apache/zookeeper/server/ServerMetrics.java | 9 +
.../server/PrepRequestProcessorMetricsTest.java | 181 +++++++++++++++++++++
5 files changed, 203 insertions(+), 1 deletion(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index 8a237ff..1242516 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -126,6 +126,7 @@ public class FinalRequestProcessor implements
RequestProcessor {
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = zks.outstandingChanges.remove();
+ ServerMetrics.OUTSTANDING_CHANGES_REMOVED.add(1);
if (cr.zxid < zxid) {
LOG.warn("Zxid outstanding " + cr.zxid
+ " is less than current " + zxid);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index d0a1ac9..38babbb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -131,6 +131,7 @@ public class PrepRequestProcessor extends
ZooKeeperCriticalThread implements
public void run() {
try {
while (true) {
+
ServerMetrics.PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
Request request = submittedRequests.take();
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
@@ -142,7 +143,9 @@ public class PrepRequestProcessor extends
ZooKeeperCriticalThread implements
if (Request.requestOfDeath == request) {
break;
}
+ long prepStartTime = Time.currentElapsedTime();
pRequest(request);
+ ServerMetrics.PREP_PROCESS_TIME.add(Time.currentElapsedTime()
- prepStartTime);
}
} catch (RequestProcessorException e) {
if (e.getCause() instanceof XidRolloverException) {
@@ -183,10 +186,11 @@ public class PrepRequestProcessor extends
ZooKeeperCriticalThread implements
}
}
- private void addChangeRecord(ChangeRecord c) {
+ protected void addChangeRecord(ChangeRecord c) {
synchronized (zks.outstandingChanges) {
zks.outstandingChanges.add(c);
zks.outstandingChangesForPath.put(c.path, c);
+ ServerMetrics.OUTSTANDING_CHANGES_QUEUED.add(1);
}
}
@@ -588,6 +592,7 @@ public class PrepRequestProcessor extends
ZooKeeperCriticalThread implements
// queues up this operation without being the session owner.
// this request is the last of the session so it should be ok
//zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
+ long startTime = Time.currentElapsedTime();
Set<String> es = zks.getZKDatabase()
.getEphemerals(request.sessionId);
synchronized (zks.outstandingChanges) {
@@ -605,6 +610,7 @@ public class PrepRequestProcessor extends
ZooKeeperCriticalThread implements
zks.sessionTracker.setSessionClosing(request.sessionId);
}
+
ServerMetrics.CLOSE_SESSION_PREP_TIME.add(Time.currentElapsedTime() -
startTime);
break;
case OpCode.check:
zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
@@ -902,6 +908,7 @@ public class PrepRequestProcessor extends
ZooKeeperCriticalThread implements
}
}
request.zxid = zks.getZxid();
+ ServerMetrics.PREP_PROCESSOR_QUEUE_TIME.add(Time.currentElapsedTime()
- request.prepQueueStartTime);
nextProcessor.processRequest(request);
}
@@ -1005,7 +1012,9 @@ public class PrepRequestProcessor extends
ZooKeeperCriticalThread implements
}
public void processRequest(Request request) {
+ request.prepQueueStartTime = Time.currentElapsedTime();
submittedRequests.add(request);
+ ServerMetrics.PREP_PROCESSOR_QUEUED.add(1);
}
public void shutdown() {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 4895d8c..f6c50bc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -78,6 +78,8 @@ public class Request {
public final long createTime = Time.currentElapsedTime();
+ public long prepQueueStartTime= -1;
+
private Object owner;
private KeeperException e;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index ea6514c..c27716c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -169,6 +169,15 @@ public final class ServerMetrics {
public final SummarySet READ_PER_NAMESPACE;
public final Counter BYTES_RECEIVED_COUNT;
+ PREP_PROCESSOR_QUEUE_TIME(new
AvgMinMaxPercentileCounter("prep_processor_queue_time_ms")),
+ PREP_PROCESSOR_QUEUE_SIZE(new
AvgMinMaxCounter("prep_processor_queue_size")),
+ PREP_PROCESSOR_QUEUED(new SimpleCounter("prep_processor_request_queued")),
+ OUTSTANDING_CHANGES_QUEUED(new
SimpleCounter("outstanding_changes_queued")),
+ OUTSTANDING_CHANGES_REMOVED(new
SimpleCounter("outstanding_changes_removed")),
+ PREP_PROCESS_TIME(new AvgMinMaxCounter("prep_process_time")),
+ CLOSE_SESSION_PREP_TIME(new
AvgMinMaxPercentileCounter("close_session_prep_time")),
+
+
/**
* Fired watcher stats.
*/
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
new file mode 100644
index 0000000..75b1d6c
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import jline.internal.Log;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+
+public class PrepRequestProcessorMetricsTest extends ZKTestCase {
+ private static final Logger LOG =
LoggerFactory.getLogger(PrepRequestProcessorMetricsTest.class);
+
+ ZooKeeperServer zks;
+ RequestProcessor nextProcessor;
+
+ /**
+  * Prepares a spied ZooKeeperServer whose session tracker and database are
+  * mocks, a mocked downstream processor, and freshly reset server metrics.
+  */
+ @Before
+ public void setup() {
+     zks = spy(new ZooKeeperServer());
+     zks.sessionTracker = mock(SessionTracker.class);
+
+     ZKDatabase mockDb = mock(ZKDatabase.class);
+     when(zks.getZKDatabase()).thenReturn(mockDb);
+
+     // Every path lookup resolves to the same one-byte node.
+     DataNode anyNode = new DataNode(new byte[1], null, mock(StatPersisted.class));
+     when(mockDb.getNode(anyString())).thenReturn(anyNode);
+
+     // Every session id reports the same two ephemeral paths.
+     Set<String> sessionEphemerals =
+             new HashSet<>(Arrays.asList("/crystalmountain", "/stevenspass"));
+     when(mockDb.getEphemerals(anyLong())).thenReturn(sessionEphemerals);
+
+     nextProcessor = mock(RequestProcessor.class);
+     ServerMetrics.resetAll();
+ }
+
+ /**
+  * Serializes {@code record} under the tag "request" and wraps the resulting
+  * bytes in a Request carrying the given op code. The value 1 is passed in the
+  * argument position that the session-id overload fills with its sessionId.
+  */
+ private Request createRequest(Record record, int opCode) throws IOException {
+     ByteArrayOutputStream serialized = new ByteArrayOutputStream();
+     record.serialize(BinaryOutputArchive.getArchive(serialized), "request");
+     serialized.close();
+
+     ByteBuffer payload = ByteBuffer.wrap(serialized.toByteArray());
+     return new Request(null, 1L, 0, opCode, payload, null);
+ }
+
+ /**
+  * Builds a Request for {@code path}: a SetDataRequest with an empty payload
+  * and version -1 for setData, otherwise a DeleteRequest with version -1.
+  */
+ private Request createRequest(String path, int opCode) throws IOException {
+     Record record;
+     if (opCode == ZooDefs.OpCode.setData) {
+         record = new SetDataRequest(path, new byte[0], -1);
+     } else {
+         // delete and every other op code share the same DeleteRequest payload
+         record = new DeleteRequest(path, -1);
+     }
+     return createRequest(record, opCode);
+ }
+
+ /** Builds a Request with no payload for the given session id and op code. */
+ private Request createRequest(long sessionId, int opCode) {
+     Request payloadFree = new Request(null, sessionId, 0, opCode, null, null);
+     return payloadFree;
+ }
+
+ /**
+  * Queues a setData, a delete and a closeSession request before the processor
+  * thread is started, then checks the metrics recorded at queueing time and
+  * while the three requests are prepared and handed to the next processor.
+  */
+ @Test
+ public void testPrepRequestProcessorMetrics() throws Exception {
+     CountDownLatch threeRequests = new CountDownLatch(3);
+     doAnswer(invocationOnMock -> {
+         threeRequests.countDown();
+         return null;
+     }).when(nextProcessor).processRequest(any(Request.class));
+
+     // skipACL is static, so remember the current value and restore it at the
+     // end; otherwise the override leaks into whatever test runs next.
+     boolean previousSkipACL = PrepRequestProcessor.skipACL;
+     PrepRequestProcessor.skipACL = true;
+     PrepRequestProcessor prepRequestProcessor = new PrepRequestProcessor(zks, nextProcessor);
+     try {
+         //setData will generate one change
+         prepRequestProcessor.processRequest(createRequest("/foo", ZooDefs.OpCode.setData));
+         //delete will generate two changes, one for itself, one for its parent
+         prepRequestProcessor.processRequest(createRequest("/foo/bar", ZooDefs.OpCode.delete));
+         //mocking two ephemeral nodes exists for this session so two changes
+         prepRequestProcessor.processRequest(createRequest(2, ZooDefs.OpCode.closeSession));
+
+         Map<String, Object> values = ServerMetrics.getAllValues();
+         Assert.assertEquals(3L, values.get("prep_processor_request_queued"));
+
+         prepRequestProcessor.start();
+
+         // Fail here with a clear message if the processor did not forward all
+         // three requests, instead of reading half-populated metrics below.
+         Assert.assertTrue("prep processor did not forward all three requests in time",
+                 threeRequests.await(500, TimeUnit.MILLISECONDS));
+
+         values = ServerMetrics.getAllValues();
+         Assert.assertEquals(3L, values.get("max_prep_processor_queue_size"));
+
+         // NOTE(review): the timing metrics are differences of
+         // Time.currentElapsedTime(), presumably millisecond-granular, so a phase
+         // that completes within one tick reports 0. Asserting a strictly positive
+         // value makes the test timing-dependent - confirm the clock resolution.
+         Assert.assertThat((long) values.get("min_prep_processor_queue_time_ms"),
+                 greaterThanOrEqualTo(0L));
+         Assert.assertEquals(3L, values.get("cnt_prep_processor_queue_time_ms"));
+
+         Assert.assertEquals(3L, values.get("cnt_prep_process_time"));
+         Assert.assertThat((long) values.get("max_prep_process_time"),
+                 greaterThanOrEqualTo(0L));
+
+         Assert.assertEquals(1L, values.get("cnt_close_session_prep_time"));
+         Assert.assertThat((long) values.get("max_close_session_prep_time"),
+                 greaterThanOrEqualTo(0L));
+
+         Assert.assertEquals(5L, values.get("outstanding_changes_queued"));
+     } finally {
+         // Ask the processor to stop so its thread is not left blocked on the
+         // request queue after the test, then undo the static override.
+         prepRequestProcessor.shutdown();
+         PrepRequestProcessor.skipACL = previousSkipACL;
+     }
+ }
+
+ /**
+  * Watcher that counts down the supplied latch once for every event it
+  * receives. Declared static because it uses nothing from the enclosing test
+  * instance.
+  */
+ private static class SimpleWatcher implements Watcher {
+     private final CountDownLatch created;
+
+     public SimpleWatcher(CountDownLatch latch) {
+         this.created = latch;
+     }
+
+     @Override
+     public void process(WatchedEvent e) {
+         created.countDown();
+     }
+ }
+
+ /**
+  * Starts a quorum, creates one znode through a real client and checks that
+  * the outstanding_changes_removed counter has advanced. The client and the
+  * quorum are released even when an assertion or a client call fails.
+  */
+ @Test
+ public void testOutstandingChangesRemoved() throws Exception {
+     // this metric is currently recorded in FinalRequestProcessor but it is tightly related to the Prep metrics
+     QuorumUtil util = new QuorumUtil(1);
+     util.startAll();
+
+     ZooKeeper zk = null;
+     try {
+         ServerMetrics.resetAll();
+
+         zk = ClientBase.createZKClient(util.getConnString());
+         zk.create("/test", new byte[50], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+         CountDownLatch created = new CountDownLatch(1);
+         zk.exists("/test", new SimpleWatcher(created));
+         // NOTE(review): /test already exists when the watch is registered and
+         // nothing modifies it afterwards, so this latch is presumably never
+         // released and the await acts as a bounded 200 ms pause - confirm intent.
+         created.await(200, TimeUnit.MILLISECONDS);
+
+         Map<String, Object> values = ServerMetrics.getAllValues();
+         Assert.assertThat((long) values.get("outstanding_changes_removed"), greaterThan(0L));
+     } finally {
+         try {
+             if (zk != null) {
+                 zk.close();
+             }
+         } finally {
+             // Always stop the quorum, even if closing the client throws.
+             util.shutdownAll();
+         }
+     }
+ }
+}