This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new a7ab5e6  RATIS-830. Add a metric for tracking failed client requests on a server (#205)
a7ab5e6 is described below

commit a7ab5e6fb4dd38a36fafa60e871f025b75843141
Author: Rui Wang <[email protected]>
AuthorDate: Tue Sep 22 22:30:37 2020 -0700

    RATIS-830. Add a metric for tracking failed client requests on a server (#205)
    
    * RATIS-830. Add a metric for tracking failed client requests on a server
    
    * fixup! address comments.
---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 25 +++++++-
 .../ratis/server/impl/RaftServerMetrics.java       | 29 ++++++++++
 .../server/impl/TestRatisServerMetricsBase.java    | 67 ++++++++++++++++++++++
 .../TestRatisServerMetricsWithSimulatedRpc.java    | 28 +++++++++
 4 files changed, 147 insertions(+), 2 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index b1d56d6..88ac669 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.*;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
@@ -628,14 +629,16 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     final Timer.Context timerContext = (timer != null) ? timer.time() : null;
 
     CompletableFuture<RaftClientReply> replyFuture;
+
     if (request.is(RaftClientRequestProto.TypeCase.STALEREAD)) {
-      replyFuture =  staleReadAsync(request);
+      replyFuture = staleReadAsync(request);
     } else {
       // first check the server's leader state
       CompletableFuture<RaftClientReply> reply = checkLeaderState(request, 
null);
       if (reply != null) {
         return reply;
       }
+
       // let the state machine handle read-only request from client
       RaftClientRequest.Type type = request.getType();
       if (type.is(RaftClientRequestProto.TypeCase.STREAM)) {
@@ -652,7 +655,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       if (type.is(RaftClientRequestProto.TypeCase.READ)) {
         // TODO: We might not be the leader anymore by the time this completes.
         // See the RAFT paper section 8 (last part)
-        replyFuture =  
processQueryFuture(stateMachine.query(request.getMessage()), request);
+        replyFuture = 
processQueryFuture(stateMachine.query(request.getMessage()), request);
       } else if (type.is(RaftClientRequestProto.TypeCase.WATCH)) {
         replyFuture = watchAsync(request);
       } else if (type.is(RaftClientRequestProto.TypeCase.STREAM)) {
@@ -684,14 +687,32 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       }
     }
 
+    final RaftClientRequest.Type type = request.getType();
     replyFuture.whenComplete((clientReply, exception) -> {
       if (clientReply.isSuccess() && timerContext != null) {
         timerContext.stop();
       }
+      if (exception != null || clientReply.getException() != null) {
+        incFailedRequestCount(type);
+      }
     });
     return replyFuture;
   }
 
+  private void incFailedRequestCount(RaftClientRequest.Type type) {
+    if (type.is(TypeCase.STALEREAD)) {
+      raftServerMetrics.onFailedClientStaleRead();
+    } else if (type.is(TypeCase.WATCH)) {
+      raftServerMetrics.onFailedClientWatch();
+    } else if (type.is(TypeCase.WRITE)) {
+      raftServerMetrics.onFailedClientWrite();
+    } else if (type.is(TypeCase.READ)) {
+      raftServerMetrics.onFailedClientRead();
+    } else if (type.is(TypeCase.STREAM)) {
+      raftServerMetrics.onFailedClientStream();
+    }
+  }
+
   private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest 
request) {
     return role.getLeaderState()
         .map(ls -> ls.addWatchReqeust(request))
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
index ec3ad68..3b24010 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
@@ -65,6 +65,16 @@ public final class RaftServerMetrics extends RatisMetrics {
   public static final String RETRY_CACHE_HIT_RATE_METRIC = "retryCacheHitRate";
   public static final String RETRY_CACHE_MISS_COUNT_METRIC = "retryCacheMissCount";
   public static final String RETRY_CACHE_MISS_RATE_METRIC = "retryCacheMissRate";
+
+  // Counter names for client requests that failed on this server, one per request type;
+  // each is incremented by the matching onFailedClient* method below.
+  public static final String RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT = "numFailedClientStaleReadOnServer";
+  public static final String RATIS_SERVER_FAILED_CLIENT_READ_COUNT = "numFailedClientReadOnServer";
+  public static final String RATIS_SERVER_FAILED_CLIENT_WRITE_COUNT = "numFailedClientWriteOnServer";
+  public static final String RATIS_SERVER_FAILED_CLIENT_WATCH_COUNT = "numFailedClientWatchOnServer";
+  public static final String RATIS_SERVER_FAILED_CLIENT_STREAM_COUNT = "numFailedClientStreamOnServer";
 
   // NOTE(review): presumably keyed by follower id with elapsed-heartbeat times as values;
   // confirm against the code that populates it.
   private Map<String, Long> followerLastHeartbeatElapsedTimeMap = new HashMap<>();
   private CommitInfoCache commitInfoCache;
@@ -230,6 +240,25 @@ public final class RaftServerMetrics extends RatisMetrics {
     registry.counter(RESOURCE_LIMIT_HIT_COUNTER).inc();
   }
 
+  /**
+   * Counts one failed stale-read client request under
+   * {@link #RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT}.
+   */
+  void onFailedClientStaleRead() {
+    getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT).inc();
+  }
+
+  /**
+   * Counts one failed read client request under
+   * {@link #RATIS_SERVER_FAILED_CLIENT_READ_COUNT}.
+   */
+  void onFailedClientRead() {
+    getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_READ_COUNT).inc();
+  }
+
+  /**
+   * Counts one failed watch client request under
+   * {@link #RATIS_SERVER_FAILED_CLIENT_WATCH_COUNT}.
+   */
+  void onFailedClientWatch() {
+    getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_WATCH_COUNT).inc();
+  }
+
+  /**
+   * Counts one failed write client request under
+   * {@link #RATIS_SERVER_FAILED_CLIENT_WRITE_COUNT}.
+   */
+  void onFailedClientWrite() {
+    getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_WRITE_COUNT).inc();
+  }
+
+  /**
+   * Counts one failed stream client request under
+   * {@link #RATIS_SERVER_FAILED_CLIENT_STREAM_COUNT}.
+   */
+  void onFailedClientStream() {
+    registry.counter(RATIS_SERVER_FAILED_CLIENT_STREAM_COUNT).inc();
+  }
+
   /**
    * Returns the registry in which this class registers and updates its counters.
    *
    * @return the backing metric registry
    */
   public RatisMetricRegistry getRegistry() {
     return this.registry;
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
new file mode 100644
index 0000000..1ae94c9
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static 
org.apache.ratis.server.impl.RaftServerMetrics.RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Log4jUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests on Ratis server metrics. Concrete subclasses pick the RPC implementation by
+ * supplying the {@link MiniRaftCluster} factory.
+ */
+public abstract class TestRatisServerMetricsBase<CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  {
+    // Verbose server and client logging to help diagnose failures of these tests.
+    Log4jUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    Log4jUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  private static final int NUM_SERVERS = 3;
+
+  @Test
+  public void testClientFailedRequest() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestClientFailedRequest);
+  }
+
+  /**
+   * Sends a stale-read request with minIndex {@code Long.MAX_VALUE} directly to the leader,
+   * expects a failed reply, and checks that the leader's failed-stale-read counter is 1.
+   */
+  void runTestClientFailedRequest(CLUSTER cluster)
+      throws InterruptedException, IOException, ExecutionException {
+    final RaftServerImpl leaderImpl = RaftTestUtil.waitForLeader(cluster);
+    final ClientId clientId = ClientId.randomId();
+    // StaleRead with Long.MAX_VALUE minIndex will fail.
+    final RaftClientRequest r = new RaftClientRequest(clientId, leaderImpl.getId(),
+        cluster.getGroupId(), 0, Message.EMPTY,
+        RaftClientRequest.staleReadRequestType(Long.MAX_VALUE), null);
+    final CompletableFuture<RaftClientReply> f = leaderImpl.submitClientRequestAsync(r);
+    Assert.assertFalse("stale read with minIndex Long.MAX_VALUE should fail",
+        f.get().isSuccess());
+    assertEquals(1L, leaderImpl.getRaftServerMetrics().getRegistry()
+        .counter(RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT).getCount());
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRatisServerMetricsWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRatisServerMetricsWithSimulatedRpc.java
new file mode 100644
index 0000000..f7173bf
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRatisServerMetricsWithSimulatedRpc.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
+import org.apache.ratis.server.impl.TestRatisServerMetricsBase;
+
+/**
+ * Runs the {@link TestRatisServerMetricsBase} metric tests on a
+ * {@link MiniRaftClusterWithSimulatedRpc}.
+ */
+public class TestRatisServerMetricsWithSimulatedRpc
+    extends TestRatisServerMetricsBase<MiniRaftClusterWithSimulatedRpc>
+    implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
+}

Reply via email to