This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ced7dbf  RATIS-649. Add metrics related to ClientRequests. Contributed 
by Aravindan Vijayan.
ced7dbf is described below

commit ced7dbfea404523ebc6b415d2e42979c2ac6b351
Author: Shashikant Banerjee <[email protected]>
AuthorDate: Fri Oct 18 16:25:22 2019 +0530

    RATIS-649. Add metrics related to ClientRequests. Contributed by Aravindan 
Vijayan.
---
 .../apache/ratis/protocol/RaftClientRequest.java   |   2 +-
 .../apache/ratis/server/impl/FollowerState.java    |   2 +-
 .../org/apache/ratis/server/impl/LeaderState.java  |   4 +-
 .../apache/ratis/server/impl/PendingRequests.java  |  10 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 104 ++++++++++++---------
 .../ratis/server/impl/RaftServerMetrics.java       |  51 +++++++++-
 .../ratis/server/metrics/RatisMetricNames.java     |  15 +++
 .../apache/ratis/server/metrics/RatisMetrics.java  |   5 -
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  |  43 ++++++++-
 9 files changed, 174 insertions(+), 62 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index ae4e9ae..4c10c0c 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -127,7 +127,7 @@ public class RaftClientRequest extends RaftClientMessage {
       return (WatchRequestTypeProto)proto;
     }
 
