This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 59dfd3549 RATIS-1998. Add watch request metrics (#1009)
59dfd3549 is described below

commit 59dfd35491df2346de80a685365b14da52f6bc1f
Author: Ivan Andika <[email protected]>
AuthorDate: Sun Jan 14 07:08:55 2024 +0800

    RATIS-1998. Add watch request metrics (#1009)
---
 ratis-docs/src/site/markdown/metrics.md            | 53 ++++++++++----------
 .../org/apache/ratis/metrics/RatisMetrics.java     |  7 +++
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  3 +-
 .../apache/ratis/server/impl/WatchRequests.java    | 24 +++++++--
 .../server/metrics/RaftServerMetricsImpl.java      | 34 +++++++++++++
 .../java/org/apache/ratis/WatchRequestTests.java   | 57 ++++++++++++++++++++++
 6 files changed, 149 insertions(+), 29 deletions(-)

diff --git a/ratis-docs/src/site/markdown/metrics.md 
b/ratis-docs/src/site/markdown/metrics.md
index 10c78ccbb..23486792e 100644
--- a/ratis-docs/src/site/markdown/metrics.md
+++ b/ratis-docs/src/site/markdown/metrics.md
@@ -80,31 +80,34 @@
 
 ### Raft Server Metrics
 
-| Application | Component | Name                             | Type    | 
Description                                                         |
-|-------------|-----------|----------------------------------|---------|---------------------------------------------------------------------|
-| ratis       | server    | {peer}_lastHeartbeatElapsedTime  | Gauge   | Time 
elapsed since last heartbeat rpc response                      |
-| ratis       | server    | follower_append_entry_latency    | Timer   | Time 
taken for followers to append log entries                      |
-| ratis       | server    | {peer}_peerCommitIndex           | Gauge   | 
Commit index of peer                                                |
-| ratis       | server    | clientReadRequest                | Timer   | Time 
taken to process read requests from client                     |
-| ratis       | server    | clientStaleReadRequest           | Timer   | Time 
taken to process stale-read requests from client               |
-| ratis       | server    | clientWriteRequest               | Timer   | Time 
taken to process write requests from client                    |
-| ratis       | server    | clientWatch{level}Request        | Timer   | Time 
taken to process watch(replication_level) requests from client |
-| ratis       | server    | numRequestQueueLimitHits         | Counter | 
Number of (total client requests in queue) limit hits               |
-| ratis       | server    | numRequestsByteSizeLimitHits     | Counter | 
Number of (total size of client requests in queue) limit hits       |
-| ratis       | server    | numResourceLimitHits             | Counter | Sum 
of numRequestQueueLimitHits and numRequestsByteSizeLimitHits    |
-| ratis       | server    | numPendingRequestInQueue         | Gauge   | 
Number of pending client requests in queue                          |
-| ratis       | server    | numPendingRequestMegaByteSize    | Gauge   | Total 
size of pending client requests in queue                      |
-| ratis       | server    | retryCacheEntryCount             | Gauge   | 
Number of entries in retry cache                                    |
-| ratis       | server    | retryCacheHitCount               | Gauge   | 
Number of retry cache hits                                          |
-| ratis       | server    | retryCacheHitRate                | Gauge   | Retry 
cache hit rate                                                |
-| ratis       | server    | retryCacheMissCount              | Gauge   | 
Number of retry cache misses                                        |
-| ratis       | server    | retryCacheMissRate               | Gauge   | Retry 
cache miss rate                                               |
-| ratis       | server    | numFailedClientStaleReadOnServer | Counter | 
Number of failed stale-read requests                                |
-| ratis       | server    | numFailedClientReadOnServer      | Counter | 
Number of failed read requests                                      |
-| ratis       | server    | numFailedClientWriteOnServer     | Counter | 
Number of failed write requests                                     |
-| ratis       | server    | numFailedClientWatchOnServer     | Counter | 
Number of failed watch requests                                     |
-| ratis       | server    | numFailedClientStreamOnServer    | Counter | 
Number of failed stream requests                                    |
-| ratis       | server    | numInstallSnapshot               | Counter | 
Number of install-snapshot requests                                 |
+| Application | Component | Name                                 | Type    | 
Description                                                         |
+|-------------|-----------|--------------------------------------|---------|---------------------------------------------------------------------|
+| ratis       | server    | {peer}_lastHeartbeatElapsedTime      | Gauge   | 
Time elapsed since last heartbeat rpc response                      |
+| ratis       | server    | follower_append_entry_latency        | Timer   | 
Time taken for followers to append log entries                      |
+| ratis       | server    | {peer}_peerCommitIndex               | Gauge   | 
Commit index of peer                                                |
+| ratis       | server    | clientReadRequest                    | Timer   | 
Time taken to process read requests from client                     |
+| ratis       | server    | clientStaleReadRequest               | Timer   | 
Time taken to process stale-read requests from client               |
+| ratis       | server    | clientWriteRequest                   | Timer   | 
Time taken to process write requests from client                    |
+| ratis       | server    | clientWatch{level}Request            | Timer   | 
Time taken to process watch(replication_level) requests from client |
+| ratis       | server    | numRequestQueueLimitHits             | Counter | 
Number of (total client requests in queue) limit hits               |
+| ratis       | server    | numRequestsByteSizeLimitHits         | Counter | 
Number of (total size of client requests in queue) limit hits       |
+| ratis       | server    | numResourceLimitHits                 | Counter | 
Sum of numRequestQueueLimitHits and numRequestsByteSizeLimitHits    |
+| ratis       | server    | numPendingRequestInQueue             | Gauge   | 
Number of pending client requests in queue                          |
+| ratis       | server    | numPendingRequestMegaByteSize        | Gauge   | 
Total size of pending client requests in queue                      |
+| ratis       | server    | retryCacheEntryCount                 | Gauge   | 
Number of entries in retry cache                                    |
+| ratis       | server    | retryCacheHitCount                   | Gauge   | 
Number of retry cache hits                                          |
+| ratis       | server    | retryCacheHitRate                    | Gauge   | 
Retry cache hit rate                                                |
+| ratis       | server    | retryCacheMissCount                  | Gauge   | 
Number of retry cache misses                                        |
+| ratis       | server    | retryCacheMissRate                   | Gauge   | 
Retry cache miss rate                                               |
+| ratis       | server    | numFailedClientStaleReadOnServer     | Counter | 
Number of failed stale-read requests                                |
+| ratis       | server    | numFailedClientReadOnServer          | Counter | 
Number of failed read requests                                      |
+| ratis       | server    | numFailedClientWriteOnServer         | Counter | 
Number of failed write requests                                     |
+| ratis       | server    | numFailedClientWatchOnServer         | Counter | 
Number of failed watch requests                                     |
+| ratis       | server    | numFailedClientStreamOnServer        | Counter | 
Number of failed stream requests                                    |
+| ratis       | server    | numInstallSnapshot                   | Counter | 
Number of install-snapshot requests                                 |
+| ratis       | server    | numWatch{level}RequestTimeout        | Counter | 
Number of watch(replication_level) requests that timed out          |
+| ratis       | server    | numWatch{level}RequestInQueue        | Gauge   | 
Number of watch(replication_level) requests in queue                |
+| ratis       | server    | numWatch{level}RequestQueueLimitHits | Counter | 
Number of (total watch requests in queue) limit hits                |
 
 
 ## Ratis Netty Metrics
diff --git 
a/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java 
b/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java
index 32bcf52e3..eafc3849d 100644
--- a/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java
+++ b/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java
@@ -49,6 +49,13 @@ public class RatisMetrics {
     return Collections.unmodifiableMap(maps);
   }
 
+  protected static <T extends Enum<T>> Map<T, LongCounter> newCounterMap(
+      Class<T> clazz, Function<T, LongCounter> constructor) { // counterpart of newTimerMap: one LongCounter per enum constant, returned as an unmodifiable EnumMap view
+    final EnumMap<T, LongCounter> map = new EnumMap<>(clazz);
+    Arrays.stream(clazz.getEnumConstants()).forEach(t -> map.put(t, 
constructor.apply(t)));
+    return Collections.unmodifiableMap(map);
+  }
+
   protected static <T extends Enum<T>> Map<T, Timekeeper> newTimerMap(
       Class<T> clazz, Function<T, Timekeeper> constructor) {
     final EnumMap<T, Timekeeper> map = new EnumMap<>(clazz);
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 8864c220c..4175424e8 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -379,7 +379,7 @@ class LeaderStateImpl implements LeaderState {
     raftServerMetrics = server.getRaftServerMetrics();
     logAppenderMetrics = new LogAppenderMetrics(server.getMemberId());
     this.pendingRequests = new PendingRequests(server.getMemberId(), 
properties, raftServerMetrics);
-    this.watchRequests = new WatchRequests(server.getMemberId(), properties);
+    this.watchRequests = new WatchRequests(server.getMemberId(), properties, 
raftServerMetrics); // each watch queue registers its gauge and counters on these server metrics
     this.messageStreamRequests = new 
MessageStreamRequests(server.getMemberId());
     this.pendingStepDown = new PendingStepDown(this);
     this.readIndexHeartbeats = new ReadIndexHeartbeats();
@@ -457,6 +457,7 @@ class LeaderStateImpl implements LeaderState {
     logAppenderMetrics.unregister();
     raftServerMetrics.unregister();
     pendingRequests.close();
+    watchRequests.close(); // removes the per-replication-level pending-watch gauges; NOTE(review): runs after raftServerMetrics.unregister() above — confirm removing gauges afterwards is harmless
     return f;
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index f4c6200b9..6988bfbae 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -24,6 +24,7 @@ import 
org.apache.ratis.protocol.exceptions.NotReplicatedException;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
 import org.apache.ratis.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,11 +76,15 @@ class WatchRequests {
     private final SortedMap<PendingWatch, PendingWatch> q = new TreeMap<>(
         
Comparator.comparingLong(PendingWatch::getIndex).thenComparing(PendingWatch::getCreationTime));
     private final ResourceSemaphore resource;
+    private final RaftServerMetricsImpl raftServerMetrics; // receives this queue's pending-watch gauge, limit-hit and timeout counts
     private volatile long index; //Invariant: q.isEmpty() or index < any 
element q
 
-    WatchQueue(ReplicationLevel replication, int elementLimit) {
+    WatchQueue(ReplicationLevel replication, int elementLimit, 
RaftServerMetricsImpl raftServerMetrics) {
       this.replication = replication;
       this.resource = new ResourceSemaphore(elementLimit);
+      this.raftServerMetrics = raftServerMetrics;
+
+      raftServerMetrics.addNumPendingWatchRequestsGauge(resource::used, 
replication); // gauge reads resource.used(), presumably the number of occupied watch slots — confirm
     }
 
     long getIndex() {
@@ -103,6 +108,7 @@ class WatchRequests {
 
       if (computed == null) {
         // failed to acquire
+        raftServerMetrics.onWatchRequestQueueLimitHit(replication); // count the rejected watch for this replication level
         return JavaUtils.completeExceptionally(new 
ResourceUnavailableException(
             "Failed to acquire a pending watch request in " + name + " for " + 
request));
       }
@@ -123,6 +129,7 @@ class WatchRequests {
         pending.getFuture().completeExceptionally(
             new NotReplicatedException(request.getCallId(), replication, 
pending.getIndex()));
         LOG.debug("{}: timeout {}, {}", name, pending, request);
+        raftServerMetrics.onWatchRequestTimeout(replication); // counted once per watch completed with NotReplicatedException here
       }
     }
 
@@ -162,6 +169,12 @@ class WatchRequests {
       q.clear();
       resource.close();
     }
+
+    void close() { // unregisters this queue's pending-watch gauge; pending watches are not failed here
+      if (raftServerMetrics != null) { // NOTE(review): the constructor and the limit-hit/timeout paths dereference raftServerMetrics unconditionally, so this check can never see null — make null-handling consistent
+        raftServerMetrics.removeNumPendingWatchRequestsGauge(replication);
+      }
+    }
   }
 
   private final String name;
@@ -171,7 +184,7 @@ class WatchRequests {
   private final TimeDuration watchTimeoutDenominationNanos;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
-  WatchRequests(Object name, RaftProperties properties) {
+  WatchRequests(Object name, RaftProperties properties, RaftServerMetricsImpl 
raftServerMetrics) { // raftServerMetrics is handed to every WatchQueue
     this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
 
     final TimeDuration watchTimeout = 
RaftServerConfigKeys.Watch.timeout(properties);
@@ -183,7 +196,8 @@ class WatchRequests {
             + watchTimeoutDenomination + ").");
 
     final int elementLimit = 
RaftServerConfigKeys.Watch.elementLimit(properties);
-    Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new 
WatchQueue(r, elementLimit)));
+    Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r,
+        new WatchQueue(r, elementLimit, raftServerMetrics)));
   }
 
   CompletableFuture<Long> add(RaftClientRequest request) {
@@ -207,4 +221,8 @@ class WatchRequests {
   void failWatches(Exception e) {
     queues.values().forEach(q -> q.failAll(e));
   }
+
+  void close() { // unregisters the metrics of every WatchQueue; failing pending watches is failWatches' job
+    queues.values().forEach(WatchQueue::close);
+  }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
index cdbce6ee2..70711c3e3 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
@@ -57,9 +57,14 @@ public final class RaftServerMetricsImpl extends 
RatisMetrics implements RaftSer
   public static final String REQUEST_QUEUE_LIMIT_HIT_COUNTER = 
"numRequestQueueLimitHits";
   public static final String REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER = 
"numRequestsByteSizeLimitHits";
   public static final String RESOURCE_LIMIT_HIT_COUNTER = 
"numResourceLimitHits";
+  public static final String WATCH_REQUEST_QUEUE_LIMIT_HIT_COUNTER = 
"numWatch%sRequestQueueLimitHits"; // %s is filled with Type.toString(replication level)
 
   public static final String REQUEST_QUEUE_SIZE = "numPendingRequestInQueue";
   public static final String REQUEST_MEGA_BYTE_SIZE = 
"numPendingRequestMegaByteSize";
+
+  public static final String WATCH_REQUEST_QUEUE_SIZE = 
"numWatch%sRequestInQueue"; // %s is filled with Type.toString(replication level)
+  public static final String WATCH_REQUEST_TIMEOUT_COUNTER = 
"numWatch%sRequestTimeout"; // %s is filled with Type.toString(replication level)
+
   public static final String RETRY_CACHE_ENTRY_COUNT_METRIC = 
"retryCacheEntryCount";
   public static final String RETRY_CACHE_HIT_COUNT_METRIC = 
"retryCacheHitCount";
   public static final String RETRY_CACHE_HIT_RATE_METRIC = "retryCacheHitRate";
@@ -76,6 +81,11 @@ public final class RaftServerMetricsImpl extends 
RatisMetrics implements RaftSer
   private final LongCounter numRequestQueueLimitHits = 
getRegistry().counter(REQUEST_QUEUE_LIMIT_HIT_COUNTER);
   private final LongCounter numRequestsByteSizeLimitHits = 
getRegistry().counter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER);
   private final LongCounter numResourceLimitHits = 
getRegistry().counter(RESOURCE_LIMIT_HIT_COUNTER);
+  private final Map<ReplicationLevel, LongCounter> 
numWatchRequestQueueLimitHits = newCounterMap(ReplicationLevel.class,
+      replication -> getRegistry().counter(
+          String.format(WATCH_REQUEST_QUEUE_LIMIT_HIT_COUNTER, 
Type.toString(replication)))); // one counter per ReplicationLevel
+  private final Map<ReplicationLevel, LongCounter> numWatchRequestsTimeout = 
newCounterMap(ReplicationLevel.class,
+      replication -> 
getRegistry().counter(String.format(WATCH_REQUEST_TIMEOUT_COUNTER, 
Type.toString(replication)))); // one counter per ReplicationLevel
 
   private final LongCounter numFailedClientStaleRead
       = getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT);
@@ -150,6 +160,14 @@ public final class RaftServerMetricsImpl extends 
RatisMetrics implements RaftSer
     return numInstallSnapshot;
   }
 
+  public LongCounter getNumWatchRequestQueueLimitHits(ReplicationLevel 
replication) { // the counter incremented by onWatchRequestQueueLimitHit
+    return numWatchRequestQueueLimitHits.get(replication);
+  }
+
+  public LongCounter getNumWatchRequestsTimeout(ReplicationLevel replication) { // the counter incremented by onWatchRequestTimeout
+    return numWatchRequestsTimeout.get(replication);
+  }
+
   private static RatisMetricRegistry createRegistry(String serverId) {
     return create(new MetricRegistryInfo(serverId,
         RATIS_APPLICATION_NAME_METRICS, RATIS_SERVER_METRICS,
@@ -237,6 +255,22 @@ public final class RaftServerMetricsImpl extends 
RatisMetrics implements RaftSer
     return getRegistry().remove(REQUEST_MEGA_BYTE_SIZE);
   }
 
+  public void onWatchRequestQueueLimitHit(ReplicationLevel replicationLevel) { // increments this level's numWatch%sRequestQueueLimitHits counter
+    numWatchRequestQueueLimitHits.get(replicationLevel).inc();
+  }
+
+  public void onWatchRequestTimeout(ReplicationLevel replicationLevel) { // increments this level's numWatch%sRequestTimeout counter
+    numWatchRequestsTimeout.get(replicationLevel).inc();
+  }
+
+  public void addNumPendingWatchRequestsGauge(Supplier<Integer> queueSize, 
ReplicationLevel replication) {
+    getRegistry().gauge(String.format(WATCH_REQUEST_QUEUE_SIZE, 
Type.toString(replication)), () -> queueSize); // NOTE(review): passes a Supplier that returns the queueSize supplier; presumably RatisMetricRegistry.gauge takes Supplier<Supplier<T>> — confirm
+  }
+
+  public boolean removeNumPendingWatchRequestsGauge(ReplicationLevel 
replication) { // returns the result of getRegistry().remove for this level's gauge name
+    return getRegistry().remove(String.format(WATCH_REQUEST_QUEUE_SIZE, 
Type.toString(replication)));
+  }
+
   public void onRequestByteSizeLimitHit() {
     numRequestsByteSizeLimitHits.inc();
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java 
b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 1f19f9d1c..a9bdd1a3a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -31,8 +31,10 @@ import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerConfigKeys.Watch;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
 import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.Slf4jUtils;
@@ -47,12 +49,14 @@ import org.slf4j.Logger;
 import org.slf4j.event.Level;
 
 import java.util.ArrayList;
+import java.util.Arrays; // NOTE(review): not referenced by the code added in this change — drop unless needed
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors; // NOTE(review): not referenced by the code added in this change — drop unless needed
 import java.util.stream.Stream;
 
 import static org.junit.Assert.fail;
@@ -469,6 +473,59 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     }
   }
 
+  @Test
+  public void testWatchMetrics() throws Exception {
+    final RaftProperties p = getProperties();
+    RaftServerConfigKeys.Watch.setElementLimit(p, 10); // one WatchQueue per replication level holds at most 10 watches
+    RaftServerConfigKeys.Watch.setTimeout(p, TimeDuration.valueOf(2, 
TimeUnit.SECONDS)); // short watch timeout so the unsatisfiable watches expire quickly
+    try {
+      runWithNewCluster(NUM_SERVERS,
+          cluster -> runSingleTest(WatchRequestTests::runTestWatchMetrics, 
cluster, LOG));
+    } finally {
+      RaftServerConfigKeys.Watch.setElementLimit(p, 
Watch.ELEMENT_LIMIT_DEFAULT);
+      RaftServerConfigKeys.Watch.setTimeout(p, 
RaftServerConfigKeys.Watch.TIMEOUT_DEFAULT);
+    }
+  }
+
+  static RaftServerMetricsImpl getRaftServerMetrics(RaftServer.Division 
division) {
+    return (RaftServerMetricsImpl) division.getRaftServerMetrics();
+  }
+
+  static void runTestWatchMetrics(TestParameters p) throws Exception {
+    final MiniRaftCluster cluster = p.cluster;
+
+    List<RaftClient> clients = new ArrayList<>();
+
+    final ReplicationLevel replicationLevel = ReplicationLevel.MAJORITY;
+    try {
+      long initialWatchRequestTimeoutCount = 
getRaftServerMetrics(cluster.getLeader())
+          .getNumWatchRequestsTimeout(replicationLevel).getCount();
+      long initialLimitHit = getRaftServerMetrics(cluster.getLeader())
+          .getNumWatchRequestQueueLimitHits(replicationLevel).getCount();
+
+      int uncommittedBaseIndex = 10000;
+      // Indices 10001 - 10011 will never be committed, so every admitted watch should 
time out with NotReplicatedException
+      for (int i = 1; i <= 11; i++) {
+        RaftClient client = cluster.createClient(cluster.getLeader().getId(), 
RetryPolicies.noRetry());
+        clients.add(client);
+        client.async().watch(uncommittedBaseIndex + i, replicationLevel);
+      }
+
+      // Each of the 10 admitted watches should time out and increment the 
timeout metric once
+      RaftTestUtil.waitFor(() -> getRaftServerMetrics(cluster.getLeader())
+              .getNumWatchRequestsTimeout(replicationLevel).getCount() == 
initialWatchRequestTimeoutCount + 10,
+          300, 5000);
+      // 11 watches were sent but the watch queue limit is 10, so exactly one 
should be rejected and counted as a limit hit
+      RaftTestUtil.waitFor(() -> getRaftServerMetrics(cluster.getLeader())
+          .getNumWatchRequestQueueLimitHits(replicationLevel).getCount() ==
+          initialLimitHit + 1, 300, 5000);
+    } finally {
+      for(RaftClient client : clients) {
+        client.close();
+      }
+    }
+  }
+
   static void checkTimeout(List<CompletableFuture<RaftClientReply>> replies,
       List<CompletableFuture<WatchReplies>> watches, Logger LOG) throws 
Exception {
     for(int i = 0; i < replies.size(); i++) {

Reply via email to