This is an automated email from the ASF dual-hosted git repository.

fangmin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cb10bb  ZOOKEEPER-3309: Add sync processor metrics
5cb10bb is described below

commit 5cb10bb5c0e341d65471b70572654756ba088a1d
Author: Jie Huang <[email protected]>
AuthorDate: Tue Jun 4 15:46:58 2019 -0700

    ZOOKEEPER-3309: Add sync processor metrics
    
    Author: Jie Huang <[email protected]>
    
    Reviewers: [email protected], [email protected], [email protected]
    
    Closes #850 from jhuan31/ZOOKEEPER-3309
---
 .../java/org/apache/zookeeper/server/Request.java  |   2 +
 .../org/apache/zookeeper/server/ServerMetrics.java |  17 ++++
 .../zookeeper/server/SyncRequestProcessor.java     |  16 ++++
 .../quorum/SyncRequestProcessorMetricTest.java     | 104 +++++++++++++++++++++
 4 files changed, 139 insertions(+)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 83f3e0b..6c96792 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -88,6 +88,8 @@ public class Request {
 
     public long commitRecvTime = -1;
 
+    public long syncQueueStartTime;
+
     private Object owner;
 
     private KeeperException e;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 686aacc..d8b437d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -202,6 +202,14 @@ public final class ServerMetrics {
         STARTUP_TXNS_LOAD_TIME = metricsContext.getSummary("startup_txns_load_time", DetailLevel.BASIC);
         STARTUP_SNAP_LOAD_TIME = metricsContext.getSummary("startup_snap_load_time", DetailLevel.BASIC);
 
+        SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME = metricsContext.getSummary("sync_processor_queue_and_flush_time_ms", DetailLevel.ADVANCED);
+        SYNC_PROCESSOR_QUEUE_SIZE = metricsContext.getSummary("sync_processor_queue_size", DetailLevel.BASIC);
+        SYNC_PROCESSOR_QUEUED = metricsContext.getCounter("sync_processor_request_queued");
+        SYNC_PROCESSOR_QUEUE_TIME = metricsContext.getSummary("sync_processor_queue_time_ms", DetailLevel.ADVANCED);
+        SYNC_PROCESSOR_FLUSH_TIME = metricsContext.getSummary("sync_processor_queue_flush_time_ms", DetailLevel.ADVANCED);
+        SYNC_PROCESS_TIME = metricsContext.getSummary("sync_process_time", DetailLevel.BASIC);
+
+        BATCH_SIZE = metricsContext.getSummary("sync_processor_batch_size", DetailLevel.BASIC);
     }
 
     /**
@@ -283,6 +291,15 @@ public final class ServerMetrics {
     public final Summary STARTUP_TXNS_LOAD_TIME;
     public final Summary STARTUP_SNAP_LOAD_TIME;
 
+    public final Summary SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME;
+    public final Summary SYNC_PROCESSOR_QUEUE_SIZE;
+    public final Counter SYNC_PROCESSOR_QUEUED;
+    public final Summary SYNC_PROCESSOR_QUEUE_TIME;
+    public final Summary SYNC_PROCESSOR_FLUSH_TIME;
+    public final Summary SYNC_PROCESS_TIME;
+
+    public final Summary BATCH_SIZE;
+
     /**
      * Fired watcher stats.
      */
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
index 6bb7937..a3c9e0f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -165,6 +165,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
             resetSnapshotStats();
             lastFlushTime = Time.currentElapsedTime();
             while (true) {
+                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
+
                 long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
                 Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                 if (si == null) {
@@ -177,6 +179,10 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
                     break;
                 }
 
+                long startProcessTime = Time.currentElapsedTime();
+                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(
+                                    startProcessTime - si.syncQueueStartTime);
+
                 // track the number of records written to the log
                 if (zks.getZKDatabase().append(si)) {
                     if (shouldSnapshot()) {
@@ -217,6 +223,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
                 if (shouldFlush()) {
                     flush();
                 }
+                ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
             }
         } catch (Throwable t) {
             handleException(this.getName(), t);
@@ -229,13 +236,19 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
           return;
       }
 
+      ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
+
+      long flushStartTime = Time.currentElapsedTime();
       zks.getZKDatabase().commit();
+      ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
 
       if (this.nextProcessor == null) {
         this.toFlush.clear();
       } else {
           while (!this.toFlush.isEmpty()) {
               final Request i = this.toFlush.remove();
+              long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
+              ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
               this.nextProcessor.processRequest(i);
           }
           if (this.nextProcessor instanceof Flushable) {
@@ -266,7 +279,10 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
 
     public void processRequest(final Request request) {
         Objects.requireNonNull(request, "Request cannot be null");
+
+        request.syncQueueStartTime = Time.currentElapsedTime();
         queuedRequests.add(request);
+        ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
     }
 
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java
new file mode 100644
index 0000000..843dfab
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.server.*;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SyncRequestProcessorMetricTest {
+    ZooKeeperServer zks;
+    RequestProcessor nextProcessor;
+    CountDownLatch allRequestsFlushed;
+
+    @Before
+    public void setup() throws Exception {
+        ZKDatabase db = mock(ZKDatabase.class);
+        when(db.append(any(Request.class))).thenReturn(true);
+        doAnswer(invocation->{
+            Thread.sleep(100);
+            return null;
+        }).when(db).commit();
+        zks = mock(ZooKeeperServer.class);
+        when(zks.getZKDatabase()).thenReturn(db);
+
+        nextProcessor = mock(RequestProcessor.class);
+        doAnswer(invocationOnMock -> {
+            allRequestsFlushed.countDown();
+            return null;
+        }).when(nextProcessor).processRequest(any(Request.class));
+    }
+
+    private Request createRquest(long sessionId, int xid) {
+        return new Request(null, sessionId, xid, ZooDefs.OpCode.setData,
+                ByteBuffer.wrap(new byte[10]), null);
+    }
+
+    @Test
+    public void testSyncProcessorMetrics() throws  Exception{
+        SyncRequestProcessor syncProcessor = new SyncRequestProcessor(zks, nextProcessor);
+        for (int i=0; i<500; i++) {
+            syncProcessor.processRequest(createRquest(1, i));
+        }
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(500L, values.get("sync_processor_request_queued"));
+
+        allRequestsFlushed = new CountDownLatch(500);
+        syncProcessor.start();
+
+        allRequestsFlushed.await(5000, TimeUnit.MILLISECONDS);
+
+        values = MetricsUtils.currentServerMetrics();
+
+        Assert.assertEquals(501L, values.get("cnt_sync_processor_queue_size"));
+        Assert.assertEquals(500L, values.get("max_sync_processor_queue_size"));
+        Assert.assertEquals(0L, values.get("min_sync_processor_queue_size"));
+
+        Assert.assertEquals(500L, values.get("cnt_sync_processor_queue_time_ms"));
+        Assert.assertThat((long)values.get("max_sync_processor_queue_time_ms"), greaterThan(0L));
+
+        Assert.assertEquals(500L, values.get("cnt_sync_processor_queue_and_flush_time_ms"));
+        Assert.assertThat((long)values.get("max_sync_processor_queue_and_flush_time_ms"), greaterThan(0L));
+
+        Assert.assertEquals(500L, values.get("cnt_sync_process_time"));
+        Assert.assertThat((long)values.get("max_sync_process_time"), greaterThan(0L));
+
+        Assert.assertEquals(500L, values.get("max_sync_processor_batch_size"));
+        Assert.assertEquals(1L, values.get("cnt_sync_processor_queue_flush_time_ms"));
+        Assert.assertThat((long)values.get("max_sync_processor_queue_flush_time_ms"), greaterThanOrEqualTo(100L));
+
+        syncProcessor.shutdown();
+    }
+}

Reply via email to