This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new aab414f  RATIS-805 - Metrics for GRPC messages (#159)
aab414f is described below

commit aab414f2af73809168657beb17419be87b0aa259
Author: anshkhannasbu <[email protected]>
AuthorDate: Tue Aug 4 07:20:27 2020 -0400

    RATIS-805 - Metrics for GRPC messages (#159)
---
 .../grpc/client/GrpcClientProtocolClient.java      |  6 ++
 .../apache/ratis/grpc/metrics/MessageMetrics.java  | 65 ++++++++++++++++++
 .../metrics/intercept/client/MetricClientCall.java | 44 ++++++++++++
 .../intercept/client/MetricClientCallListener.java | 49 ++++++++++++++
 .../intercept/client/MetricClientInterceptor.java  | 56 ++++++++++++++++
 .../metrics/intercept/server/MetricServerCall.java | 49 ++++++++++++++
 .../intercept/server/MetricServerCallListener.java | 44 ++++++++++++
 .../intercept/server/MetricServerInterceptor.java  | 78 ++++++++++++++++++++++
 .../org/apache/ratis/grpc/server/GrpcService.java  | 23 ++++++-
 .../apache/ratis/grpc/TestGrpcMessageMetrics.java  | 71 ++++++++++++++++++++
 10 files changed, 482 insertions(+), 3 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 1d6860a..42c24ef 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.grpc.metrics.intercept.client.MetricClientInterceptor;
 import org.apache.ratis.proto.RaftProtos.GroupInfoReplyProto;
 import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
 import org.apache.ratis.proto.RaftProtos.GroupListReplyProto;
@@ -123,9 +124,14 @@ public class GrpcClientProtocolClient implements Closeable 
{
     } else {
       channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
     }
+
+    MetricClientInterceptor monitoringInterceptor = new 
MetricClientInterceptor(getName());
+
     channel = channelBuilder.flowControlWindow(flowControlWindow.getSizeInt())
         .maxInboundMessageSize(maxMessageSize.getSizeInt())
+        .intercept(monitoringInterceptor)
         .build();
+
     blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
     asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
     adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel);
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/MessageMetrics.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/MessageMetrics.java
new file mode 100644
index 0000000..4056c7a
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/MessageMetrics.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.metrics;
+
+import org.apache.ratis.metrics.MetricRegistryInfo;
+import org.apache.ratis.metrics.RatisMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageMetrics extends RatisMetrics {
+  static final Logger LOG = LoggerFactory.getLogger(MessageMetrics.class);
+  public static final String GRPC_MESSAGE_METRICS = "%s_message_metrics";
+  public static final String GRPC_MESSAGE_METRICS_DESC = "Outbound/Inbound 
message counters";
+
+  public MessageMetrics(String endpointId, String endpointType) {
+    this.registry = create(
+        new MetricRegistryInfo(endpointId,
+            RATIS_APPLICATION_NAME_METRICS,
+            String.format(GRPC_MESSAGE_METRICS, endpointType),
+            GRPC_MESSAGE_METRICS_DESC)
+    );
+  }
+
+  /**
+   * Increments the count of RPCs that are started.
+   * Both client and server use this.
+   * @param rpcType
+   */
+  public void rpcStarted(String rpcType){
+    registry.counter(rpcType + "_started_total").inc();
+  }
+
+  /**
+   * Increments the count of RPCs that were started and got completed.
+   * Both client and server use this.
+   * @param rpcType
+   */
+  public void rpcCompleted(String rpcType){
+    registry.counter(rpcType + "_completed_total").inc();
+  }
+
+  /**
+   * increments the count of RPCs recived on the server.
+   * @param rpcType
+   */
+  public void rpcReceived(String rpcType){
+    registry.counter(rpcType + "_received_executed").inc();
+  }
+
+}
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientCall.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientCall.java
new file mode 100644
index 0000000..9dc09cf
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientCall.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.grpc.metrics.intercept.client;
+
+import org.apache.ratis.thirdparty.io.grpc.ClientCall;
+import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCall;
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+import org.apache.ratis.grpc.metrics.MessageMetrics;
+
+public class MetricClientCall<R, S> extends 
ForwardingClientCall.SimpleForwardingClientCall<R, S> {
+  private final String metricNamePrefix;
+  private final MessageMetrics metrics;
+
+  public MetricClientCall(ClientCall<R, S> delegate,
+                          MessageMetrics metrics,
+                          String metricName){
+    super(delegate);
+    this.metricNamePrefix = metricName;
+    this.metrics = metrics;
+  }
+
+  @Override
+  public void start(ClientCall.Listener<S> delegate, Metadata metadata) {
+    metrics.rpcStarted(metricNamePrefix);
+    super.start(new MetricClientCallListener<>(
+        delegate, metrics, metricNamePrefix), metadata);
+  }
+}
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientCallListener.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientCallListener.java
new file mode 100644
index 0000000..7091e0b
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientCallListener.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.metrics.intercept.client;
+
+import org.apache.ratis.thirdparty.io.grpc.ClientCall;
+import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCallListener;
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.grpc.metrics.MessageMetrics;
+
+public class MetricClientCallListener<S> extends 
ForwardingClientCallListener<S> {
+  private final String metricNamePrefix;
+  private final MessageMetrics metrics;
+  private final ClientCall.Listener<S> delegate;
+
+  MetricClientCallListener(ClientCall.Listener<S> delegate,
+                           MessageMetrics metrics,
+                           String metricNamePrefix){
+    this.delegate = delegate;
+    this.metricNamePrefix = metricNamePrefix;
+    this.metrics = metrics;
+  }
+
+  @Override
+  protected ClientCall.Listener<S> delegate() {
+    return delegate;
+  }
+
+  @Override
+  public void onClose(Status status, Metadata metadata) {
+    metrics.rpcReceived(metricNamePrefix + "_" + status.getCode().toString());
+    super.onClose(status, metadata);
+  }
+}
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java
new file mode 100644
index 0000000..7597687
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.grpc.metrics.intercept.client;
+
+import org.apache.ratis.grpc.metrics.MessageMetrics;
+import org.apache.ratis.thirdparty.io.grpc.*;
+
+/**
+ * An implementation of a client interceptor.
+ * Intercepts the messages and increments metrics accordingly
+ * before sending them.
+ */
+
+public class MetricClientInterceptor implements ClientInterceptor {
+  private final String identifier;
+  private final MessageMetrics metrics;
+
+  public MetricClientInterceptor(String identifier){
+    this.identifier = identifier;
+    this.metrics = new MessageMetrics(identifier, "client");
+  }
+
+  private String getMethodMetricPrefix(MethodDescriptor<?, ?> method){
+    String serviceName = 
MethodDescriptor.extractFullServiceName(method.getFullMethodName());
+    String methodName = 
method.getFullMethodName().substring(serviceName.length() + 1);
+    return identifier + "_" + serviceName + "_" + methodName;
+  }
+
+  @Override
+  public <ReqT, RespT> ClientCall<ReqT, RespT> 
interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
+                                                             CallOptions 
callOptions,
+                                                             Channel channel) {
+
+    return new MetricClientCall<>(
+        channel.newCall(methodDescriptor, callOptions),
+        metrics,
+        getMethodMetricPrefix(methodDescriptor)
+    );
+  }
+}
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerCall.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerCall.java
new file mode 100644
index 0000000..5eb3f29
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerCall.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.grpc.metrics.intercept.server;
+
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.ForwardingServerCall;
+import org.apache.ratis.thirdparty.io.grpc.ServerCall;
+import org.apache.ratis.grpc.metrics.MessageMetrics;
+
+class MetricServerCall<R,S> extends 
ForwardingServerCall.SimpleForwardingServerCall<R,S> {
+  private final MessageMetrics metrics;
+  private final String metricNamPrefix;
+  private final ServerCall<R,S> delegate;
+
+  MetricServerCall(ServerCall<R,S> delegate,
+                       String metricNamPrefix,
+                       MessageMetrics metrics){
+    super(delegate);
+    this.delegate = delegate;
+    this.metricNamPrefix = metricNamPrefix;
+    this.metrics = metrics;
+
+    metrics.rpcStarted(metricNamPrefix);
+  }
+
+  @Override
+  public void close(Status status, Metadata responseHeaders) {
+    metrics.rpcCompleted(metricNamPrefix + "_" + status.getCode().toString());
+    super.close(status, responseHeaders);
+  }
+
+}
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerCallListener.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerCallListener.java
new file mode 100644
index 0000000..e0648c6
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerCallListener.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.grpc.metrics.intercept.server;
+
+import org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener;
+import org.apache.ratis.thirdparty.io.grpc.ServerCall;
+import org.apache.ratis.grpc.metrics.MessageMetrics;
+
+public class MetricServerCallListener<R> extends 
ForwardingServerCallListener<R> {
+  private final ServerCall.Listener<R> delegate;
+  private final String metricNamePrefix;
+  private MessageMetrics metrics;
+
+  MetricServerCallListener(
+      ServerCall.Listener<R> delegate,
+      String metricNamePrefix,
+      MessageMetrics metrics
+  ){
+    this.delegate = delegate;
+    this.metricNamePrefix = metricNamePrefix;
+    this.metrics = metrics;
+  }
+
+  @Override
+  protected ServerCall.Listener<R> delegate() {
+    return delegate;
+  }
+}
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java
new file mode 100644
index 0000000..db19677
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.grpc.metrics.intercept.server;
+
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
+import org.apache.ratis.thirdparty.io.grpc.ServerCall;
+import org.apache.ratis.thirdparty.io.grpc.ServerCallHandler;
+import org.apache.ratis.grpc.metrics.MessageMetrics;
+import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
+
+import java.util.function.Supplier;
+
+/**
+ * An implementation of a server interceptor.
+ * Intercepts the inbound/outbound messages and increments metrics accordingly
+ * before handling them.
+ */
+
+public class MetricServerInterceptor implements ServerInterceptor {
+  private String identifier;
+  private MessageMetrics metrics;
+  private final Supplier<RaftPeerId> peerIdSupplier;
+  private final String defaultIdentifier;
+
+  public MessageMetrics getMetrics() {
+    return metrics;
+  }
+
+  public MetricServerInterceptor(Supplier<RaftPeerId> idSupplier, String 
defaultIdentifier){
+    this.peerIdSupplier = idSupplier;
+    this.identifier = null;
+    this.defaultIdentifier = defaultIdentifier;
+  }
+
+  private String getMethodMetricPrefix(MethodDescriptor<?, ?> method){
+    String serviceName = 
MethodDescriptor.extractFullServiceName(method.getFullMethodName());
+    String methodName = 
method.getFullMethodName().substring(serviceName.length() + 1);
+    return identifier + "_" + serviceName + "_" + methodName;
+  }
+
+  @Override
+  public <R, S> ServerCall.Listener<R> interceptCall(
+      ServerCall<R, S> call,
+      Metadata requestHeaders,
+      ServerCallHandler<R, S> next) {
+    MethodDescriptor<R, S> method = call.getMethodDescriptor();
+    if(identifier == null){
+      try {
+        identifier = peerIdSupplier.get().toString();
+      } catch (Exception e){
+        identifier = defaultIdentifier;
+      }
+      metrics = new MessageMetrics(identifier, "server");
+    }
+    String metricNamePrefix = getMethodMetricPrefix(method);
+    ServerCall<R,S> monitoringCall = new MetricServerCall<>(call, 
metricNamePrefix, metrics);
+    return new MetricServerCallListener<>(
+        next.startCall(monitoringCall, requestHeaders), metricNamePrefix, 
metrics);
+  }
+}
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index f4d71f2..a352bc1 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -20,6 +20,7 @@ package org.apache.ratis.grpc.server;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
@@ -27,6 +28,7 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
+import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
 import org.apache.ratis.thirdparty.io.grpc.Server;