-    static String toString(ReplicationLevel replication) {
+    public static String toString(ReplicationLevel replication) {
       return replication == ReplicationLevel.MAJORITY? "": "-" + replication;
     }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 5f5bddb..fea78b7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -62,7 +62,7 @@ class FollowerState extends Daemon {
   FollowerState(RaftServerImpl server) {
     this.name = server.getMemberId() + "-" + getClass().getSimpleName();
     this.server = server;
-    raftServerMetrics = RaftServerMetrics.getRaftServerMetrics(server);
+    raftServerMetrics = server.getRaftServerMetrics();
     raftServerMetrics.addPeerCommitIndexGauge(server.getPeer());
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 5886124..8f3e54b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -215,7 +215,8 @@ public class LeaderState {
 
     this.eventQueue = new EventQueue();
     processor = new EventProcessor();
-    this.pendingRequests = new PendingRequests(server.getMemberId(), 
properties);
+    raftServerMetrics = server.getRaftServerMetrics();
+    this.pendingRequests = new PendingRequests(server.getMemberId(), 
properties, raftServerMetrics);
     this.watchRequests = new WatchRequests(server.getMemberId(), properties);
 
     final RaftConfiguration conf = server.getRaftConf();
@@ -223,7 +224,6 @@ public class LeaderState {
     placeHolderIndex = raftLog.getNextIndex();
 
     senders = new SenderList();
-    raftServerMetrics = RaftServerMetrics.getRaftServerMetrics(server);
     addSenders(others, placeHolderIndex, true);
     voterLists = divideFollowers(conf);
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 7fa03b8..b37b4f0 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -51,21 +51,25 @@ class PendingRequests {
   private static class RequestMap {
     private final Object name;
     private final ConcurrentMap<Long, PendingRequest> map = new 
ConcurrentHashMap<>();
+    private final RaftServerMetrics raftServerMetrics;
 
     /** Permits to put new requests, always synchronized. */
     private final Map<Permit, Permit> permits = new HashMap<>();
     /** Track and limit the number of requests. */
     private final ResourceSemaphore resource;
 
-    RequestMap(Object name, int capacity) {
+    RequestMap(Object name, int capacity, RaftServerMetrics raftServerMetrics) 
{
       this.name = name;
       this.resource = new ResourceSemaphore(capacity);
+      this.raftServerMetrics = raftServerMetrics;
+      raftServerMetrics.addNumPendingRequestsGauge(resource, capacity);
     }
 
     Permit tryAcquire() {
       final boolean acquired = resource.tryAcquire();
       LOG.trace("tryAcquire? {}", acquired);
       if (!acquired) {
+        raftServerMetrics.onRequestQueueLimitHit();
         return null;
       }
       return putPermit();
@@ -135,9 +139,9 @@ class PendingRequests {
   private final String name;
   private final RequestMap pendingRequests;
 
-  PendingRequests(RaftGroupMemberId id, RaftProperties properties) {
+  PendingRequests(RaftGroupMemberId id, RaftProperties properties, 
RaftServerMetrics raftServerMetrics) {
     this.name = id + "-" + getClass().getSimpleName();
-    this.pendingRequests = new RequestMap(id, 
RaftServerConfigKeys.Write.elementLimit(properties));
+    this.pendingRequests = new RequestMap(id, 
RaftServerConfigKeys.Write.elementLimit(properties), raftServerMetrics);
   }
 
   Permit tryAcquire() {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 443f23d..5434698 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -18,7 +18,6 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
@@ -26,8 +25,6 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerMXBean;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.metrics.LeaderElectionMetrics;
-import org.apache.ratis.server.metrics.RatisMetricNames;
-import org.apache.ratis.server.metrics.RatisMetrics;
 import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -92,7 +89,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
   private final RaftServerJmxAdapter jmxAdapter;
   private final LeaderElectionMetrics leaderElectionMetricsRegistry;
-  private final RatisMetricRegistry raftServerMetricsRegistry;
+  private final RaftServerMetrics raftServerMetrics;
 
   private AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
 
@@ -119,7 +116,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
     this.jmxAdapter = new RaftServerJmxAdapter();
     this.leaderElectionMetricsRegistry = getLeaderElectionMetrics(this);
-    this.raftServerMetricsRegistry = 
RatisMetrics.getMetricsRegistryForServer(id.toString());
+    this.raftServerMetrics = RaftServerMetrics.getRaftServerMetrics(this);
   }
 
   private RetryCache initRetryCache(RaftProperties prop) {
@@ -545,49 +542,60 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
       RaftClientRequest request) throws IOException {
     assertLifeCycleState(RUNNING);
     LOG.debug("{}: receive client request({})", getMemberId(), request);
-    if (request.is(RaftClientRequestProto.TypeCase.STALEREAD)) {
-      return staleReadAsync(request);
-    }
-
-    // first check the server's leader state
-    CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
-    if (reply != null) {
-      return reply;
-    }
+    Timer timer = raftServerMetrics.getClientRequestTimer(request);
+    final Timer.Context timerContext = (timer != null) ? timer.time() : null;
 
-    // let the state machine handle read-only request from client
-    final StateMachine stateMachine = getStateMachine();
-    if (request.is(RaftClientRequestProto.TypeCase.READ)) {
-      // TODO: We might not be the leader anymore by the time this completes.
-      // See the RAFT paper section 8 (last part)
-      return processQueryFuture(stateMachine.query(request.getMessage()), 
request);
-    }
-
-    if (request.is(RaftClientRequestProto.TypeCase.WATCH)) {
-      return watchAsync(request);
+    CompletableFuture<RaftClientReply> replyFuture;
+    if (request.is(RaftClientRequestProto.TypeCase.STALEREAD)) {
+      replyFuture =  staleReadAsync(request);
+    } else {
+      // first check the server's leader state
+      CompletableFuture<RaftClientReply> reply = checkLeaderState(request, 
null);
+      if (reply != null) {
+        return reply;
+      }
+      // let the state machine handle read-only request from client
+      final StateMachine stateMachine = getStateMachine();
+      if (request.is(RaftClientRequestProto.TypeCase.READ)) {
+        // TODO: We might not be the leader anymore by the time this completes.
+        // See the RAFT paper section 8 (last part)
+        replyFuture =  
processQueryFuture(stateMachine.query(request.getMessage()), request);
+      } else if (request.is(RaftClientRequestProto.TypeCase.WATCH)) {
+        replyFuture = watchAsync(request);
+      } else {
+        // query the retry cache
+        RetryCache.CacheQueryResult previousResult = retryCache.queryCache(
+            request.getClientId(), request.getCallId());
+        if (previousResult.isRetry()) {
+          // if the previous attempt is still pending or it succeeded, return 
its
+          // future
+          raftServerMetrics.onRetryRequestCacheHit();
+          replyFuture = previousResult.getEntry().getReplyFuture();
+        } else {
+          final RetryCache.CacheEntry cacheEntry = previousResult.getEntry();
+
+          // TODO: this client request will not be added to pending requests 
until
+          // later which means that any failure in between will leave partial 
state in
+          // the state machine. We should call cancelTransaction() for failed 
requests
+          TransactionContext context = stateMachine.startTransaction(request);
+          if (context.getException() != null) {
+            RaftClientReply exceptionReply = new RaftClientReply(request,
+                new StateMachineException(getMemberId(), 
context.getException()), getCommitInfos());
+            cacheEntry.failWithReply(exceptionReply);
+            replyFuture =  CompletableFuture.completedFuture(exceptionReply);
+          } else {
+            replyFuture = appendTransaction(request, context, cacheEntry);
+          }
+        }
+      }
     }
 
-    // query the retry cache
-    RetryCache.CacheQueryResult previousResult = retryCache.queryCache(
-        request.getClientId(), request.getCallId());
-    if (previousResult.isRetry()) {
-      // if the previous attempt is still pending or it succeeded, return its
-      // future
-      return previousResult.getEntry().getReplyFuture();
-    }
-    final RetryCache.CacheEntry cacheEntry = previousResult.getEntry();
-
-    // TODO: this client request will not be added to pending requests until
-    // later which means that any failure in between will leave partial state 
in
-    // the state machine. We should call cancelTransaction() for failed 
requests
-    TransactionContext context = stateMachine.startTransaction(request);
-    if (context.getException() != null) {
-      RaftClientReply exceptionReply = new RaftClientReply(request,
-          new StateMachineException(getMemberId(), context.getException()), 
getCommitInfos());
-      cacheEntry.failWithReply(exceptionReply);
-      return CompletableFuture.completedFuture(exceptionReply);
-    }
-    return appendTransaction(request, context, cacheEntry);
+    replyFuture.whenComplete((clientReply, exception) -> {
+      if (clientReply.isSuccess() && timerContext != null) {
+        timerContext.stop();
+      }
+    });
+    return replyFuture;
   }
 
   private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest 
request) {
@@ -910,7 +918,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     final long currentTerm;
     final long followerCommit = state.getLog().getLastCommittedIndex();
     final Optional<FollowerState> followerState;
-    Timer.Context timer = 
raftServerMetricsRegistry.timer(RatisMetricNames.FOLLOWER_APPEND_ENTRIES_LATENCY).time();
+    Timer.Context timer = 
raftServerMetrics.getFollowerAppendEntryTimer().time();
     synchronized (this) {
       final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
       currentTerm = state.getCurrentTerm();
@@ -1323,6 +1331,10 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
     return leaderElectionMetricsRegistry;
   }
 
+  public RaftServerMetrics getRaftServerMetrics() {
+    return raftServerMetrics;
+  }
+
   private class RaftServerJmxAdapter extends JmxRegister implements 
RaftServerMXBean {
     @Override
     public String getId() {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
index df43990..192c959 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
@@ -18,21 +18,33 @@
 
 package org.apache.ratis.server.impl;
 
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_APPEND_ENTRIES_LATENCY;
 import static 
org.apache.ratis.server.metrics.RatisMetricNames.LEADER_METRIC_PEER_COMMIT_INDEX;
 import static 
org.apache.ratis.server.metrics.RatisMetricNames.LEADER_METRIC_FOLLOWER_LAST_HEARTBEAT_ELAPSED_TIME_METRIC;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_READ_REQUEST;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_STALE_READ_REQUEST;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_WATCH_REQUEST;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_WRITE_REQUEST;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.REQUEST_QUEUE_LIMIT_HIT_COUNTER;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.REQUEST_QUEUE_SIZE;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.RETRY_REQUEST_CACHE_HIT_COUNTER;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
 
 import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Timer;
 
 import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
+import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.metrics.RatisMetrics;
 import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ResourceSemaphore;
 
 /**
  * Metric Registry for Raft Group Server. One instance per leader/follower.
@@ -49,13 +61,11 @@ public final class RaftServerMetrics {
       RaftServerImpl raftServer) {
     RaftServerMetrics serverMetrics = new RaftServerMetrics(raftServer);
     metricsMap.put(raftServer.getMemberId().toString(), serverMetrics);
-
     return serverMetrics;
   }
 
   private RaftServerMetrics(RaftServerImpl server) {
-    registry = RatisMetrics.getMetricRegistryForRaftServer(
-        server.getMemberId().toString());
+    registry = 
RatisMetrics.getMetricRegistryForRaftServer(server.getMemberId().toString());
     commitInfoCache = server.getCommitInfoCache();
     addPeerCommitIndexGauge(server.getPeer());
   }
@@ -130,4 +140,39 @@ public final class RaftServerMetrics {
     followerLastHeartbeatElapsedTimeMap.put(peer.getId().toString(),
         elapsedTime);
   }
+
+  public Timer getFollowerAppendEntryTimer() {
+    return registry.timer(FOLLOWER_APPEND_ENTRIES_LATENCY);
+  }
+
+  public Timer getTimer(String timerName) {
+    return registry.timer(timerName);
+  }
+
+  public Timer getClientRequestTimer(RaftClientRequest request) {
+    if (request.is(TypeCase.READ)) {
+      return getTimer(RAFT_CLIENT_READ_REQUEST);
+    } else if (request.is(TypeCase.STALEREAD)) {
+      return getTimer(RAFT_CLIENT_STALE_READ_REQUEST);
+    } else if (request.is(TypeCase.WATCH)) {
+      String watchType = 
RaftClientRequest.Type.toString(request.getType().getWatch().getReplication());
+      return getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST, watchType));
+    } else if (request.is(TypeCase.WRITE)) {
+      return getTimer(RAFT_CLIENT_WRITE_REQUEST);
+    }
+    return null;
+  }
+
+  public void onRetryRequestCacheHit() {
+    registry.counter(RETRY_REQUEST_CACHE_HIT_COUNTER).inc();
+  }
+
+  public void onRequestQueueLimitHit() {
+    registry.counter(REQUEST_QUEUE_LIMIT_HIT_COUNTER).inc();
+  }
+
+  public void addNumPendingRequestsGauge(ResourceSemaphore resourceSemaphore, 
int capacity) {
+    registry.gauge(REQUEST_QUEUE_SIZE,
+        () -> () -> (capacity - resourceSemaphore.availablePermits()));
+  }
 }
\ No newline at end of file
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
index c20b1c0..ffd6054 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
@@ -39,6 +39,21 @@ public final class RatisMetricNames {
   public static final String STATEMACHINE_APPLY_COMPLETED_GAUGE =
       "statemachineApplyCompletedIndex";
 
+  // Raft client read request metric timer.
+  public static final String RAFT_CLIENT_READ_REQUEST = "clientReadRequest";
+
+  public static final String RAFT_CLIENT_STALE_READ_REQUEST = 
"clientStaleReadRequest";
+
+  public static final String RAFT_CLIENT_WRITE_REQUEST = "clientWriteRequest";
+
+  public static final String RAFT_CLIENT_WATCH_REQUEST = 
"clientWatch%sRequest";
+
+  public static final String RETRY_REQUEST_CACHE_HIT_COUNTER = 
"numRetryCacheHits";
+
+  public static final String REQUEST_QUEUE_LIMIT_HIT_COUNTER = 
"numRequestQueueLimitHits";
+
+  public static final String REQUEST_QUEUE_SIZE = "numPendingRequestInQueue";
+
   //////////////////////////////
   // Raft Log Write Path Metrics
   /////////////////////////////
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
index 8714260..fd71fe3 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
@@ -75,9 +75,4 @@ public class RatisMetrics {
             RATIS_LOG_WORKER_METRICS_DESC));
     return ratisMetricRegistry.orElse(null);
   }
-
-  public static RatisMetricRegistry getMetricsRegistryForServer(String 
serverId) {
-    return create(new MetricRegistryInfo(serverId, 
RATIS_APPLICATION_NAME_METRICS, RATIS_SERVER_METRICS,
-        RATIS_SERVER_METRICS_DESC));
-  }
 }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 05048b7..41ac36e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -17,6 +17,11 @@
  */
 package org.apache.ratis.grpc;
 
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_READ_REQUEST;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_STALE_READ_REQUEST;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_WATCH_REQUEST;
+import static 
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_WRITE_REQUEST;
+
 import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
@@ -29,6 +34,7 @@ import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.client.GrpcClientProtocolClient;
 import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -37,6 +43,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerMetrics;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.ServerImplUtils;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
@@ -119,7 +126,7 @@ public class TestRaftServerWithGrpc extends BaseTest 
implements MiniRaftClusterW
 
   @Test
   public void testLeaderRestart() throws Exception {
-    runWithNewCluster(3, this::runTestLeaderRestart);
+    runWithNewCluster(1, this::runTestLeaderRestart);
   }
 
   void runTestLeaderRestart(MiniRaftClusterWithGrpc cluster) throws Exception {
@@ -167,6 +174,40 @@ public class TestRaftServerWithGrpc extends BaseTest 
implements MiniRaftClusterW
 
   }
 
+  @Test
+  public void testRaftClientMetrics() throws Exception {
+    runWithNewCluster(3, this::testRaftClientRequestMetrics);
+  }
+
+  void testRaftClientRequestMetrics(MiniRaftClusterWithGrpc cluster) throws 
IOException,
+      ExecutionException, InterruptedException {
+    final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+    RaftServerMetrics raftServerMetrics = leader.getRaftServerMetrics();
+
+    try (final RaftClient client = cluster.createClient()) {
+      final CompletableFuture<RaftClientReply> f1 = client.sendAsync(new 
SimpleMessage("testing"));
+      Assert.assertTrue(f1.get().isSuccess());
+      
Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_WRITE_REQUEST).getCount()
 > 0);
+
+      final CompletableFuture<RaftClientReply> f2 = 
client.sendReadOnlyAsync(new SimpleMessage("testing"));
+      Assert.assertTrue(f2.get().isSuccess());
+      
Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_READ_REQUEST).getCount()
 > 0);
+
+      final CompletableFuture<RaftClientReply> f3 = 
client.sendStaleReadAsync(new SimpleMessage("testing"),
+          0, leader.getId());
+      Assert.assertTrue(f3.get().isSuccess());
+      
Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_STALE_READ_REQUEST).getCount()
 > 0);
+
+      final CompletableFuture<RaftClientReply> f4 = client.sendWatchAsync(0, 
RaftProtos.ReplicationLevel.ALL);
+      Assert.assertTrue(f4.get().isSuccess());
+      
Assert.assertTrue(raftServerMetrics.getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST,
 "-ALL")).getCount() > 0);
+
+      final CompletableFuture<RaftClientReply> f5 = client.sendWatchAsync(0, 
RaftProtos.ReplicationLevel.MAJORITY);
+      Assert.assertTrue(f5.get().isSuccess());
+      
Assert.assertTrue(raftServerMetrics.getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST,
 "")).getCount() > 0);
+    }
+  }
+
   static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId 
serverId, long seqNum) {
     final SimpleMessage m = new SimpleMessage("m" + seqNum);
     return RaftClientTestUtil.newRaftClientRequest(client, serverId, seqNum, m,

Reply via email to