This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 699792d  RATIS-648. Add metrics related to GrpcLogAppendRequests. 
Contributed by Siddharth Wagle.
699792d is described below

commit 699792d56ca3828351b9e72cf983562eb8c11b21
Author: Shashikant Banerjee <[email protected]>
AuthorDate: Thu Oct 10 14:55:11 2019 +0530

    RATIS-648. Add metrics related to GrpcLogAppendRequests. Contributed by 
Siddharth Wagle.
---
 ratis-grpc/pom.xml                                 |  13 ++-
 .../ratis/grpc/metrics/GrpcServerMetrics.java      |  92 ++++++++++++++++++
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  74 +++++++++++----
 .../ratis/grpc/server/TestGrpcServerMetrics.java   | 105 +++++++++++++++++++++
 .../apache/ratis/server/impl/RaftServerImpl.java   |   9 ++
 .../ratis/server/metrics/RatisMetricNames.java     |   1 +
 .../apache/ratis/server/metrics/RatisMetrics.java  |   6 ++
 .../server/metrics/TestLeaderElectionMetrics.java  |   2 -
 8 files changed, 279 insertions(+), 23 deletions(-)

diff --git a/ratis-grpc/pom.xml b/ratis-grpc/pom.xml
index 854138f..5b6ca6d 100644
--- a/ratis-grpc/pom.xml
+++ b/ratis-grpc/pom.xml
@@ -43,7 +43,6 @@
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
-    
     <dependency>
       <artifactId>ratis-client</artifactId>
       <groupId>org.apache.ratis</groupId>
@@ -54,7 +53,6 @@
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
-    
     <dependency>
       <artifactId>ratis-server</artifactId>
       <groupId>org.apache.ratis</groupId>
@@ -66,10 +64,19 @@
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
-    
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/GrpcServerMetrics.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/GrpcServerMetrics.java
new file mode 100644
index 0000000..75da000
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/GrpcServerMetrics.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.metrics;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ratis.metrics.JVMMetrics;
+import org.apache.ratis.metrics.MetricRegistries;
+import org.apache.ratis.metrics.MetricRegistryInfo;
+import org.apache.ratis.metrics.MetricsReporting;
+import org.apache.ratis.metrics.RatisMetricRegistry;
+
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+
+public class GrpcServerMetrics {
+  private final RatisMetricRegistry registry;
+  private static MetricsReporting metricsReporting = new MetricsReporting(500, 
TimeUnit.MILLISECONDS);
+
+  private static final String RATIS_GRPC_METRICS_APP_NAME = "ratis_grpc";
+  private static final String RATIS_GRPC_METRICS_COMP_NAME = "log_appender";
+  private static final String RATIS_GRPC_METRICS_DESC = "Metrics for Ratis 
Grpc Log Appender";
+
+  public static final String RATIS_GRPC_METRICS_LOG_APPENDER_LATENCY =
+      "grpc_log_appender_follower_%s_latency";
+  public static final String RATIS_GRPC_METRICS_LOG_APPENDER_SUCCESS =
+      "grpc_log_appender_follower_%s_success_reply_count";
+  public static final String RATIS_GRPC_METRICS_LOG_APPENDER_NOT_LEADER =
+      "grpc_log_appender_follower_%s_not_leader_reply_count";
+  public static final String RATIS_GRPC_METRICS_LOG_APPENDER_INCONSISTENCY =
+      "grpc_log_appender_follower_%s_inconsistency_reply_count";
+  public static final String RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT = 
"grpc_log_appender_num_retries";
+  public static final String RATIS_GRPC_METRICS_REQUESTS_TOTAL = 
"grpc_log_appender_num_requests";
+
+  public GrpcServerMetrics(String serverId) {
+    MetricRegistryInfo info = new MetricRegistryInfo(serverId, 
RATIS_GRPC_METRICS_APP_NAME,
+        RATIS_GRPC_METRICS_COMP_NAME, RATIS_GRPC_METRICS_DESC);
+    Optional<RatisMetricRegistry> metricRegistry = 
MetricRegistries.global().get(info);
+
+    registry = metricRegistry.orElseGet(() -> 
MetricRegistries.global().create(info));
+
+    metricsReporting.startMetricsReporter(registry, 
MetricsReporting.MetricReporterType.JMX,
+            MetricsReporting.MetricReporterType.HADOOP2);
+    // JVM metrics
+    JVMMetrics.startJVMReporting(1000, TimeUnit.MILLISECONDS, 
MetricsReporting.MetricReporterType.JMX);
+  }
+
+  public Timer getGrpcLogAppenderLatencyTimer(String follower) {
+    return 
registry.timer(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_LATENCY, 
follower));
+  }
+
+  public void onRequestRetry() {
+    registry.counter(RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT).inc();
+  }
+
+  public void onRequestCreate() {
+    registry.counter(RATIS_GRPC_METRICS_REQUESTS_TOTAL).inc();
+  }
+
+  public void onRequestSuccess(String follower) {
+    registry.counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_SUCCESS, 
follower)).inc();
+  }
+
+  public void onRequestNotLeader(String follower) {
+      
registry.counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_NOT_LEADER, 
follower)).inc();
+  }
+
+  public void onRequestInconsistency(String follower) {
+    
registry.counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_INCONSISTENCY, 
follower)).inc();
+  }
+
+  @VisibleForTesting
+  public RatisMetricRegistry getRegistry() {
+    return registry;
+  }
+}
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 549426b..6374c31 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -19,6 +19,7 @@ package org.apache.ratis.grpc.server;
 
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.FollowerInfo;
 import org.apache.ratis.server.impl.LeaderState;
