This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 59dfd3549 RATIS-1998. Add watch request metrics (#1009)
59dfd3549 is described below
commit 59dfd35491df2346de80a685365b14da52f6bc1f
Author: Ivan Andika <[email protected]>
AuthorDate: Sun Jan 14 07:08:55 2024 +0800
RATIS-1998. Add watch request metrics (#1009)
---
ratis-docs/src/site/markdown/metrics.md | 53 ++++++++++----------
.../org/apache/ratis/metrics/RatisMetrics.java | 7 +++
.../apache/ratis/server/impl/LeaderStateImpl.java | 3 +-
.../apache/ratis/server/impl/WatchRequests.java | 24 +++++++--
.../server/metrics/RaftServerMetricsImpl.java | 34 +++++++++++++
.../java/org/apache/ratis/WatchRequestTests.java | 57 ++++++++++++++++++++++
6 files changed, 149 insertions(+), 29 deletions(-)
diff --git a/ratis-docs/src/site/markdown/metrics.md
b/ratis-docs/src/site/markdown/metrics.md
index 10c78ccbb..23486792e 100644
--- a/ratis-docs/src/site/markdown/metrics.md
+++ b/ratis-docs/src/site/markdown/metrics.md
@@ -80,31 +80,34 @@
### Raft Server Metrics
-| Application | Component | Name | Type |
Description |
-|-------------|-----------|----------------------------------|---------|---------------------------------------------------------------------|
-| ratis | server | {peer}_lastHeartbeatElapsedTime | Gauge | Time
elapsed since last heartbeat rpc response |
-| ratis | server | follower_append_entry_latency | Timer | Time
taken for followers to append log entries |
-| ratis | server | {peer}_peerCommitIndex | Gauge |
Commit index of peer |
-| ratis | server | clientReadRequest | Timer | Time
taken to process read requests from client |
-| ratis | server | clientStaleReadRequest | Timer | Time
taken to process stale-read requests from client |
-| ratis | server | clientWriteRequest | Timer | Time
taken to process write requests from client |
-| ratis | server | clientWatch{level}Request | Timer | Time
taken to process watch(replication_level) requests from client |
-| ratis | server | numRequestQueueLimitHits | Counter |
Number of (total client requests in queue) limit hits |
-| ratis | server | numRequestsByteSizeLimitHits | Counter |
Number of (total size of client requests in queue) limit hits |
-| ratis | server | numResourceLimitHits | Counter | Sum
of numRequestQueueLimitHits and numRequestsByteSizeLimitHits |
-| ratis | server | numPendingRequestInQueue | Gauge |
Number of pending client requests in queue |
-| ratis | server | numPendingRequestMegaByteSize | Gauge | Total
size of pending client requests in queue |
-| ratis | server | retryCacheEntryCount | Gauge |
Number of entries in retry cache |
-| ratis | server | retryCacheHitCount | Gauge |
Number of retry cache hits |
-| ratis | server | retryCacheHitRate | Gauge | Retry
cache hit rate |
-| ratis | server | retryCacheMissCount | Gauge |
Number of retry cache misses |
-| ratis | server | retryCacheMissRate | Gauge | Retry
cache miss rate |
-| ratis | server | numFailedClientStaleReadOnServer | Counter |
Number of failed stale-read requests |
-| ratis | server | numFailedClientReadOnServer | Counter |
Number of failed read requests |
-| ratis | server | numFailedClientWriteOnServer | Counter |
Number of failed write requests |
-| ratis | server | numFailedClientWatchOnServer | Counter |
Number of failed watch requests |
-| ratis | server | numFailedClientStreamOnServer | Counter |
Number of failed stream requests |
-| ratis | server | numInstallSnapshot | Counter |
Number of install-snapshot requests |
+| Application | Component | Name | Type |
Description |
+|-------------|-----------|--------------------------------------|---------|---------------------------------------------------------------------|
+| ratis | server | {peer}_lastHeartbeatElapsedTime | Gauge |
Time elapsed since last heartbeat rpc response |
+| ratis | server | follower_append_entry_latency | Timer |
Time taken for followers to append log entries |
+| ratis | server | {peer}_peerCommitIndex | Gauge |
Commit index of peer |
+| ratis | server | clientReadRequest | Timer |
Time taken to process read requests from client |
+| ratis | server | clientStaleReadRequest | Timer |
Time taken to process stale-read requests from client |
+| ratis | server | clientWriteRequest | Timer |
Time taken to process write requests from client |
+| ratis | server | clientWatch{level}Request | Timer |
Time taken to process watch(replication_level) requests from client |
+| ratis | server | numRequestQueueLimitHits | Counter |
Number of (total client requests in queue) limit hits |
+| ratis | server | numRequestsByteSizeLimitHits | Counter |
Number of (total size of client requests in queue) limit hits |
+| ratis | server | numResourceLimitHits | Counter |
Sum of numRequestQueueLimitHits and numRequestsByteSizeLimitHits |
+| ratis | server | numPendingRequestInQueue | Gauge |
Number of pending client requests in queue |
+| ratis | server | numPendingRequestMegaByteSize | Gauge |
Total size of pending client requests in queue |
+| ratis | server | retryCacheEntryCount | Gauge |
Number of entries in retry cache |
+| ratis | server | retryCacheHitCount | Gauge |
Number of retry cache hits |
+| ratis | server | retryCacheHitRate | Gauge |
Retry cache hit rate |
+| ratis | server | retryCacheMissCount | Gauge |
Number of retry cache misses |
+| ratis | server | retryCacheMissRate | Gauge |
Retry cache miss rate |
+| ratis | server | numFailedClientStaleReadOnServer | Counter |
Number of failed stale-read requests |
+| ratis | server | numFailedClientReadOnServer | Counter |
Number of failed read requests |
+| ratis | server | numFailedClientWriteOnServer | Counter |
Number of failed write requests |
+| ratis | server | numFailedClientWatchOnServer | Counter |
Number of failed watch requests |
+| ratis | server | numFailedClientStreamOnServer | Counter |
Number of failed stream requests |
+| ratis | server | numInstallSnapshot | Counter |
Number of install-snapshot requests |
+| ratis | server | numWatch{level}RequestTimeout | Counter |
Number of watch(replication_level) request timeouts |
+| ratis | server | numWatch{level}RequestInQueue | Gauge |
Number of watch(replication_level) requests in queue |
+| ratis | server | numWatch{level}RequestQueueLimitHits | Counter |
Number of (total watch requests in queue) limit hits |
## Ratis Netty Metrics
diff --git
a/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java
b/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java
index 32bcf52e3..eafc3849d 100644
--- a/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java
+++ b/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java
@@ -49,6 +49,13 @@ public class RatisMetrics {
return Collections.unmodifiableMap(maps);
}
+ protected static <T extends Enum<T>> Map<T, LongCounter> newCounterMap(
+ Class<T> clazz, Function<T, LongCounter> constructor) {
+ final EnumMap<T, LongCounter> map = new EnumMap<>(clazz);
+ Arrays.stream(clazz.getEnumConstants()).forEach(t -> map.put(t,
constructor.apply(t)));
+ return Collections.unmodifiableMap(map);
+ }
+
protected static <T extends Enum<T>> Map<T, Timekeeper> newTimerMap(
Class<T> clazz, Function<T, Timekeeper> constructor) {
final EnumMap<T, Timekeeper> map = new EnumMap<>(clazz);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 8864c220c..4175424e8 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -379,7 +379,7 @@ class LeaderStateImpl implements LeaderState {
raftServerMetrics = server.getRaftServerMetrics();
logAppenderMetrics = new LogAppenderMetrics(server.getMemberId());
this.pendingRequests = new PendingRequests(server.getMemberId(),
properties, raftServerMetrics);
- this.watchRequests = new WatchRequests(server.getMemberId(), properties);
+ this.watchRequests = new WatchRequests(server.getMemberId(), properties,
raftServerMetrics);
this.messageStreamRequests = new
MessageStreamRequests(server.getMemberId());
this.pendingStepDown = new PendingStepDown(this);
this.readIndexHeartbeats = new ReadIndexHeartbeats();
@@ -457,6 +457,7 @@ class LeaderStateImpl implements LeaderState {
logAppenderMetrics.unregister();
raftServerMetrics.unregister();
pendingRequests.close();
+ watchRequests.close();
return f;
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index f4c6200b9..6988bfbae 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -24,6 +24,7 @@ import
org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,11 +76,15 @@ class WatchRequests {
private final SortedMap<PendingWatch, PendingWatch> q = new TreeMap<>(
Comparator.comparingLong(PendingWatch::getIndex).thenComparing(PendingWatch::getCreationTime));
private final ResourceSemaphore resource;
+ private final RaftServerMetricsImpl raftServerMetrics;
private volatile long index; //Invariant: q.isEmpty() or index < any
element q
- WatchQueue(ReplicationLevel replication, int elementLimit) {
+ WatchQueue(ReplicationLevel replication, int elementLimit,
RaftServerMetricsImpl raftServerMetrics) {
this.replication = replication;
this.resource = new ResourceSemaphore(elementLimit);
+ this.raftServerMetrics = raftServerMetrics;
+
+ raftServerMetrics.addNumPendingWatchRequestsGauge(resource::used,
replication);
}
long getIndex() {
@@ -103,6 +108,7 @@ class WatchRequests {
if (computed == null) {
// failed to acquire
+ raftServerMetrics.onWatchRequestQueueLimitHit(replication);
return JavaUtils.completeExceptionally(new
ResourceUnavailableException(
"Failed to acquire a pending watch request in " + name + " for " +
request));
}
@@ -123,6 +129,7 @@ class WatchRequests {
pending.getFuture().completeExceptionally(
new NotReplicatedException(request.getCallId(), replication,
pending.getIndex()));
LOG.debug("{}: timeout {}, {}", name, pending, request);
+ raftServerMetrics.onWatchRequestTimeout(replication);
}
}
@@ -162,6 +169,12 @@ class WatchRequests {
q.clear();
resource.close();
}
+
+ void close() {
+ if (raftServerMetrics != null) {
+ raftServerMetrics.removeNumPendingWatchRequestsGauge(replication);
+ }
+ }
}
private final String name;
@@ -171,7 +184,7 @@ class WatchRequests {
private final TimeDuration watchTimeoutDenominationNanos;
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
- WatchRequests(Object name, RaftProperties properties) {
+ WatchRequests(Object name, RaftProperties properties, RaftServerMetricsImpl
raftServerMetrics) {
this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
final TimeDuration watchTimeout =
RaftServerConfigKeys.Watch.timeout(properties);
@@ -183,7 +196,8 @@ class WatchRequests {
+ watchTimeoutDenomination + ").");
final int elementLimit =
RaftServerConfigKeys.Watch.elementLimit(properties);
- Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new
WatchQueue(r, elementLimit)));
+ Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r,
+ new WatchQueue(r, elementLimit, raftServerMetrics)));
}
CompletableFuture<Long> add(RaftClientRequest request) {
@@ -207,4 +221,8 @@ class WatchRequests {
void failWatches(Exception e) {
queues.values().forEach(q -> q.failAll(e));
}
+
+ void close() {
+ queues.values().forEach(WatchQueue::close);
+ }
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
index cdbce6ee2..70711c3e3 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
@@ -57,9 +57,14 @@ public final class RaftServerMetricsImpl extends
RatisMetrics implements RaftSer
public static final String REQUEST_QUEUE_LIMIT_HIT_COUNTER =
"numRequestQueueLimitHits";
public static final String REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER =
"numRequestsByteSizeLimitHits";
public static final String RESOURCE_LIMIT_HIT_COUNTER =
"numResourceLimitHits";
+ public static final String WATCH_REQUEST_QUEUE_LIMIT_HIT_COUNTER =
"numWatch%sRequestQueueLimitHits";
public static final String REQUEST_QUEUE_SIZE = "numPendingRequestInQueue";
public static final String REQUEST_MEGA_BYTE_SIZE =
"numPendingRequestMegaByteSize";
+
+ public static final String WATCH_REQUEST_QUEUE_SIZE =
"numWatch%sRequestInQueue";
+ public static final String WATCH_REQUEST_TIMEOUT_COUNTER =
"numWatch%sRequestTimeout";
+
public static final String RETRY_CACHE_ENTRY_COUNT_METRIC =
"retryCacheEntryCount";
public static final String RETRY_CACHE_HIT_COUNT_METRIC =
"retryCacheHitCount";
public static final String RETRY_CACHE_HIT_RATE_METRIC = "retryCacheHitRate";
@@ -76,6 +81,11 @@ public final class RaftServerMetricsImpl extends
RatisMetrics implements RaftSer
private final LongCounter numRequestQueueLimitHits =
getRegistry().counter(REQUEST_QUEUE_LIMIT_HIT_COUNTER);
private final LongCounter numRequestsByteSizeLimitHits =
getRegistry().counter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER);
private final LongCounter numResourceLimitHits =
getRegistry().counter(RESOURCE_LIMIT_HIT_COUNTER);
+ private final Map<ReplicationLevel, LongCounter>
numWatchRequestQueueLimitHits = newCounterMap(ReplicationLevel.class,
+ replication -> getRegistry().counter(
+ String.format(WATCH_REQUEST_QUEUE_LIMIT_HIT_COUNTER,
Type.toString(replication))));
+ private final Map<ReplicationLevel, LongCounter> numWatchRequestsTimeout =
newCounterMap(ReplicationLevel.class,
+ replication ->
getRegistry().counter(String.format(WATCH_REQUEST_TIMEOUT_COUNTER,
Type.toString(replication))));
private final LongCounter numFailedClientStaleRead
= getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT);
@@ -150,6 +160,14 @@ public final class RaftServerMetricsImpl extends
RatisMetrics implements RaftSer
return numInstallSnapshot;
}
+ public LongCounter getNumWatchRequestQueueLimitHits(ReplicationLevel
replication) {
+ return numWatchRequestQueueLimitHits.get(replication);
+ }
+
+ public LongCounter getNumWatchRequestsTimeout(ReplicationLevel replication) {
+ return numWatchRequestsTimeout.get(replication);
+ }
+
private static RatisMetricRegistry createRegistry(String serverId) {
return create(new MetricRegistryInfo(serverId,
RATIS_APPLICATION_NAME_METRICS, RATIS_SERVER_METRICS,
@@ -237,6 +255,22 @@ public final class RaftServerMetricsImpl extends
RatisMetrics implements RaftSer
return getRegistry().remove(REQUEST_MEGA_BYTE_SIZE);
}
+ public void onWatchRequestQueueLimitHit(ReplicationLevel replicationLevel) {
+ numWatchRequestQueueLimitHits.get(replicationLevel).inc();
+ }
+
+ public void onWatchRequestTimeout(ReplicationLevel replicationLevel) {
+ numWatchRequestsTimeout.get(replicationLevel).inc();
+ }
+
+ public void addNumPendingWatchRequestsGauge(Supplier<Integer> queueSize,
ReplicationLevel replication) {
+ getRegistry().gauge(String.format(WATCH_REQUEST_QUEUE_SIZE,
Type.toString(replication)), () -> queueSize);
+ }
+
+ public boolean removeNumPendingWatchRequestsGauge(ReplicationLevel
replication) {
+ return getRegistry().remove(String.format(WATCH_REQUEST_QUEUE_SIZE,
Type.toString(replication)));
+ }
+
public void onRequestByteSizeLimitHit() {
numRequestsByteSizeLimitHits.inc();
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 1f19f9d1c..a9bdd1a3a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -31,8 +31,10 @@ import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerConfigKeys.Watch;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.Slf4jUtils;
@@ -47,12 +49,14 @@ import org.slf4j.Logger;
import org.slf4j.event.Level;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.fail;
@@ -469,6 +473,59 @@ public abstract class WatchRequestTests<CLUSTER extends
MiniRaftCluster>
}
}
+ @Test
+ public void testWatchMetrics() throws Exception {
+ final RaftProperties p = getProperties();
+ RaftServerConfigKeys.Watch.setElementLimit(p, 10);
+ RaftServerConfigKeys.Watch.setTimeout(p, TimeDuration.valueOf(2,
TimeUnit.SECONDS));
+ try {
+ runWithNewCluster(NUM_SERVERS,
+ cluster -> runSingleTest(WatchRequestTests::runTestWatchMetrics,
cluster, LOG));
+ } finally {
+ RaftServerConfigKeys.Watch.setElementLimit(p,
Watch.ELEMENT_LIMIT_DEFAULT);
+ RaftServerConfigKeys.Watch.setTimeout(p,
RaftServerConfigKeys.Watch.TIMEOUT_DEFAULT);
+ }
+ }
+
+ static RaftServerMetricsImpl getRaftServerMetrics(RaftServer.Division
division) {
+ return (RaftServerMetricsImpl) division.getRaftServerMetrics();
+ }
+
+ static void runTestWatchMetrics(TestParameters p) throws Exception {
+ final MiniRaftCluster cluster = p.cluster;
+
+ List<RaftClient> clients = new ArrayList<>();
+
+ final ReplicationLevel replicationLevel = ReplicationLevel.MAJORITY;
+ try {
+ long initialWatchRequestTimeoutCount =
getRaftServerMetrics(cluster.getLeader())
+ .getNumWatchRequestsTimeout(replicationLevel).getCount();
+ long initialLimitHit = getRaftServerMetrics(cluster.getLeader())
+ .getNumWatchRequestQueueLimitHits(replicationLevel).getCount();
+
+ int uncommittedBaseIndex = 10000;
+ // Logs with indices 10001 - 10011 will never be committed, so the watch
requests should fail with NotReplicatedException
+ for (int i = 1; i <= 11; i++) {
+ RaftClient client = cluster.createClient(cluster.getLeader().getId(),
RetryPolicies.noRetry());
+ clients.add(client);
+ client.async().watch(uncommittedBaseIndex + i, replicationLevel);
+ }
+
+ // Each watch timeout for a unique index should increment the
metric
+ RaftTestUtil.waitFor(() -> getRaftServerMetrics(cluster.getLeader())
+ .getNumWatchRequestsTimeout(replicationLevel).getCount() ==
initialWatchRequestTimeoutCount + 10,
+ 300, 5000);
+ // There are 11 pending watch requests, but the pending watch request
limit is 10
+ RaftTestUtil.waitFor(() -> getRaftServerMetrics(cluster.getLeader())
+ .getNumWatchRequestQueueLimitHits(replicationLevel).getCount() ==
+ initialLimitHit + 1, 300, 5000);
+ } finally {
+ for(RaftClient client : clients) {
+ client.close();
+ }
+ }
+ }
+
static void checkTimeout(List<CompletableFuture<RaftClientReply>> replies,
List<CompletableFuture<WatchReplies>> watches, Logger LOG) throws
Exception {
for(int i = 0; i < replies.size(); i++) {