This is an automated email from the ASF dual-hosted git repository.
fangmin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 5cb10bb ZOOKEEPER-3309: Add sync processor metrics
5cb10bb is described below
commit 5cb10bb5c0e341d65471b70572654756ba088a1d
Author: Jie Huang <[email protected]>
AuthorDate: Tue Jun 4 15:46:58 2019 -0700
ZOOKEEPER-3309: Add sync processor metrics
Author: Jie Huang <[email protected]>
Reviewers: [email protected], [email protected], [email protected]
Closes #850 from jhuan31/ZOOKEEPER-3309
---
.../java/org/apache/zookeeper/server/Request.java | 2 +
.../org/apache/zookeeper/server/ServerMetrics.java | 17 ++++
.../zookeeper/server/SyncRequestProcessor.java | 16 ++++
.../quorum/SyncRequestProcessorMetricTest.java | 104 +++++++++++++++++++++
4 files changed, 139 insertions(+)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 83f3e0b..6c96792 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -88,6 +88,8 @@ public class Request {
public long commitRecvTime = -1;
+ public long syncQueueStartTime;
+
private Object owner;
private KeeperException e;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 686aacc..d8b437d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -202,6 +202,14 @@ public final class ServerMetrics {
STARTUP_TXNS_LOAD_TIME = metricsContext.getSummary("startup_txns_load_time", DetailLevel.BASIC);
STARTUP_SNAP_LOAD_TIME = metricsContext.getSummary("startup_snap_load_time", DetailLevel.BASIC);
+ SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME = metricsContext.getSummary("sync_processor_queue_and_flush_time_ms", DetailLevel.ADVANCED);
+ SYNC_PROCESSOR_QUEUE_SIZE = metricsContext.getSummary("sync_processor_queue_size", DetailLevel.BASIC);
+ SYNC_PROCESSOR_QUEUED = metricsContext.getCounter("sync_processor_request_queued");
+ SYNC_PROCESSOR_QUEUE_TIME = metricsContext.getSummary("sync_processor_queue_time_ms", DetailLevel.ADVANCED);
+ SYNC_PROCESSOR_FLUSH_TIME = metricsContext.getSummary("sync_processor_queue_flush_time_ms", DetailLevel.ADVANCED);
+ SYNC_PROCESS_TIME = metricsContext.getSummary("sync_process_time", DetailLevel.BASIC);
+
+ BATCH_SIZE = metricsContext.getSummary("sync_processor_batch_size", DetailLevel.BASIC);
}
/**
@@ -283,6 +291,15 @@ public final class ServerMetrics {
public final Summary STARTUP_TXNS_LOAD_TIME;
public final Summary STARTUP_SNAP_LOAD_TIME;
+ public final Summary SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME;
+ public final Summary SYNC_PROCESSOR_QUEUE_SIZE;
+ public final Counter SYNC_PROCESSOR_QUEUED;
+ public final Summary SYNC_PROCESSOR_QUEUE_TIME;
+ public final Summary SYNC_PROCESSOR_FLUSH_TIME;
+ public final Summary SYNC_PROCESS_TIME;
+
+ public final Summary BATCH_SIZE;
+
/**
* Fired watcher stats.
*/
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
index 6bb7937..a3c9e0f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -165,6 +165,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
resetSnapshotStats();
lastFlushTime = Time.currentElapsedTime();
while (true) {
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
+
long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
if (si == null) {
@@ -177,6 +179,10 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
break;
}
+ long startProcessTime = Time.currentElapsedTime();
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(
+ startProcessTime - si.syncQueueStartTime);
+
// track the number of records written to the log
if (zks.getZKDatabase().append(si)) {
if (shouldSnapshot()) {
@@ -217,6 +223,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
if (shouldFlush()) {
flush();
}
+ ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
}
} catch (Throwable t) {
handleException(this.getName(), t);
@@ -229,13 +236,19 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
return;
}
+ ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
+
+ long flushStartTime = Time.currentElapsedTime();
zks.getZKDatabase().commit();
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
if (this.nextProcessor == null) {
this.toFlush.clear();
} else {
while (!this.toFlush.isEmpty()) {
final Request i = this.toFlush.remove();
+ long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
this.nextProcessor.processRequest(i);
}
if (this.nextProcessor instanceof Flushable) {
@@ -266,7 +279,10 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
public void processRequest(final Request request) {
Objects.requireNonNull(request, "Request cannot be null");
+
+ request.syncQueueStartTime = Time.currentElapsedTime();
queuedRequests.add(request);
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
}
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java
new file mode 100644
index 0000000..843dfab
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.server.*;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SyncRequestProcessorMetricTest {
+ ZooKeeperServer zks;
+ RequestProcessor nextProcessor;
+ CountDownLatch allRequestsFlushed;
+
+ @Before
+ public void setup() throws Exception {
+ ZKDatabase db = mock(ZKDatabase.class);
+ when(db.append(any(Request.class))).thenReturn(true);
+ doAnswer(invocation->{
+ Thread.sleep(100);
+ return null;
+ }).when(db).commit();
+ zks = mock(ZooKeeperServer.class);
+ when(zks.getZKDatabase()).thenReturn(db);
+
+ nextProcessor = mock(RequestProcessor.class);
+ doAnswer(invocationOnMock -> {
+ allRequestsFlushed.countDown();
+ return null;
+ }).when(nextProcessor).processRequest(any(Request.class));
+ }
+
+ private Request createRquest(long sessionId, int xid) {
+ return new Request(null, sessionId, xid, ZooDefs.OpCode.setData,
+ ByteBuffer.wrap(new byte[10]), null);
+ }
+
+ @Test
+ public void testSyncProcessorMetrics() throws Exception{
+ SyncRequestProcessor syncProcessor = new SyncRequestProcessor(zks, nextProcessor);
+ for (int i=0; i<500; i++) {
+ syncProcessor.processRequest(createRquest(1, i));
+ }
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(500L, values.get("sync_processor_request_queued"));
+
+ allRequestsFlushed = new CountDownLatch(500);
+ syncProcessor.start();
+
+ allRequestsFlushed.await(5000, TimeUnit.MILLISECONDS);
+
+ values = MetricsUtils.currentServerMetrics();
+
+ Assert.assertEquals(501L, values.get("cnt_sync_processor_queue_size"));
+ Assert.assertEquals(500L, values.get("max_sync_processor_queue_size"));
+ Assert.assertEquals(0L, values.get("min_sync_processor_queue_size"));
+
+ Assert.assertEquals(500L, values.get("cnt_sync_processor_queue_time_ms"));
+ Assert.assertThat((long)values.get("max_sync_processor_queue_time_ms"), greaterThan(0L));
+
+ Assert.assertEquals(500L, values.get("cnt_sync_processor_queue_and_flush_time_ms"));
+ Assert.assertThat((long)values.get("max_sync_processor_queue_and_flush_time_ms"), greaterThan(0L));
+
+ Assert.assertEquals(500L, values.get("cnt_sync_process_time"));
+ Assert.assertThat((long)values.get("max_sync_process_time"), greaterThan(0L));
+
+ Assert.assertEquals(500L, values.get("max_sync_processor_batch_size"));
+ Assert.assertEquals(1L, values.get("cnt_sync_processor_queue_flush_time_ms"));
+ Assert.assertThat((long)values.get("max_sync_processor_queue_flush_time_ms"), greaterThanOrEqualTo(100L));
+
+ syncProcessor.shutdown();
+ }
+}