This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new a7ab5e6 RATIS-830. Add a metric for tracking failed client requests on a server (#205)
a7ab5e6 is described below
commit a7ab5e6fb4dd38a36fafa60e871f025b75843141
Author: Rui Wang <[email protected]>
AuthorDate: Tue Sep 22 22:30:37 2020 -0700
RATIS-830. Add a metric for tracking failed client requests on a server (#205)
* RATIS-830. Add a metric for tracking failed client requests on a server
* fixup! address comments.
---
.../apache/ratis/server/impl/RaftServerImpl.java | 25 +++++++-
.../ratis/server/impl/RaftServerMetrics.java | 29 ++++++++++
.../server/impl/TestRatisServerMetricsBase.java | 67 ++++++++++++++++++++++
.../TestRatisServerMetricsWithSimulatedRpc.java | 28 +++++++++
4 files changed, 147 insertions(+), 2 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index b1d56d6..88ac669 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.*;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
@@ -628,14 +629,16 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
final Timer.Context timerContext = (timer != null) ? timer.time() : null;
CompletableFuture<RaftClientReply> replyFuture;
+
if (request.is(RaftClientRequestProto.TypeCase.STALEREAD)) {
- replyFuture = staleReadAsync(request);
+ replyFuture = staleReadAsync(request);
} else {
// first check the server's leader state
CompletableFuture<RaftClientReply> reply = checkLeaderState(request,
null);
if (reply != null) {
return reply;
}
+
// let the state machine handle read-only request from client
RaftClientRequest.Type type = request.getType();
if (type.is(RaftClientRequestProto.TypeCase.STREAM)) {
@@ -652,7 +655,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
if (type.is(RaftClientRequestProto.TypeCase.READ)) {
// TODO: We might not be the leader anymore by the time this completes.
// See the RAFT paper section 8 (last part)
- replyFuture =
processQueryFuture(stateMachine.query(request.getMessage()), request);
+ replyFuture =
processQueryFuture(stateMachine.query(request.getMessage()), request);
} else if (type.is(RaftClientRequestProto.TypeCase.WATCH)) {
replyFuture = watchAsync(request);
} else if (type.is(RaftClientRequestProto.TypeCase.STREAM)) {
@@ -684,14 +687,32 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
}
+ final RaftClientRequest.Type type = request.getType();
replyFuture.whenComplete((clientReply, exception) -> {
if (clientReply.isSuccess() && timerContext != null) {
timerContext.stop();
}
+ if (exception != null || clientReply.getException() != null) {
+ incFailedRequestCount(type);
+ }
});
return replyFuture;
}
+  /**
+   * Bumps the failed-request counter in {@code raftServerMetrics} that matches the type of a
+   * client request which completed with an error. Types without a dedicated counter are ignored.
+   *
+   * @param requestType the type of the failed client request
+   */
+  private void incFailedRequestCount(RaftClientRequest.Type requestType) {
+    if (requestType.is(TypeCase.STALEREAD)) {
+      raftServerMetrics.onFailedClientStaleRead();
+      return;
+    }
+    if (requestType.is(TypeCase.WATCH)) {
+      raftServerMetrics.onFailedClientWatch();
+      return;
+    }
+    if (requestType.is(TypeCase.WRITE)) {
+      raftServerMetrics.onFailedClientWrite();
+      return;
+    }
+    if (requestType.is(TypeCase.READ)) {
+      raftServerMetrics.onFailedClientRead();
+      return;
+    }
+    if (requestType.is(TypeCase.STREAM)) {
+      raftServerMetrics.onFailedClientStream();
+    }
+  }
+
private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest
request) {
return role.getLeaderState()
.map(ls -> ls.addWatchReqeust(request))
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
index ec3ad68..3b24010 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
@@ -65,6 +65,16 @@ public final class RaftServerMetrics extends RatisMetrics {
public static final String RETRY_CACHE_HIT_RATE_METRIC = "retryCacheHitRate";
public static final String RETRY_CACHE_MISS_COUNT_METRIC = "retryCacheMissCount";
public static final String RETRY_CACHE_MISS_RATE_METRIC = "retryCacheMissRate";
+  // Names of the registry counters incremented for failed client requests, one per request type.
+  public static final String RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT = "numFailedClientStaleReadOnServer";
+  public static final String RATIS_SERVER_FAILED_CLIENT_READ_COUNT = "numFailedClientReadOnServer";
+  public static final String RATIS_SERVER_FAILED_CLIENT_WRITE_COUNT = "numFailedClientWriteOnServer";
+  public static final String RATIS_SERVER_FAILED_CLIENT_WATCH_COUNT = "numFailedClientWatchOnServer";
+  public static final String RATIS_SERVER_FAILED_CLIENT_STREAM_COUNT = "numFailedClientStreamOnServer";
private Map<String, Long> followerLastHeartbeatElapsedTimeMap = new HashMap<>();
private CommitInfoCache commitInfoCache;
@@ -230,6 +240,25 @@ public final class RaftServerMetrics extends RatisMetrics {
registry.counter(RESOURCE_LIMIT_HIT_COUNTER).inc();
}
+  /** Records one failed client stale-read request. */
+  void onFailedClientStaleRead() {
+    incFailedClientCounter(RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT);
+  }
+
+  /** Records one failed client read request. */
+  void onFailedClientRead() {
+    incFailedClientCounter(RATIS_SERVER_FAILED_CLIENT_READ_COUNT);
+  }
+
+  /** Records one failed client watch request. */
+  void onFailedClientWatch() {
+    incFailedClientCounter(RATIS_SERVER_FAILED_CLIENT_WATCH_COUNT);
+  }
+
+  /** Records one failed client write request. */
+  void onFailedClientWrite() {
+    incFailedClientCounter(RATIS_SERVER_FAILED_CLIENT_WRITE_COUNT);
+  }
+
+  /** Records one failed client stream request. */
+  void onFailedClientStream() {
+    incFailedClientCounter(RATIS_SERVER_FAILED_CLIENT_STREAM_COUNT);
+  }
+
+  /** Increments by one the registry counter registered under {@code counterName}. */
+  private void incFailedClientCounter(String counterName) {
+    registry.counter(counterName).inc();
+  }
/**
 * @return the metric registry that the server metric counters in this class are recorded in
 */
public RatisMetricRegistry getRegistry() {
  return this.registry;
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
new file mode 100644
index 0000000..1ae94c9
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static
org.apache.ratis.server.impl.RaftServerMetrics.RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Log4jUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests on Ratis server metrics, in particular the counters that {@link RaftServerMetrics}
+ * keeps for client requests that fail on a server.
+ *
+ * @param <CLUSTER> the {@link MiniRaftCluster} implementation the tests run against
+ */
+public abstract class TestRatisServerMetricsBase<CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  {
+    Log4jUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    Log4jUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  private static final int NUM_SERVERS = 3;
+
+  @Test
+  public void testClientFailedRequest() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestClientFailedRequest);
+  }
+
+  /**
+   * Submits a failing stale-read request directly to the leader and checks that the leader's
+   * failed stale-read counter is then exactly 1.
+   */
+  void runTestClientFailedRequest(CLUSTER cluster)
+      throws InterruptedException, IOException, ExecutionException {
+    final RaftServerImpl leaderImpl = RaftTestUtil.waitForLeader(cluster);
+    final ClientId clientId = ClientId.randomId();
+    // StaleRead with Long.MAX_VALUE minIndex will fail.
+    final RaftClientRequest r = new RaftClientRequest(clientId, leaderImpl.getId(),
+        cluster.getGroupId(), 0, Message.EMPTY,
+        RaftClientRequest.staleReadRequestType(Long.MAX_VALUE), null);
+    final CompletableFuture<RaftClientReply> f = leaderImpl.submitClientRequestAsync(r);
+    Assert.assertFalse("stale read with minIndex Long.MAX_VALUE should fail",
+        f.get().isSuccess());
+
+    final long failedStaleReads = leaderImpl.getRaftServerMetrics().getRegistry()
+        .counter(RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT).getCount();
+    assertEquals(1L, failedStaleReads);
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRatisServerMetricsWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRatisServerMetricsWithSimulatedRpc.java
new file mode 100644
index 0000000..f7173bf
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRatisServerMetricsWithSimulatedRpc.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
+import org.apache.ratis.server.impl.TestRatisServerMetricsBase;
+
+/**
+ * Runs the {@link TestRatisServerMetricsBase} tests against a
+ * {@link MiniRaftClusterWithSimulatedRpc} cluster.
+ */
+public class TestRatisServerMetricsWithSimulatedRpc
+    extends TestRatisServerMetricsBase<MiniRaftClusterWithSimulatedRpc>
+    implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
+}