@@ -41,6 +42,8 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.codahale.metrics.Timer;
+
 /**
  * A new log appender implementation using grpc bi-directional stream API.
  */
@@ -48,7 +51,7 @@ public class GrpcLogAppender extends LogAppender {
   public static final Logger LOG = 
LoggerFactory.getLogger(GrpcLogAppender.class);
 
   private final GrpcService rpcService;
-  private final Map<Long, AppendEntriesRequestProto> pendingRequests;
+  private final Map<Long, AppendEntriesRequest> pendingRequests;
   private final int maxPendingRequestsNum;
   private long callId = 0;
   private volatile boolean firstResponseReceived = false;
@@ -59,6 +62,8 @@ public class GrpcLogAppender extends LogAppender {
 
   private volatile StreamObserver<AppendEntriesRequestProto> 
appendLogRequestObserver;
 
+  private final GrpcServerMetrics grpcServerMetrics;
+
   public GrpcLogAppender(RaftServerImpl server, LeaderState leaderState,
                          FollowerInfo f) {
     super(server, leaderState, f);
@@ -71,6 +76,7 @@ public class GrpcLogAppender extends LogAppender {
     pendingRequests = new ConcurrentHashMap<>();
     installSnapshotEnabled = 
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(
         server.getProxy().getProperties());
+    grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
   }
 
   private GrpcServerProtocolClient getClient() throws IOException {
@@ -160,17 +166,20 @@ public class GrpcLogAppender extends LogAppender {
   }
 
   private void appendLog() throws IOException {
-    final AppendEntriesRequestProto pending;
+    final AppendEntriesRequest request;
     final StreamObserver<AppendEntriesRequestProto> s;
     synchronized (this) {
       // prepare and enqueue the append request. note changes on follower's
       // nextIndex and ops on pendingRequests should always be associated
       // together and protected by the lock
-      pending = createRequest(callId++);
+      AppendEntriesRequestProto pending = createRequest(callId++);
       if (pending == null) {
         return;
       }
-      pendingRequests.put(pending.getServerRequest().getCallId(), pending);
+      grpcServerMetrics.onRequestCreate();
+      request = new AppendEntriesRequest(pending,
+          
grpcServerMetrics.getGrpcLogAppenderLatencyTimer(getFollowerId().toString()));
+      pendingRequests.put(pending.getServerRequest().getCallId(), request);
       increaseNextIndex(pending);
       if (appendLogRequestObserver == null) {
         appendLogRequestObserver = getClient().appendEntries(new 
AppendLogResponseHandler());
@@ -179,25 +188,26 @@ public class GrpcLogAppender extends LogAppender {
     }
 
     if (isAppenderRunning()) {
-      sendRequest(pending, s);
+      sendRequest(request, s);
     }
   }
 
-  private void sendRequest(AppendEntriesRequestProto request,
-      StreamObserver<AppendEntriesRequestProto> s) {
+  private void sendRequest(AppendEntriesRequest request, 
StreamObserver<AppendEntriesRequestProto> s) {
     CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
         server.getId(), null, request);
-
-    s.onNext(request);
-    scheduler.onTimeout(requestTimeoutDuration, () -> 
timeoutAppendRequest(request), LOG,
+    AppendEntriesRequestProto requestProto = request.getRequestProto();
+    request.startRequestTimer();
+    s.onNext(requestProto);
+    scheduler.onTimeout(requestTimeoutDuration, () -> 
timeoutAppendRequest(requestProto), LOG,
         () -> "Timeout check failed for append entry request: " + request);
     follower.updateLastRpcSendTime();
   }
 
   private void timeoutAppendRequest(AppendEntriesRequestProto request) {
-    AppendEntriesRequestProto pendingRequest = 
pendingRequests.remove(request.getServerRequest().getCallId());
+    AppendEntriesRequest pendingRequest = 
pendingRequests.remove(request.getServerRequest().getCallId());
     if (pendingRequest != null) {
-      LOG.warn( "{}: appendEntries Timeout, request={}", this, 
ServerProtoUtils.toString(pendingRequest));
+      LOG.warn( "{}: appendEntries Timeout, request={}", this,
+          ServerProtoUtils.toString(pendingRequest.getRequestProto()));
     }
   }
 
@@ -224,17 +234,19 @@ public class GrpcLogAppender extends LogAppender {
      */
     @Override
     public void onNext(AppendEntriesReplyProto reply) {
-      final AppendEntriesRequestProto request = 
pendingRequests.remove(reply.getServerReply().getCallId());
+      final AppendEntriesRequest request = 
pendingRequests.remove(reply.getServerReply().getCallId());
+      AppendEntriesRequestProto requestProto = request.getRequestProto();
       if (LOG.isDebugEnabled()) {
         LOG.debug("{}: received {} reply {}, request={}",
             this, firstResponseReceived? "a": "the first",
-            ServerProtoUtils.toString(reply), 
ServerProtoUtils.toString(request));
+            ServerProtoUtils.toString(reply), 
ServerProtoUtils.toString(requestProto));
       }
+      request.stopRequestTimer(); // Update completion time
 
       try {
-        onNextImpl(request, reply);
+        onNextImpl(requestProto, reply);
       } catch(Throwable t) {
-        LOG.error("Failed onNext request=" + ServerProtoUtils.toString(request)
+        LOG.error("Failed onNext request=" + 
ServerProtoUtils.toString(requestProto)
             + ", reply=" + ServerProtoUtils.toString(reply), t);
       }
     }
@@ -254,17 +266,20 @@ public class GrpcLogAppender extends LogAppender {
 
       switch (reply.getResult()) {
         case SUCCESS:
+          grpcServerMetrics.onRequestSuccess(getFollowerId().toString());
           updateCommitIndex(reply.getFollowerCommit());
           if (checkAndUpdateMatchIndex(request)) {
             submitEventOnSuccessAppend();
           }
           break;
         case NOT_LEADER:
+          grpcServerMetrics.onRequestNotLeader(getFollowerId().toString());
           if (checkResponseTerm(reply.getTerm())) {
             return;
           }
           break;
         case INCONSISTENCY:
+          grpcServerMetrics.onRequestInconsistency(getFollowerId().toString());
           updateNextIndex(reply.getNextIndex());
           break;
         default:
@@ -283,9 +298,9 @@ public class GrpcLogAppender extends LogAppender {
         return;
       }
       GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
-
+      grpcServerMetrics.onRequestRetry(); // Update try counter
       long callId = GrpcUtil.getCallId(t);
-      resetClient(pendingRequests.remove(callId));
+      resetClient(pendingRequests.remove(callId).getRequestProto());
     }
 
     @Override
@@ -504,4 +519,27 @@ public class GrpcLogAppender extends LogAppender {
     }
     return null;
   }
+
+  static class AppendEntriesRequest {
+    private final AppendEntriesRequestProto requestProto;
+    private final Timer timer;
+    private Timer.Context timerContext;
+
+    AppendEntriesRequest(AppendEntriesRequestProto requestProto, Timer timer) {
+      this.requestProto = requestProto;
+      this.timer = timer;
+    }
+
+    AppendEntriesRequestProto getRequestProto() {
+      return requestProto;
+    }
+
+    void startRequestTimer() {
+      timerContext = timer.time();
+    }
+
+    void stopRequestTimer() {
+      timerContext.stop();
+    }
+  }
 }
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/server/TestGrpcServerMetrics.java
 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/server/TestGrpcServerMetrics.java
new file mode 100644
index 0000000..f868b31
--- /dev/null
+++ 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/server/TestGrpcServerMetrics.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import static 
org.apache.ratis.grpc.metrics.GrpcServerMetrics.RATIS_GRPC_METRICS_LOG_APPENDER_INCONSISTENCY;
+import static 
org.apache.ratis.grpc.metrics.GrpcServerMetrics.RATIS_GRPC_METRICS_LOG_APPENDER_LATENCY;
+import static 
org.apache.ratis.grpc.metrics.GrpcServerMetrics.RATIS_GRPC_METRICS_LOG_APPENDER_NOT_LEADER;
+import static 
org.apache.ratis.grpc.metrics.GrpcServerMetrics.RATIS_GRPC_METRICS_LOG_APPENDER_SUCCESS;
+import static 
org.apache.ratis.grpc.metrics.GrpcServerMetrics.RATIS_GRPC_METRICS_REQUESTS_TOTAL;
+import static 
org.apache.ratis.grpc.metrics.GrpcServerMetrics.RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.function.Consumer;
+
+import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerState;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestGrpcServerMetrics {
+  private static GrpcServerMetrics grpcServerMetrics;
+  private static RatisMetricRegistry ratisMetricRegistry;
+  private static RaftGroupId raftGroupId;
+  private static RaftPeerId raftPeerId;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    RaftServerImpl raftServer = mock(RaftServerImpl.class);
+    ServerState serverStateMock = mock(ServerState.class);
+    when(raftServer.getState()).thenReturn(serverStateMock);
+    when(serverStateMock.getLastLeaderElapsedTimeMs()).thenReturn(1000L);
+    raftGroupId = RaftGroupId.randomId();
+    raftPeerId = RaftPeerId.valueOf("TestId");
+    RaftGroupMemberId raftGroupMemberId = 
RaftGroupMemberId.valueOf(raftPeerId, raftGroupId);
+    when(raftServer.getMemberId()).thenReturn(raftGroupMemberId);
+    grpcServerMetrics = new GrpcServerMetrics(raftGroupMemberId.toString());
+    ratisMetricRegistry = grpcServerMetrics.getRegistry();
+  }
+
+  @Test
+  public void testGrpcLogAppenderLatencyTimer() throws Exception {
+    RaftProtos.AppendEntriesRequestProto.Builder proto = 
RaftProtos.AppendEntriesRequestProto.newBuilder();
+    GrpcLogAppender.AppendEntriesRequest req =
+        new GrpcLogAppender.AppendEntriesRequest(proto.build(),
+            
grpcServerMetrics.getGrpcLogAppenderLatencyTimer(raftPeerId.toString()));
+    Assert.assertEquals(0L, ratisMetricRegistry.timer(String.format(
+        RATIS_GRPC_METRICS_LOG_APPENDER_LATENCY, 
raftPeerId.toString())).getSnapshot().getMax());
+    req.startRequestTimer();
+    Thread.sleep(1000L);
+    req.stopRequestTimer();
+    Assert.assertTrue(ratisMetricRegistry.timer(String.format(
+        RATIS_GRPC_METRICS_LOG_APPENDER_LATENCY, 
raftPeerId.toString())).getSnapshot().getMax() > 1000L);
+  }
+
+  @Test
+  public void testGrpcLogRequestTotal() {
+    Assert.assertEquals(0L, 
ratisMetricRegistry.counter(RATIS_GRPC_METRICS_REQUESTS_TOTAL).getCount());
+    grpcServerMetrics.onRequestCreate();
+    Assert.assertEquals(1L, 
ratisMetricRegistry.counter(RATIS_GRPC_METRICS_REQUESTS_TOTAL).getCount());
+  }
+
+  @Test
+  public void testGrpcLogRequestRetry() {
+    Assert.assertEquals(0L, 
ratisMetricRegistry.counter(RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT).getCount());
+    grpcServerMetrics.onRequestRetry();
+    Assert.assertEquals(1L, 
ratisMetricRegistry.counter(RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT).getCount());
+  }
+
+  @Test
+  public void testGrpcLogAppenderRequestCounters() {
+    assertCounterIncremented(RATIS_GRPC_METRICS_LOG_APPENDER_SUCCESS, 
grpcServerMetrics::onRequestSuccess);
+    assertCounterIncremented(RATIS_GRPC_METRICS_LOG_APPENDER_NOT_LEADER, 
grpcServerMetrics::onRequestNotLeader);
+    assertCounterIncremented(RATIS_GRPC_METRICS_LOG_APPENDER_INCONSISTENCY, 
grpcServerMetrics::onRequestInconsistency);
+  }
+
+  private void assertCounterIncremented(String counterVar, Consumer<String> 
incFunction) {
+    String counter = String.format(counterVar, raftPeerId.toString());
+    Assert.assertEquals(0L, ratisMetricRegistry.counter(counter).getCount());
+    incFunction.accept(raftPeerId.toString());
+    Assert.assertEquals(1L, ratisMetricRegistry.counter(counter).getCount());
+  }
+}
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 0a1fd49..3b724c6 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
@@ -25,6 +26,8 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerMXBean;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.metrics.LeaderElectionMetrics;
+import org.apache.ratis.server.metrics.RatisMetricNames;
+import org.apache.ratis.server.metrics.RatisMetrics;
 import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -61,6 +64,8 @@ import static org.apache.ratis.util.LifeCycle.State.NEW;
 import static org.apache.ratis.util.LifeCycle.State.RUNNING;
 import static org.apache.ratis.util.LifeCycle.State.STARTING;
 
+import com.codahale.metrics.Timer;
+
 public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronousProtocol,
     RaftClientProtocol, RaftClientAsynchronousProtocol {
   public static final Logger LOG = 
LoggerFactory.getLogger(RaftServerImpl.class);
@@ -88,6 +93,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
   private final RaftServerJmxAdapter jmxAdapter;
   private final LeaderElectionMetrics leaderElectionMetricsRegistry;
+  private final RatisMetricRegistry raftServerMetricsRegistry;
 
   private AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
 
@@ -116,6 +122,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
     this.jmxAdapter = new RaftServerJmxAdapter();
     this.leaderElectionMetricsRegistry = getLeaderElectionMetrics(this);
+    this.raftServerMetricsRegistry = 
RatisMetrics.getMetricsRegistryForServer(id.toString());
   }
 
   private RetryCache initRetryCache(RaftProperties prop) {
@@ -911,6 +918,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     final long currentTerm;
     final long followerCommit = state.getLog().getLastCommittedIndex();
     final Optional<FollowerState> followerState;
+    Timer.Context timer = 
raftServerMetricsRegistry.timer(RatisMetricNames.FOLLOWER_APPEND_ENTRIES_LATENCY).time();
     synchronized (this) {
       final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
       currentTerm = state.getCurrentTerm();
@@ -971,6 +979,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       }
       logAppendEntries(isHeartbeat, () ->
           getMemberId() + ": succeeded to handle AppendEntries. Reply: " + 
ServerProtoUtils.toString(reply));
+      timer.stop();  // TODO: future never completes exceptionally?
       return reply;
     });
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
index 7bc1033..57f7d9a 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
@@ -93,4 +93,5 @@ public final class RatisMetricNames {
   // Time required to load and process raft log segments during restart
   public static final String RAFT_LOG_LOAD_SEGMENT_LATENCY = 
"segmentLoadLatency";
 
+  public static final String FOLLOWER_APPEND_ENTRIES_LATENCY = 
"follower_append_entry_latency";
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
index 384b85f..2877351 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
@@ -37,6 +37,8 @@ public class RatisMetrics {
   public static final String RATIS_LEADER_METRICS_DESC = "Metrics for Ratis 
Leader.";
   public static final String RATIS_STATEMACHINE_METRICS = 
"ratis_state_machine";
   public static final String RATIS_STATEMACHINE_METRICS_DESC = "Metrics for 
State Machine Updater";
+  public static final String RATIS_SERVER_METRICS = "server";
+  public static final String RATIS_SERVER_METRICS_DESC = "Metrics for Raft 
server";
 
   static MetricsReporting metricsReporting = new MetricsReporting(500, 
TimeUnit.MILLISECONDS);
 
@@ -88,4 +90,8 @@ public class RatisMetrics {
     return ratisMetricRegistry.orElse(null);
   }
 
+  public static RatisMetricRegistry getMetricsRegistryForServer(String 
serverId) {
+    return create(new MetricRegistryInfo(serverId, 
RATIS_APPLICATION_NAME_METRICS, RATIS_SERVER_METRICS,
+        RATIS_SERVER_METRICS_DESC));
+  }
 }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/metrics/TestLeaderElectionMetrics.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/metrics/TestLeaderElectionMetrics.java
index b17348e..44750a1 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/metrics/TestLeaderElectionMetrics.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/metrics/TestLeaderElectionMetrics.java
@@ -35,8 +35,6 @@ import org.apache.ratis.server.impl.ServerState;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.google.protobuf.ByteString;
-
 /**
  * Test for LeaderElectionMetrics.
  */

Reply via email to