@@ -86,6 +88,12 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
 
   private final GrpcClientProtocolService clientProtocolService;
 
+  private final MetricServerInterceptor serverInterceptor;
+
+  public MetricServerInterceptor getServerInterceptor() {
+    return serverInterceptor;
+  }
+
   private GrpcService(RaftServer server, GrpcTlsConfig tlsConfig) {
     this(server, server::getId,
         GrpcConfigKeys.Server.port(server.getProperties()),
@@ -111,13 +119,22 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
 
     this.clientProtocolService = new GrpcClientProtocolService(idSupplier, 
raftServer);
 
+    this.serverInterceptor = new MetricServerInterceptor(
+        idSupplier,
+        getClass().getSimpleName() + "_" + Integer.toString(port)
+    );
+
     NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
         .withChildOption(ChannelOption.SO_REUSEADDR, true)
         .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
         .flowControlWindow(flowControlWindow.getSizeInt())
-        .addService(new GrpcServerProtocolService(idSupplier, raftServer))
-        .addService(clientProtocolService)
-        .addService(new GrpcAdminProtocolService(raftServer));
+        .addService(ServerInterceptors.intercept(
+            new GrpcServerProtocolService(idSupplier, raftServer),
+            serverInterceptor))
+        .addService(ServerInterceptors.intercept(clientProtocolService, 
serverInterceptor))
+        .addService(ServerInterceptors.intercept(
+            new GrpcAdminProtocolService(raftServer),
+            serverInterceptor));
 
     if (tlsConfig != null) {
       SslContextBuilder sslContextBuilder =
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
new file mode 100644
index 0000000..1757874
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.server.GrpcService;
+import org.apache.ratis.metrics.JVMMetrics;
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+public class TestGrpcMessageMetrics extends BaseTest
+    implements MiniRaftClusterWithGrpc.FactoryGet {
+  static {
+    JVMMetrics.initJvmMetrics(TimeDuration.valueOf(10, TimeUnit.SECONDS));
+  }
+
+  public static final int NUM_SERVERS = 3;
+
+  @Test
+  public void testGrpcMessageMetrics() throws Exception {
+    try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) {
+      cluster.start();
+      sendMessages(cluster);
+    }
+  }
+
+  static void sendMessages(MiniRaftCluster cluster) throws Exception {
+    waitForLeader(cluster);
+    try (final RaftClient client = cluster.createClient()) {
+      CompletableFuture<RaftClientReply> replyFuture =  client.sendAsync(new 
RaftTestUtil.SimpleMessage("abc"));
+    }
+    // Wait for commits to happen on leader
+    JavaUtils.attempt(() -> assertMessageCount(cluster.getLeader()), 100, 
HUNDRED_MILLIS, cluster.getLeader().getId() + "-assertMessageCount", null);
+  }
+
+  static void assertMessageCount(RaftServerImpl server) throws  Exception {
+    String serverId = server.getId().toString();
+    GrpcService service = (GrpcService)(server.getProxy().getServerRpc());
+    RatisMetricRegistry registry = 
service.getServerInterceptor().getMetrics().getRegistry();
+    String counter_prefix = serverId + "_" + 
"ratis.grpc.RaftServerProtocolService";
+    Assert.assertTrue(registry.counter(counter_prefix + "_" + "requestVote" + 
"_OK_completed_total").getCount() > 0);
+  }
+}
\ No newline at end of file

Reply via email to