This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new e767c15  RATIS-1444. Add metric to streaming (#607)
e767c15 is described below

commit e767c1532a690138f5b7559ffd6b9e08f65eedf9
Author: hao guo <[email protected]>
AuthorDate: Mon Feb 28 16:02:01 2022 +0800

    RATIS-1444. Add metric to streaming (#607)
---
 .../netty/metrics/NettyServerStreamRpcMetrics.java | 131 +++++++++++++++++++++
 .../ratis/netty/server/DataStreamManagement.java   |  53 +++++++--
 .../ratis/netty/server/NettyServerStreamRpc.java   |   7 +-
 3 files changed, 180 insertions(+), 11 deletions(-)

diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java
new file mode 100644
index 0000000..33f3632
--- /dev/null
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty.metrics;
+
+import com.codahale.metrics.Timer;
+import org.apache.ratis.metrics.MetricRegistryInfo;
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.metrics.RatisMetrics;
+
+import java.util.Locale;
+
+public class NettyServerStreamRpcMetrics extends RatisMetrics { // Timers and counters for the Netty stream server, one metric family per RequestType.
+  private static final String METRICS_APP_NAME = "ratis_netty";
+  private static final String METRICS_COMP_NAME = "stream_server";
+  private static final String METRICS_DESC = "Metrics for Ratis Netty Stream 
Server";
+
+  private static final String METRICS_LATENCY = "%s_latency"; // metric name templates; %s is the lower-cased RequestType name
+  private static final String METRICS_SUCCESS = "%s_success_reply_count";
+  private static final String METRICS_FAIL = "%s_fail_reply_count";
+  private static final String METRICS_NUM_REQUESTS = "num_requests_%s";
+
+  public enum RequestType { // Kinds of requests measured; each constant precomputes its four metric names.
+    CHANNEL_READ, HEADER, LOCAL_WRITE, REMOTE_WRITE, STATE_MACHINE_STREAM, 
START_TRANSACTION;
+
+    private final String numRequestsString;
+    private final String successCountString;
+    private final String failCountString;
+    private final String latencyString;
+
+    RequestType() {
+      final String lower = name().toLowerCase(Locale.ENGLISH); // e.g. LOCAL_WRITE becomes local_write
+      this.numRequestsString = String.format(METRICS_NUM_REQUESTS, lower);
+      this.successCountString = String.format(METRICS_SUCCESS, lower);
+      this.failCountString = String.format(METRICS_FAIL, lower);
+      this.latencyString = String.format(METRICS_LATENCY, lower);
+    }
+
+    String getNumRequestsString() {
+      return numRequestsString;
+    }
+    String getSuccessCountString() {
+      return successCountString;
+    }
+    String getFailCountString() {
+      return failCountString;
+    }
+    String getLatencyString() {
+      return latencyString;
+    }
+  }
+
+  public static final class RequestContext { // Handle for one in-flight request; holds the running Timer.Context.
+    private final Timer.Context timerContext;
+
+    private RequestContext(Timer.Context timerContext) { // private: within this class it is only created by RequestMetrics.start()
+      this.timerContext = timerContext;
+    }
+
+    Timer.Context getTimerContext() {
+      return timerContext;
+    }
+  }
+
+  public final class RequestMetrics { // Per-type helper; an inner (non-static) class, so it uses the enclosing metrics instance.
+    private final RequestType type;
+    private final Timer timer;
+
+    private RequestMetrics(RequestType type) {
+      this.type = type;
+      this.timer = getLatencyTimer(type); // the timer is looked up once, at construction
+    }
+
+    public RequestContext start() { // counts the request, then starts the latency timer
+      onRequestCreate(type);
+      return new RequestContext(timer.time());
+    }
+
+    public void stop(RequestContext context, boolean success) { // stops the timer, then counts either a success or a failure
+      context.getTimerContext().stop(); // context must be non-null, otherwise this line throws NullPointerException
+      if (success) {
+        onRequestSuccess(type);
+      } else {
+        onRequestFail(type);
+      }
+    }
+  }
+
+  public NettyServerStreamRpcMetrics(String serverId) {
+    registry = getMetricRegistryForGrpcServer(serverId); // registry is not declared in this class; presumably inherited from RatisMetrics - confirm
+  }
+
+  private RatisMetricRegistry getMetricRegistryForGrpcServer(String serverId) { // NOTE(review): name says Grpc but this class is for Netty; looks like a copy-paste leftover - confirm before renaming
+    return create(new MetricRegistryInfo(serverId, // create is not defined in this class; presumably inherited from RatisMetrics - confirm
+        METRICS_APP_NAME, METRICS_COMP_NAME, METRICS_DESC));
+  }
+
+  public RequestMetrics newRequestMetrics(RequestType type) { // every call returns a new RequestMetrics for the given type
+    return new RequestMetrics(type);
+  }
+
+  public Timer getLatencyTimer(RequestType type) { // timer is obtained from the registry under the name from getLatencyString()
+    return registry.timer(type.getLatencyString());
+  }
+
+  public void onRequestCreate(RequestType type) { // increments the counter named by getNumRequestsString()
+    registry.counter(type.getNumRequestsString()).inc();
+  }
+
+  public void onRequestSuccess(RequestType type) { // increments the counter named by getSuccessCountString()
+    registry.counter(type.getSuccessCountString()).inc();
+  }
+
+  public void onRequestFail(RequestType type) { // increments the counter named by getFailCountString()
+    registry.counter(type.getFailCountString()).inc();
+  }
+}
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 37cf90e..301a7d9 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -26,6 +26,10 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.io.WriteOption;
+import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
+import 
org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics.RequestContext;
+import 
org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics.RequestMetrics;
+import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics.RequestType;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
@@ -78,15 +82,19 @@ public class DataStreamManagement {
   static class LocalStream {
     private final CompletableFuture<DataStream> streamFuture;
     private final AtomicReference<CompletableFuture<Long>> writeFuture;
+    private final RequestMetrics metrics;
 
-    LocalStream(CompletableFuture<DataStream> streamFuture) {
+    LocalStream(CompletableFuture<DataStream> streamFuture, RequestMetrics 
metrics) {
       this.streamFuture = streamFuture;
       this.writeFuture = new AtomicReference<>(streamFuture.thenApply(s -> 
0L));
+      this.metrics = metrics;
     }
 
     CompletableFuture<Long> write(ByteBuf buf, WriteOption[] options, Executor 
executor) {
+      final RequestContext context = metrics.start();
       return composeAsync(writeFuture, executor,
-          n -> streamFuture.thenCompose(stream -> writeToAsync(buf, options, 
stream, executor)));
+          n -> streamFuture.thenCompose(stream -> writeToAsync(buf, options, 
stream, executor)
+              .whenComplete((l, e) -> metrics.stop(context, e == null))));
     }
   }
 
@@ -94,14 +102,18 @@ public class DataStreamManagement {
     private final DataStreamOutputRpc out;
     private final AtomicReference<CompletableFuture<DataStreamReply>> 
sendFuture
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final RequestMetrics metrics;
 
-    RemoteStream(DataStreamOutputRpc out) {
+    RemoteStream(DataStreamOutputRpc out, RequestMetrics metrics) {
+      this.metrics = metrics;
       this.out = out;
     }
 
     CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, 
Executor executor) {
+      final RequestContext context = metrics.start();
       return composeAsync(sendFuture, executor,
-          n -> out.writeAsync(request.slice().nioBuffer(), 
request.getWriteOptions()));
+          n -> out.writeAsync(request.slice().nioBuffer(), 
request.getWriteOptions())
+              .whenComplete((l, e) -> metrics.stop(context, e == null)));
     }
   }
 
@@ -116,15 +128,18 @@ public class DataStreamManagement {
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
     StreamInfo(RaftClientRequest request, boolean primary, 
CompletableFuture<DataStream> stream, RaftServer server,
-        CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputRpc>, IOException> getStreams)
+        CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputRpc>, IOException> getStreams,
+        Function<RequestType, RequestMetrics> metricsConstructor)
         throws IOException {
       this.request = request;
       this.primary = primary;
-      this.local = new LocalStream(stream);
+      this.local = new LocalStream(stream, 
metricsConstructor.apply(RequestType.LOCAL_WRITE));
       this.server = server;
       final Set<RaftPeer> successors = getSuccessors(server.getId());
       final Set<DataStreamOutputRpc> outs = getStreams.apply(request, 
successors);
-      this.remotes = 
outs.stream().map(RemoteStream::new).collect(Collectors.toSet());
+      this.remotes = outs.stream()
+          .map(o -> new RemoteStream(o, 
metricsConstructor.apply(RequestType.REMOTE_WRITE)))
+          .collect(Collectors.toSet());
     }
 
     AtomicReference<CompletableFuture<Void>> getPrevious() {
@@ -213,7 +228,9 @@ public class DataStreamManagement {
   private final Executor requestExecutor;
   private final Executor writeExecutor;
 
-  DataStreamManagement(RaftServer server) {
+  private final NettyServerStreamRpcMetrics nettyServerStreamRpcMetrics;
+
+  DataStreamManagement(RaftServer server, NettyServerStreamRpcMetrics metrics) 
{
     this.server = server;
     this.name = server.getId() + "-" + 
JavaUtils.getClassSimpleName(getClass());
 
@@ -225,13 +242,20 @@ public class DataStreamManagement {
     this.writeExecutor = 
ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool,
           RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties),
           name + "-write-");
+
+    this.nettyServerStreamRpcMetrics = metrics;
   }
 
   private CompletableFuture<DataStream> 
computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
     final Division division = server.getDivision(request.getRaftGroupId());
     final ClientInvocationId invocationId = 
ClientInvocationId.valueOf(request);
     final MemoizedSupplier<CompletableFuture<DataStream>> supplier = 
JavaUtils.memoize(
-        () -> division.getStateMachine().data().stream(request));
+        () -> {
+          final RequestMetrics metrics = 
getMetrics().newRequestMetrics(RequestType.STATE_MACHINE_STREAM);
+          final RequestContext context = metrics.start();
+          return division.getStateMachine().data().stream(request)
+              .whenComplete((r, e) -> metrics.stop(context, e == null));
+        });
     final CompletableFuture<DataStream> f = division.getDataStreamMap()
         .computeIfAbsent(invocationId, key -> supplier.get());
     if (!supplier.isInitialized()) {
@@ -246,7 +270,8 @@ public class DataStreamManagement {
       final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       final boolean isPrimary = server.getId().equals(request.getServerId());
-      return new StreamInfo(request, isPrimary, 
computeDataStreamIfAbsent(request), server, getStreams);
+      return new StreamInfo(request, isPrimary, 
computeDataStreamIfAbsent(request), server, getStreams,
+          getMetrics()::newRequestMetrics);
     } catch (Throwable e) {
       throw new CompletionException(e);
     }
@@ -326,12 +351,15 @@ public class DataStreamManagement {
 
   private CompletableFuture<RaftClientReply> startTransaction(StreamInfo info, 
DataStreamRequestByteBuf request,
       long bytesWritten, ChannelHandlerContext ctx) {
+    final RequestMetrics metrics = 
getMetrics().newRequestMetrics(RequestType.START_TRANSACTION);
+    final RequestContext context = metrics.start();
     try {
       AsyncRpcApi asyncRpcApi = (AsyncRpcApi) 
(server.getDivision(info.getRequest()
           .getRaftGroupId())
           .getRaftClient()
           .async());
       return asyncRpcApi.sendForward(info.request).whenCompleteAsync((reply, 
e) -> {
+        metrics.stop(context, e == null);
         if (e != null) {
           replyDataStreamException(server, e, info.getRequest(), request, ctx);
         } else {
@@ -397,6 +425,7 @@ public class DataStreamManagement {
         throw new IllegalStateException("Failed to create a new stream for " + 
request
             + " since a stream already exists Key: " + key + " StreamInfo:" + 
info);
       }
+      getMetrics().onRequestCreate(RequestType.HEADER);
     } else if (close) {
       info = Optional.ofNullable(streams.remove(key)).orElseThrow(
           () -> new IllegalStateException("Failed to remove StreamInfo for " + 
request));
@@ -475,6 +504,10 @@ public class DataStreamManagement {
     return true;
   }
 
+  NettyServerStreamRpcMetrics getMetrics() {
+    return nettyServerStreamRpcMetrics;
+  }
+
   @Override
   public String toString() {
     return name;
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 95e045d..04b9571 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -25,6 +25,7 @@ import 
org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.netty.NettyUtils;
+import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
@@ -118,9 +119,12 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
   private final DataStreamManagement requests;
   private final List<Proxies> proxies = new ArrayList<>();
 
+  private final NettyServerStreamRpcMetrics metrics;
+
   public NettyServerStreamRpc(RaftServer server) {
     this.name = server.getId() + "-" + 
JavaUtils.getClassSimpleName(getClass());
-    this.requests = new DataStreamManagement(server);
+    this.metrics = new NettyServerStreamRpcMetrics(this.name);
+    this.requests = new DataStreamManagement(server, metrics);
 
     final RaftProperties properties = server.getProperties();
 
@@ -188,6 +192,7 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
 
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        
metrics.onRequestCreate(NettyServerStreamRpcMetrics.RequestType.CHANNEL_READ);
         if (!(msg instanceof DataStreamRequestByteBuf)) {
           LOG.error("Unexpected message class {}, ignoring ...", 
msg.getClass().getName());
           return;

Reply via email to