This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 3f85a2a RATIS-682. Add follower metrics corresponding to LogAppender.
Contributed by Siddharth Wagle.
3f85a2a is described below
commit 3f85a2abc3493b1f70e0841f8152001875b1f96a
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Nov 5 19:22:23 2019 +0530
RATIS-682. Add follower metrics corresponding to LogAppender. Contributed
by Siddharth Wagle.
---
.../org/apache/ratis/server/impl/FollowerInfo.java | 3 +-
.../org/apache/ratis/server/impl/LeaderState.java | 7 +-
.../ratis/server/metrics/LogAppenderMetrics.java | 47 ++++++++++++
.../ratis/server/metrics/RatisMetricNames.java | 6 ++
.../apache/ratis/server/metrics/RatisMetrics.java | 7 ++
.../ratis/server/impl/TestLogAppenderMetrics.java | 86 ++++++++++++++++++++++
6 files changed, 152 insertions(+), 4 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index 5c69c26..802ebdd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -44,7 +44,6 @@ public class FollowerInfo {
private volatile boolean attendVote;
private final int rpcSlownessTimeoutMs;
-
FollowerInfo(RaftGroupMemberId id, RaftPeer peer, Timestamp lastRpcTime,
long nextIndex,
boolean attendVote, int rpcSlownessTimeoutMs) {
this.name = id + "->" + peer.getId();
@@ -135,7 +134,7 @@ public class FollowerInfo {
lastRpcSendTime.set(Timestamp.currentTime());
}
- Timestamp getLastRpcTime() {
+ public Timestamp getLastRpcTime() {
return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get());
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index b955352..9618652 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -21,6 +21,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.metrics.LogAppenderMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
@@ -201,6 +202,7 @@ public class LeaderState {
private final TimeDuration syncInterval;
private final long placeHolderIndex;
private final RaftServerMetrics raftServerMetrics;
+ private final LogAppenderMetrics logAppenderMetrics;
LeaderState(RaftServerImpl server, RaftProperties properties) {
this.name = server.getMemberId() + "-" + getClass().getSimpleName();
@@ -216,6 +218,7 @@ public class LeaderState {
this.eventQueue = new EventQueue();
processor = new EventProcessor();
raftServerMetrics = server.getRaftServerMetrics();
+ logAppenderMetrics = new LogAppenderMetrics(server.getMemberId());
this.pendingRequests = new PendingRequests(server.getMemberId(),
properties, raftServerMetrics);
this.watchRequests = new WatchRequests(server.getMemberId(), properties);
@@ -389,8 +392,8 @@ public class LeaderState {
final List<LogAppender> newAppenders = newPeers.stream()
.map(peer -> {
LogAppender logAppender = server.newLogAppender(this, peer, t,
nextIndex, attendVote);
- raftServerMetrics
- .addFollower(logAppender.getFollower().getPeer());
+ raftServerMetrics.addFollower(logAppender.getFollower().getPeer());
+ logAppenderMetrics.addFollowerGauges(logAppender.getFollower());
return logAppender;
}).collect(Collectors.toList());
senders.addAll(newAppenders);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java
new file mode 100644
index 0000000..6e48138
--- /dev/null
+++
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.metrics;
+
+import static
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_MATCH_INDEX;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_NEXT_INDEX;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_RPC_RESP_TIME;
+
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.server.impl.FollowerInfo;
+
+/**
+ * Publishes per-follower gauges into the log-appender metric registry of one server.
+ *
+ * The registry is looked up once, in the constructor, via
+ * {@link RatisMetrics#getMetricRegistryForLogAppender(String)}.
+ */
+public final class LogAppenderMetrics {
+  /** Registry that receives every gauge added through this object; assigned once. */
+  private final RatisMetricRegistry ratisMetricRegistry;
+
+  /**
+   * @param groupMemberId id of the server that owns these metrics; its string form is
+   *                      passed on as the registry's server id.
+   */
+  public LogAppenderMetrics(RaftGroupMemberId groupMemberId) {
+    ratisMetricRegistry = RatisMetrics.getMetricRegistryForLogAppender(groupMemberId.toString());
+  }
+
+  /** @return the registry backing this object. */
+  public RatisMetricRegistry getRatisMetricRegistry() {
+    return ratisMetricRegistry;
+  }
+
+  /**
+   * Registers three gauges for the given follower. Each gauge name is one of the
+   * {@code FOLLOWER_*} format strings with the follower's peer id substituted in.
+   * The gauges hold a reference to {@code followerInfo} and read from it every time
+   * they are sampled, so they reflect later updates to the follower.
+   *
+   * @param followerInfo the follower whose next index, match index and RPC timing are exposed.
+   */
+  public void addFollowerGauges(FollowerInfo followerInfo) {
+    // The peer id is the only varying part of the three metric names; compute it once.
+    final String peerId = followerInfo.getPeer().getId().toString();
+
+    ratisMetricRegistry.gauge(String.format(FOLLOWER_NEXT_INDEX, peerId),
+        () -> followerInfo::getNextIndex);
+    ratisMetricRegistry.gauge(String.format(FOLLOWER_MATCH_INDEX, peerId),
+        () -> followerInfo::getMatchIndex);
+    // Despite the metric name, this reports the milliseconds elapsed since
+    // FollowerInfo#getLastRpcTime(), not the duration of a single RPC.
+    ratisMetricRegistry.gauge(String.format(FOLLOWER_RPC_RESP_TIME, peerId),
+        () -> () -> followerInfo.getLastRpcTime().elapsedTimeMs());
+  }
+}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
index ffd6054..a768418 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
@@ -109,4 +109,10 @@ public final class RatisMetricNames {
public static final String RAFT_LOG_LOAD_SEGMENT_LATENCY =
"segmentLoadLatency";
public static final String FOLLOWER_APPEND_ENTRIES_LATENCY =
"follower_append_entry_latency";
+
+ public static final String FOLLOWER_NEXT_INDEX = "follower_%s_next_index";
+
+ public static final String FOLLOWER_MATCH_INDEX = "follower_%s_match_index";
+
+ public static final String FOLLOWER_RPC_RESP_TIME =
"follower_%s_rpc_response_time";
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
index fd71fe3..3122b8b 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
@@ -34,6 +34,8 @@ public class RatisMetrics {
public static final String RATIS_STATEMACHINE_METRICS_DESC = "Metrics for
State Machine Updater";
public static final String RATIS_SERVER_METRICS = "server";
public static final String RATIS_SERVER_METRICS_DESC = "Metrics for Raft
server";
+ public static final String RATIS_LOG_APPENDER_METRICS = "log_appender";
+ public static final String RATIS_LOG_APPENDER_METRICS_DESC = "Metrics for
log appender";
private static RatisMetricRegistry create(MetricRegistryInfo info) {
Optional<RatisMetricRegistry> metricRegistry =
MetricRegistries.global().get(info);
@@ -75,4 +77,9 @@ public class RatisMetrics {
RATIS_LOG_WORKER_METRICS_DESC));
return ratisMetricRegistry.orElse(null);
}
+
+  /**
+   * Obtains the metric registry for the log-appender component of a server.
+   *
+   * @param serverId prefix identifying the server the registry belongs to.
+   * @return whatever {@code create} yields for the log-appender registry info.
+   */
+  public static RatisMetricRegistry getMetricRegistryForLogAppender(String serverId) {
+    final MetricRegistryInfo logAppenderInfo = new MetricRegistryInfo(
+        serverId,
+        RATIS_APPLICATION_NAME_METRICS,
+        RATIS_LOG_APPENDER_METRICS,
+        RATIS_LOG_APPENDER_METRICS_DESC);
+    return create(logAppenderInfo);
+  }
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java
new file mode 100644
index 0000000..42d8a2c
--- /dev/null
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_MATCH_INDEX;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_NEXT_INDEX;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_RPC_RESP_TIME;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.metrics.LogAppenderMetrics;
+import org.apache.ratis.util.Timestamp;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.codahale.metrics.Gauge;
+
+/**
+ * Checks that the gauges registered by {@link LogAppenderMetrics#addFollowerGauges}
+ * exist under the expected names and follow later updates to the {@link FollowerInfo}.
+ */
+public class TestLogAppenderMetrics {
+
+  private RatisMetricRegistry ratisMetricRegistry;
+  private RaftPeerId raftPeerId;
+  private FollowerInfo followerInfo;
+
+  /** Builds one follower (next index 100) and registers its gauges. */
+  @Before
+  public void setup() {
+    RaftGroupId raftGroupId = RaftGroupId.randomId();
+    raftPeerId = RaftPeerId.valueOf("TestId");
+    RaftPeer raftPeer = new RaftPeer(raftPeerId);
+    RaftGroupMemberId raftGroupMemberId = RaftGroupMemberId.valueOf(raftPeerId, raftGroupId);
+    LogAppender logAppender = mock(LogAppender.class);
+    followerInfo = new TestFollowerInfo(raftGroupMemberId, raftPeer,
+        Timestamp.currentTime(), 100L, true, 1000);
+    when(logAppender.getFollower()).thenReturn(followerInfo);
+    LogAppenderMetrics logAppenderMetrics = new LogAppenderMetrics(raftGroupMemberId);
+    ratisMetricRegistry = logAppenderMetrics.getRatisMetricRegistry();
+    logAppenderMetrics.addFollowerGauges(followerInfo);
+  }
+
+  @Test
+  public void testLogAppenderGauges() {
+    Gauge nextIndex = findGauge(FOLLOWER_NEXT_INDEX);
+    Assert.assertEquals(100L, nextIndex.getValue());
+    Gauge matchIndex = findGauge(FOLLOWER_MATCH_INDEX);
+    Assert.assertEquals(0L, matchIndex.getValue());
+    Gauge rpcTime = findGauge(FOLLOWER_RPC_RESP_TIME);
+    // The follower's timestamp was taken in setup() only moments ago, so the elapsed
+    // time can be 0 ms on a fast machine. Requiring a strictly positive value would
+    // make this test timing-dependent.
+    Assert.assertTrue(((Long) rpcTime.getValue()) >= 0);
+
+    followerInfo.updateNextIndex(200L);
+    followerInfo.updateMatchIndex(100L);
+    followerInfo.updateLastRpcResponseTime();
+
+    // The same gauge objects must now report the updated follower state.
+    Assert.assertEquals(200L, nextIndex.getValue());
+    Assert.assertEquals(100L, matchIndex.getValue());
+    Assert.assertNotNull(rpcTime.getValue());
+  }
+
+  /**
+   * Returns the first registered gauge whose name contains {@code nameFormat}
+   * formatted with the test peer id.
+   */
+  private Gauge findGauge(String nameFormat) {
+    final String expectedName = String.format(nameFormat, raftPeerId.toString());
+    return ratisMetricRegistry.getGauges((s, metric) -> s.contains(expectedName))
+        .values().iterator().next();
+  }
+
+  /** Subclass in the same package, used by this test to construct a {@link FollowerInfo}. */
+  private static class TestFollowerInfo extends FollowerInfo {
+    TestFollowerInfo(RaftGroupMemberId id, RaftPeer peer, Timestamp lastRpcTime,
+        long nextIndex, boolean attendVote, int rpcSlownessTimeoutMs) {
+      super(id, peer, lastRpcTime, nextIndex, attendVote, rpcSlownessTimeoutMs);
+    }
+  }
+}
+