This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new e767c15 RATIS-1444. Add metric to streaming (#607)
e767c15 is described below
commit e767c1532a690138f5b7559ffd6b9e08f65eedf9
Author: hao guo <[email protected]>
AuthorDate: Mon Feb 28 16:02:01 2022 +0800
RATIS-1444. Add metric to streaming (#607)
---
.../netty/metrics/NettyServerStreamRpcMetrics.java | 131 +++++++++++++++++++++
.../ratis/netty/server/DataStreamManagement.java | 53 +++++++--
.../ratis/netty/server/NettyServerStreamRpc.java | 7 +-
3 files changed, 180 insertions(+), 11 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java b/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java
new file mode 100644
index 0000000..33f3632
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty.metrics;
+
+import com.codahale.metrics.Timer;
+import org.apache.ratis.metrics.MetricRegistryInfo;
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.metrics.RatisMetrics;
+
+import java.util.Locale;
+
+/**
+ * Metrics for the Netty data-stream server, registered with application name
+ * {@code ratis_netty} and component name {@code stream_server}.
+ * <p>
+ * For every {@link RequestType}, the metric names are built from the lower-cased
+ * enum name {@code t}:
+ * <ul>
+ *   <li>{@code num_requests_t}: counter incremented when a request starts;</li>
+ *   <li>{@code t_latency}: timer measuring the request latency;</li>
+ *   <li>{@code t_success_reply_count} / {@code t_fail_reply_count}: counters
+ *       incremented according to the {@code success} flag passed to
+ *       {@link RequestMetrics#stop(RequestContext, boolean)}.</li>
+ * </ul>
+ */
+public class NettyServerStreamRpcMetrics extends RatisMetrics {
+  private static final String METRICS_APP_NAME = "ratis_netty";
+  private static final String METRICS_COMP_NAME = "stream_server";
+  private static final String METRICS_DESC = "Metrics for Ratis Netty Stream Server";
+
+  // Name templates; %s is replaced by the lower-cased RequestType name.
+  private static final String METRICS_LATENCY = "%s_latency";
+  private static final String METRICS_SUCCESS = "%s_success_reply_count";
+  private static final String METRICS_FAIL = "%s_fail_reply_count";
+  private static final String METRICS_NUM_REQUESTS = "num_requests_%s";
+
+  /** The stream-server operations for which metrics are collected. */
+  public enum RequestType {
+    CHANNEL_READ, HEADER, LOCAL_WRITE, REMOTE_WRITE, STATE_MACHINE_STREAM, START_TRANSACTION;
+
+    private final String numRequestsString;
+    private final String successCountString;
+    private final String failCountString;
+    private final String latencyString;
+
+    RequestType() {
+      // Locale.ENGLISH keeps the metric names independent of the JVM default locale.
+      final String lower = name().toLowerCase(Locale.ENGLISH);
+      this.numRequestsString = String.format(METRICS_NUM_REQUESTS, lower);
+      this.successCountString = String.format(METRICS_SUCCESS, lower);
+      this.failCountString = String.format(METRICS_FAIL, lower);
+      this.latencyString = String.format(METRICS_LATENCY, lower);
+    }
+
+    String getNumRequestsString() {
+      return numRequestsString;
+    }
+
+    String getSuccessCountString() {
+      return successCountString;
+    }
+
+    String getFailCountString() {
+      return failCountString;
+    }
+
+    String getLatencyString() {
+      return latencyString;
+    }
+  }
+
+  /** Handle for one in-flight request, returned by {@link RequestMetrics#start()}. */
+  public static final class RequestContext {
+    private final Timer.Context timerContext;
+
+    private RequestContext(Timer.Context timerContext) {
+      this.timerContext = timerContext;
+    }
+
+    Timer.Context getTimerContext() {
+      return timerContext;
+    }
+  }
+
+  /**
+   * Records the metrics of a single {@link RequestType}.
+   * It is an inner (non-static) class because it updates the registry of the enclosing instance.
+   */
+  public final class RequestMetrics {
+    private final RequestType type;
+    private final Timer timer;
+
+    private RequestMetrics(RequestType type) {
+      this.type = type;
+      this.timer = getLatencyTimer(type);
+    }
+
+    /**
+     * Marks the start of a request: increments the request counter and starts the latency timer.
+     *
+     * @return the context to pass to {@link #stop(RequestContext, boolean)}.
+     */
+    public RequestContext start() {
+      onRequestCreate(type);
+      return new RequestContext(timer.time());
+    }
+
+    /**
+     * Marks the end of a request: stops the latency timer of the given context and
+     * increments either the success or the failure counter.
+     *
+     * @param context the context returned by {@link #start()}.
+     * @param success whether the request should be counted as successful.
+     */
+    public void stop(RequestContext context, boolean success) {
+      context.getTimerContext().stop();
+      if (success) {
+        onRequestSuccess(type);
+      } else {
+        onRequestFail(type);
+      }
+    }
+  }
+
+  /** @param serverId the id used in the {@link MetricRegistryInfo} of this metrics registry. */
+  public NettyServerStreamRpcMetrics(String serverId) {
+    registry = getMetricRegistryForStreamServer(serverId);
+  }
+
+  /** Creates the registry for this Netty stream server via {@code RatisMetrics#create}. */
+  private RatisMetricRegistry getMetricRegistryForStreamServer(String serverId) {
+    return create(new MetricRegistryInfo(serverId,
+        METRICS_APP_NAME, METRICS_COMP_NAME, METRICS_DESC));
+  }
+
+  /** @return a new {@link RequestMetrics} recording under the metric names of the given type. */
+  public RequestMetrics newRequestMetrics(RequestType type) {
+    return new RequestMetrics(type);
+  }
+
+  /** @return the registry timer named {@code <type>_latency}. */
+  public Timer getLatencyTimer(RequestType type) {
+    return registry.timer(type.getLatencyString());
+  }
+
+  /** Increments the {@code num_requests_<type>} counter. */
+  public void onRequestCreate(RequestType type) {
+    registry.counter(type.getNumRequestsString()).inc();
+  }
+
+  /** Increments the {@code <type>_success_reply_count} counter. */
+  public void onRequestSuccess(RequestType type) {
+    registry.counter(type.getSuccessCountString()).inc();
+  }
+
+  /** Increments the {@code <type>_fail_reply_count} counter. */
+  public void onRequestFail(RequestType type) {
+    registry.counter(type.getFailCountString()).inc();
+  }
+}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 37cf90e..301a7d9 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -26,6 +26,10 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
+import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
+import
org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics.RequestContext;
+import
org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics.RequestMetrics;
+import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics.RequestType;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
@@ -78,15 +82,19 @@ public class DataStreamManagement {
static class LocalStream {
private final CompletableFuture<DataStream> streamFuture;
private final AtomicReference<CompletableFuture<Long>> writeFuture;
+ private final RequestMetrics metrics;
- LocalStream(CompletableFuture<DataStream> streamFuture) {
+ LocalStream(CompletableFuture<DataStream> streamFuture, RequestMetrics
metrics) {
this.streamFuture = streamFuture;
this.writeFuture = new AtomicReference<>(streamFuture.thenApply(s ->
0L));
+ this.metrics = metrics;
}
CompletableFuture<Long> write(ByteBuf buf, WriteOption[] options, Executor
executor) {
+ final RequestContext context = metrics.start();
return composeAsync(writeFuture, executor,
- n -> streamFuture.thenCompose(stream -> writeToAsync(buf, options,
stream, executor)));
+ n -> streamFuture.thenCompose(stream -> writeToAsync(buf, options,
stream, executor)
+ .whenComplete((l, e) -> metrics.stop(context, e == null))));
}
}
@@ -94,14 +102,18 @@ public class DataStreamManagement {
private final DataStreamOutputRpc out;
private final AtomicReference<CompletableFuture<DataStreamReply>>
sendFuture
= new AtomicReference<>(CompletableFuture.completedFuture(null));
+ private final RequestMetrics metrics;
- RemoteStream(DataStreamOutputRpc out) {
+ RemoteStream(DataStreamOutputRpc out, RequestMetrics metrics) {
+ this.metrics = metrics;
this.out = out;
}
CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request,
Executor executor) {
+ final RequestContext context = metrics.start();
return composeAsync(sendFuture, executor,
- n -> out.writeAsync(request.slice().nioBuffer(),
request.getWriteOptions()));
+ n -> out.writeAsync(request.slice().nioBuffer(),
request.getWriteOptions())
+ .whenComplete((l, e) -> metrics.stop(context, e == null)));
}
}
@@ -116,15 +128,18 @@ public class DataStreamManagement {
= new AtomicReference<>(CompletableFuture.completedFuture(null));
StreamInfo(RaftClientRequest request, boolean primary,
CompletableFuture<DataStream> stream, RaftServer server,
- CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputRpc>, IOException> getStreams)
+ CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputRpc>, IOException> getStreams,
+ Function<RequestType, RequestMetrics> metricsConstructor)
throws IOException {
this.request = request;
this.primary = primary;
- this.local = new LocalStream(stream);
+ this.local = new LocalStream(stream,
metricsConstructor.apply(RequestType.LOCAL_WRITE));
this.server = server;
final Set<RaftPeer> successors = getSuccessors(server.getId());
final Set<DataStreamOutputRpc> outs = getStreams.apply(request,
successors);
- this.remotes =
outs.stream().map(RemoteStream::new).collect(Collectors.toSet());
+ this.remotes = outs.stream()
+ .map(o -> new RemoteStream(o,
metricsConstructor.apply(RequestType.REMOTE_WRITE)))
+ .collect(Collectors.toSet());
}
AtomicReference<CompletableFuture<Void>> getPrevious() {
@@ -213,7 +228,9 @@ public class DataStreamManagement {
private final Executor requestExecutor;
private final Executor writeExecutor;
- DataStreamManagement(RaftServer server) {
+ private final NettyServerStreamRpcMetrics nettyServerStreamRpcMetrics;
+
+ DataStreamManagement(RaftServer server, NettyServerStreamRpcMetrics metrics)
{
this.server = server;
this.name = server.getId() + "-" +
JavaUtils.getClassSimpleName(getClass());
@@ -225,13 +242,20 @@ public class DataStreamManagement {
this.writeExecutor =
ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool,
RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties),
name + "-write-");
+
+ this.nettyServerStreamRpcMetrics = metrics;
}
private CompletableFuture<DataStream>
computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
final Division division = server.getDivision(request.getRaftGroupId());
final ClientInvocationId invocationId =
ClientInvocationId.valueOf(request);
final MemoizedSupplier<CompletableFuture<DataStream>> supplier =
JavaUtils.memoize(
- () -> division.getStateMachine().data().stream(request));
+ () -> {
+ final RequestMetrics metrics =
getMetrics().newRequestMetrics(RequestType.STATE_MACHINE_STREAM);
+ final RequestContext context = metrics.start();
+ return division.getStateMachine().data().stream(request)
+ .whenComplete((r, e) -> metrics.stop(context, e == null));
+ });
final CompletableFuture<DataStream> f = division.getDataStreamMap()
.computeIfAbsent(invocationId, key -> supplier.get());
if (!supplier.isInitialized()) {
@@ -246,7 +270,8 @@ public class DataStreamManagement {
final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
final boolean isPrimary = server.getId().equals(request.getServerId());
- return new StreamInfo(request, isPrimary,
computeDataStreamIfAbsent(request), server, getStreams);
+ return new StreamInfo(request, isPrimary,
computeDataStreamIfAbsent(request), server, getStreams,
+ getMetrics()::newRequestMetrics);
} catch (Throwable e) {
throw new CompletionException(e);
}
@@ -326,12 +351,15 @@ public class DataStreamManagement {
private CompletableFuture<RaftClientReply> startTransaction(StreamInfo info,
DataStreamRequestByteBuf request,
long bytesWritten, ChannelHandlerContext ctx) {
+ final RequestMetrics metrics =
getMetrics().newRequestMetrics(RequestType.START_TRANSACTION);
+ final RequestContext context = metrics.start();
try {
AsyncRpcApi asyncRpcApi = (AsyncRpcApi)
(server.getDivision(info.getRequest()
.getRaftGroupId())
.getRaftClient()
.async());
return asyncRpcApi.sendForward(info.request).whenCompleteAsync((reply,
e) -> {
+ metrics.stop(context, e == null);
if (e != null) {
replyDataStreamException(server, e, info.getRequest(), request, ctx);
} else {
@@ -397,6 +425,7 @@ public class DataStreamManagement {
throw new IllegalStateException("Failed to create a new stream for " +
request
+ " since a stream already exists Key: " + key + " StreamInfo:" +
info);
}
+ getMetrics().onRequestCreate(RequestType.HEADER);
} else if (close) {
info = Optional.ofNullable(streams.remove(key)).orElseThrow(
() -> new IllegalStateException("Failed to remove StreamInfo for " +
request));
@@ -475,6 +504,10 @@ public class DataStreamManagement {
return true;
}
+ NettyServerStreamRpcMetrics getMetrics() {
+ return nettyServerStreamRpcMetrics;
+ }
+
@Override
public String toString() {
return name;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 95e045d..04b9571 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -25,6 +25,7 @@ import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.netty.NettyUtils;
+import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
@@ -118,9 +119,12 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
private final DataStreamManagement requests;
private final List<Proxies> proxies = new ArrayList<>();
+ private final NettyServerStreamRpcMetrics metrics;
+
public NettyServerStreamRpc(RaftServer server) {
this.name = server.getId() + "-" +
JavaUtils.getClassSimpleName(getClass());
- this.requests = new DataStreamManagement(server);
+ this.metrics = new NettyServerStreamRpcMetrics(this.name);
+ this.requests = new DataStreamManagement(server, metrics);
final RaftProperties properties = server.getProperties();
@@ -188,6 +192,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
+
metrics.onRequestCreate(NettyServerStreamRpcMetrics.RequestType.CHANNEL_READ);
if (!(msg instanceof DataStreamRequestByteBuf)) {
LOG.error("Unexpected message class {}, ignoring ...",
msg.getClass().getName());
return;