This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f85a2a  RATIS-682. Add follower metrics corresponding to LogAppender. 
Contributed by Siddharth Wagle.
3f85a2a is described below

commit 3f85a2abc3493b1f70e0841f8152001875b1f96a
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Nov 5 19:22:23 2019 +0530

    RATIS-682. Add follower metrics corresponding to LogAppender. Contributed 
by Siddharth Wagle.
---
 .../org/apache/ratis/server/impl/FollowerInfo.java |  3 +-
 .../org/apache/ratis/server/impl/LeaderState.java  |  7 +-
 .../ratis/server/metrics/LogAppenderMetrics.java   | 47 ++++++++++++
 .../ratis/server/metrics/RatisMetricNames.java     |  6 ++
 .../apache/ratis/server/metrics/RatisMetrics.java  |  7 ++
 .../ratis/server/impl/TestLogAppenderMetrics.java  | 86 ++++++++++++++++++++++
 6 files changed, 152 insertions(+), 4 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index 5c69c26..802ebdd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -44,7 +44,6 @@ public class FollowerInfo {
   private volatile boolean attendVote;
   private final int rpcSlownessTimeoutMs;
 
-
   FollowerInfo(RaftGroupMemberId id, RaftPeer peer, Timestamp lastRpcTime, 
long nextIndex,
       boolean attendVote, int rpcSlownessTimeoutMs) {
     this.name = id + "->" + peer.getId();
@@ -135,7 +134,7 @@ public class FollowerInfo {
     lastRpcSendTime.set(Timestamp.currentTime());
   }
 
-  Timestamp getLastRpcTime() {
+  public Timestamp getLastRpcTime() {
     return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get());
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index b955352..9618652 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -21,6 +21,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.metrics.LogAppenderMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
@@ -201,6 +202,7 @@ public class LeaderState {
   private final TimeDuration syncInterval;
   private final long placeHolderIndex;
   private final RaftServerMetrics raftServerMetrics;
+  private final LogAppenderMetrics logAppenderMetrics;
 
   LeaderState(RaftServerImpl server, RaftProperties properties) {
     this.name = server.getMemberId() + "-" + getClass().getSimpleName();
@@ -216,6 +218,7 @@ public class LeaderState {
     this.eventQueue = new EventQueue();
     processor = new EventProcessor();
     raftServerMetrics = server.getRaftServerMetrics();
+    logAppenderMetrics = new LogAppenderMetrics(server.getMemberId());
     this.pendingRequests = new PendingRequests(server.getMemberId(), 
properties, raftServerMetrics);
     this.watchRequests = new WatchRequests(server.getMemberId(), properties);
 
@@ -389,8 +392,8 @@ public class LeaderState {
     final List<LogAppender> newAppenders = newPeers.stream()
         .map(peer -> {
           LogAppender logAppender = server.newLogAppender(this, peer, t, 
nextIndex, attendVote);
-          raftServerMetrics
-              .addFollower(logAppender.getFollower().getPeer());
+          raftServerMetrics.addFollower(logAppender.getFollower().getPeer());
+          logAppenderMetrics.addFollowerGauges(logAppender.getFollower());
           return logAppender;
         }).collect(Collectors.toList());
     senders.addAll(newAppenders);
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java
new file mode 100644
index 0000000..6e48138
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.metrics;
+
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_MATCH_INDEX;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_NEXT_INDEX;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_RPC_RESP_TIME;
+
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.server.impl.FollowerInfo;
+
+public final class LogAppenderMetrics {
+  private RatisMetricRegistry ratisMetricRegistry;
+
+  public LogAppenderMetrics(RaftGroupMemberId groupMemberId) {
+    ratisMetricRegistry = 
RatisMetrics.getMetricRegistryForLogAppender(groupMemberId.toString());
+  }
+
+  public RatisMetricRegistry getRatisMetricRegistry() {
+    return ratisMetricRegistry;
+  }
+
+  public void addFollowerGauges(FollowerInfo followerInfo) {
+    ratisMetricRegistry.gauge(String.format(FOLLOWER_NEXT_INDEX, 
followerInfo.getPeer().getId().toString()),
+        () -> followerInfo::getNextIndex);
+    ratisMetricRegistry.gauge(String.format(FOLLOWER_MATCH_INDEX, 
followerInfo.getPeer().getId().toString()),
+        () -> followerInfo::getMatchIndex);
+    ratisMetricRegistry.gauge(String.format(FOLLOWER_RPC_RESP_TIME, 
followerInfo.getPeer().getId().toString()),
+        () -> () -> followerInfo.getLastRpcTime().elapsedTimeMs());
+  }
+}
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
index ffd6054..a768418 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
@@ -109,4 +109,10 @@ public final class RatisMetricNames {
   public static final String RAFT_LOG_LOAD_SEGMENT_LATENCY = 
"segmentLoadLatency";
 
   public static final String FOLLOWER_APPEND_ENTRIES_LATENCY = 
"follower_append_entry_latency";
+
+  public static final String FOLLOWER_NEXT_INDEX = "follower_%s_next_index";
+
+  public static final String FOLLOWER_MATCH_INDEX = "follower_%s_match_index";
+
+  public static final String FOLLOWER_RPC_RESP_TIME = 
"follower_%s_rpc_response_time";
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
index fd71fe3..3122b8b 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
@@ -34,6 +34,8 @@ public class RatisMetrics {
   public static final String RATIS_STATEMACHINE_METRICS_DESC = "Metrics for 
State Machine Updater";
   public static final String RATIS_SERVER_METRICS = "server";
   public static final String RATIS_SERVER_METRICS_DESC = "Metrics for Raft 
server";
+  public static final String RATIS_LOG_APPENDER_METRICS = "log_appender";
+  public static final String RATIS_LOG_APPENDER_METRICS_DESC = "Metrics for 
log appender";
 
   private static RatisMetricRegistry create(MetricRegistryInfo info) {
     Optional<RatisMetricRegistry> metricRegistry = 
MetricRegistries.global().get(info);
@@ -75,4 +77,9 @@ public class RatisMetrics {
             RATIS_LOG_WORKER_METRICS_DESC));
     return ratisMetricRegistry.orElse(null);
   }
+
+  public static RatisMetricRegistry getMetricRegistryForLogAppender(String 
serverId) {
+    return create(new MetricRegistryInfo(serverId, 
RATIS_APPLICATION_NAME_METRICS,
+        RATIS_LOG_APPENDER_METRICS, RATIS_LOG_APPENDER_METRICS_DESC));
+  }
 }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java
new file mode 100644
index 0000000..42d8a2c
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_MATCH_INDEX;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_NEXT_INDEX;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_RPC_RESP_TIME;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.metrics.LogAppenderMetrics;
+import org.apache.ratis.util.Timestamp;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.codahale.metrics.Gauge;
+
+public class TestLogAppenderMetrics {
+
+  private RatisMetricRegistry ratisMetricRegistry;
+  private RaftPeerId raftPeerId;
+  private FollowerInfo followerInfo;
+
+  @Before
+  public void setup() {
+    RaftGroupId raftGroupId = RaftGroupId.randomId();
+    raftPeerId = RaftPeerId.valueOf("TestId");
+    RaftPeer raftPeer = new RaftPeer(raftPeerId);
+    RaftGroupMemberId raftGroupMemberId = 
RaftGroupMemberId.valueOf(raftPeerId, raftGroupId);
+    LogAppender logAppender = mock(LogAppender.class);
+    followerInfo = new TestFollowerInfo(raftGroupMemberId, raftPeer, 
Timestamp.currentTime(), 100L, true, 1000);
+    when(logAppender.getFollower()).thenReturn(followerInfo);
+    LogAppenderMetrics logAppenderMetrics = new 
LogAppenderMetrics(raftGroupMemberId);
+    ratisMetricRegistry = logAppenderMetrics.getRatisMetricRegistry();
+    logAppenderMetrics.addFollowerGauges(followerInfo);
+  }
+
+  @Test
+  public void testLogAppenderGauges() {
+    Gauge nextIndex = ratisMetricRegistry.getGauges((s, metric) ->
+        s.contains(String.format(FOLLOWER_NEXT_INDEX, 
raftPeerId.toString()))).values().iterator().next();
+    Assert.assertEquals(100L, nextIndex.getValue());
+    Gauge matchIndex = ratisMetricRegistry.getGauges((s, metric) ->
+        s.contains(String.format(FOLLOWER_MATCH_INDEX, 
raftPeerId.toString()))).values().iterator().next();
+    Assert.assertEquals(0L, matchIndex.getValue());
+    Gauge rpcTime = ratisMetricRegistry.getGauges((s, metric) ->
+        s.contains(String.format(FOLLOWER_RPC_RESP_TIME, 
raftPeerId.toString()))).values().iterator().next();
+    Assert.assertTrue(((Long) rpcTime.getValue()) > 0);
+    followerInfo.updateNextIndex(200L);
+    followerInfo.updateMatchIndex(100L);
+    followerInfo.updateLastRpcResponseTime();
+    Assert.assertEquals(200L, nextIndex.getValue());
+    Assert.assertEquals(100L, matchIndex.getValue());
+    Assert.assertNotNull(rpcTime.getValue());
+  }
+
+  private static class TestFollowerInfo extends FollowerInfo {
+    TestFollowerInfo(RaftGroupMemberId id, RaftPeer peer, Timestamp
+        lastRpcTime, long nextIndex, boolean attendVote, int
+        rpcSlownessTimeoutMs) {
+      super(id, peer, lastRpcTime, nextIndex, attendVote, 
rpcSlownessTimeoutMs);
+    }
+  }
+}
+

Reply via email to