sijie closed pull request #1731: [table service][stats] enable stats for grpc 
calls on both client and server
URL: https://github.com/apache/bookkeeper/pull/1731
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-benchmark/pom.xml b/bookkeeper-benchmark/pom.xml
index 163c1bd5f2..9b86140719 100644
--- a/bookkeeper-benchmark/pom.xml
+++ b/bookkeeper-benchmark/pom.xml
@@ -52,13 +52,20 @@
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-server</artifactId>
-      <version>${project.parent.version}</version>
+      <version>${project.version}</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-server</artifactId>
-      <version>${project.parent.version}</version>
+      <version>${project.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
similarity index 97%
rename from 
bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
rename to 
bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
index 9b9fdb54dd..aca154ec0c 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
+++ 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,6 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
 package org.apache.bookkeeper.test;
@@ -103,12 +101,12 @@ public Long getMax() {
 
         @Override
         public void registerFailedEvent(long eventLatency, TimeUnit unit) {
-            registerFailedValue(unit.convert(eventLatency, 
TimeUnit.NANOSECONDS));
+            registerFailedValue(TimeUnit.NANOSECONDS.convert(eventLatency, 
unit));
         }
 
         @Override
         public void registerSuccessfulEvent(long eventLatency, TimeUnit unit) {
-            registerSuccessfulValue(unit.convert(eventLatency, 
TimeUnit.NANOSECONDS));
+            registerSuccessfulValue(TimeUnit.NANOSECONDS.convert(eventLatency, 
unit));
         }
 
         @Override
diff --git a/metadata-drivers/etcd/pom.xml b/metadata-drivers/etcd/pom.xml
index 9e463bd89b..0d9e0dd164 100644
--- a/metadata-drivers/etcd/pom.xml
+++ b/metadata-drivers/etcd/pom.xml
@@ -54,6 +54,13 @@
             <artifactId>testcontainers</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.bookkeeper</groupId>
+            <artifactId>bookkeeper-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.bookkeeper</groupId>
             <artifactId>bookkeeper-server</artifactId>
diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
index e78ca73e83..b5541becbe 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
@@ -23,6 +23,7 @@
 import org.apache.bookkeeper.clients.utils.ClientConstants;
 import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.inferred.freebuilder.FreeBuilder;
 
 /**
@@ -72,6 +73,13 @@
      */
     Optional<String> clientName();
 
+    /**
+     * Configure a stats logger to collect stats exposed by this client.
+     *
+     * @return stats logger.
+     */
+    Optional<StatsLogger> statsLogger();
+
     /**
      * Configure a backoff policy for the client.
      *
diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index e5206de6a3..afacce18b7 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -31,6 +31,7 @@
 import 
org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterceptor;
 import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
+import org.apache.bookkeeper.common.grpc.stats.MonitoringClientInterceptor;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
 import 
org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceFutureStub;
@@ -49,11 +50,24 @@
 public class StorageServerChannel implements AutoCloseable {
 
     public static Function<Endpoint, StorageServerChannel> 
factory(StorageClientSettings settings) {
-        return (endpoint) -> new StorageServerChannel(
-            endpoint,
-            Optional.empty(),
-            settings.usePlaintext(),
-            settings.endpointResolver());
+        return new Function<Endpoint, StorageServerChannel>() {
+
+            private final Optional<MonitoringClientInterceptor> interceptor =
+                settings.statsLogger().map(statsLogger ->
+                    MonitoringClientInterceptor.create(statsLogger, true));
+
+            @Override
+            public StorageServerChannel apply(Endpoint endpoint) {
+                StorageServerChannel channel = new StorageServerChannel(
+                    endpoint,
+                    Optional.empty(),
+                    settings.usePlaintext(),
+                    settings.endpointResolver());
+                return interceptor
+                    .map(interceptor -> channel.intercept(interceptor))
+                    .orElse(channel);
+            }
+        };
     }
 
     private final Optional<String> token;
diff --git a/stream/common/pom.xml b/stream/common/pom.xml
index 40c8b07dde..c5d0339fe7 100644
--- a/stream/common/pom.xml
+++ b/stream/common/pom.xml
@@ -47,6 +47,13 @@
       <!-- this should not be bundled in binary distributions -->
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.bookkeeper.tests</groupId>
       <artifactId>stream-storage-tests-common</artifactId>
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ClientStats.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ClientStats.java
new file mode 100644
index 0000000000..2b7ba21648
--- /dev/null
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ClientStats.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.MethodDescriptor.MethodType;
+import io.grpc.Status.Code;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Client side monitoring for grpc services.
+ */
+class ClientStats {
+
+    private final Counter rpcStarted;
+    private final Counter rpcCompleted;
+    private final Counter streamMessagesReceived;
+    private final Counter streamMessagesSent;
+    private final Optional<OpStatsLogger> completedLatencyMicros;
+
+    private ClientStats(StatsLogger rootStatsLogger,
+                        boolean includeLatencyHistograms,
+                        boolean streamRequests,
+                        boolean streamResponses) {
+        this.rpcStarted = rootStatsLogger.getCounter("grpc_started");
+        this.rpcCompleted = rootStatsLogger.getCounter("grpc_completed");
+        if (streamResponses) {
+            this.streamMessagesReceived = 
rootStatsLogger.getCounter("grpc_msg_received");
+        } else {
+            this.streamMessagesReceived = 
NullStatsLogger.INSTANCE.getCounter("grpc_msg_received");
+        }
+        if (streamRequests) {
+            this.streamMessagesSent = 
rootStatsLogger.getCounter("grpc_msg_sent");
+        } else {
+            this.streamMessagesSent = 
NullStatsLogger.INSTANCE.getCounter("grpc_msg_sent");
+        }
+        if (includeLatencyHistograms) {
+            this.completedLatencyMicros = Optional.of(
+                rootStatsLogger.getOpStatsLogger("grpc_latency_micros")
+            );
+        } else {
+            this.completedLatencyMicros = Optional.empty();
+        }
+    }
+
+    public void recordCallStarted() {
+        rpcStarted.inc();
+    }
+
+    public void recordClientHandled(Code code) {
+        rpcCompleted.inc();
+    }
+
+    public void recordStreamMessageSent() {
+        streamMessagesSent.inc();
+    }
+
+    public void recordStreamMessageReceived() {
+        streamMessagesReceived.inc();
+    }
+
+    public boolean shouldRecordLatency() {
+        return completedLatencyMicros.isPresent();
+    }
+
+    public void recordLatency(boolean success, long latencyMicros) {
+        completedLatencyMicros.ifPresent(latencyLogger -> {
+            if (success) {
+                latencyLogger.registerSuccessfulEvent(latencyMicros, 
TimeUnit.MICROSECONDS);
+            } else {
+                latencyLogger.registerFailedEvent(latencyMicros, 
TimeUnit.MICROSECONDS);
+            }
+        });
+    }
+
+    /**
+     * Knows how to produce {@link ClientStats} instances for individual 
methods.
+     */
+    static class Factory {
+
+        private final boolean includeLatencyHistograms;
+
+        Factory(boolean includeLatencyHistograms) {
+            this.includeLatencyHistograms = includeLatencyHistograms;
+        }
+
+        /** Creates a {@link ClientStats} for the supplied method. */
+        <ReqT, RespT> ClientStats 
createMetricsForMethod(MethodDescriptor<ReqT, RespT> methodDescriptor,
+                                                         StatsLogger 
statsLogger) {
+
+            String fullMethodName = methodDescriptor.getFullMethodName();
+            String serviceName = 
MethodDescriptor.extractFullServiceName(fullMethodName);
+            String methodName = fullMethodName.substring(serviceName.length() 
+ 1);
+
+            MethodType type = methodDescriptor.getType();
+            return new ClientStats(
+                statsLogger.scope(methodName),
+                includeLatencyHistograms,
+                type == MethodType.CLIENT_STREAMING || type == 
MethodType.BIDI_STREAMING,
+                type == MethodType.SERVER_STREAMING || type == 
MethodType.BIDI_STREAMING);
+        }
+    }
+
+}
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCall.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCall.java
new file mode 100644
index 0000000000..763ca98cb8
--- /dev/null
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCall.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.ClientCall;
+import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import io.grpc.Metadata;
+
+/**
+ * A {@link SimpleForwardingClientCall} which increments counters for rpc 
calls.
+ */
+class MonitoringClientCall<ReqT, RespT> extends 
SimpleForwardingClientCall<ReqT, RespT> {
+
+    private final ClientStats stats;
+
+    MonitoringClientCall(ClientCall<ReqT, RespT> delegate,
+                         ClientStats stats) {
+        super(delegate);
+        this.stats = stats;
+    }
+
+    @Override
+    public void start(Listener<RespT> responseListener, Metadata headers) {
+        stats.recordCallStarted();
+        super.start(new MonitoringClientCallListener<>(
+            responseListener, stats
+        ), headers);
+    }
+
+    @Override
+    public void sendMessage(ReqT message) {
+        stats.recordStreamMessageSent();
+        super.sendMessage(message);
+    }
+}
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCallListener.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCallListener.java
new file mode 100644
index 0000000000..651439e01a
--- /dev/null
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientCallListener.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.ClientCall.Listener;
+import io.grpc.ForwardingClientCallListener;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import org.apache.bookkeeper.common.util.MathUtils;
+
+/**
+ * A {@link ForwardingClientCallListener} that monitors stats on grpc clients.
+ */
+class MonitoringClientCallListener<RespT> extends 
ForwardingClientCallListener<RespT> {
+
+    private final Listener<RespT> delegate;
+    private final ClientStats stats;
+    private final long startNanos;
+
+    MonitoringClientCallListener(Listener<RespT> delegate,
+                                 ClientStats stats) {
+        this.delegate = delegate;
+        this.stats = stats;
+        this.startNanos = MathUtils.nowInNano();
+    }
+
+    @Override
+    protected Listener<RespT> delegate() {
+        return delegate;
+    }
+
+    @Override
+    public void onMessage(RespT message) {
+        stats.recordStreamMessageReceived();
+        super.onMessage(message);
+    }
+
+    @Override
+    public void onClose(Status status, Metadata trailers) {
+        stats.recordClientHandled(status.getCode());
+        if (stats.shouldRecordLatency()) {
+            long latencyMicros = MathUtils.elapsedMicroSec(startNanos);
+            stats.recordLatency(Status.OK == status, latencyMicros);
+        }
+        super.onClose(status, trailers);
+    }
+
+    @Override
+    public String toString() {
+        return delegate.toString();
+    }
+}
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientInterceptor.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientInterceptor.java
new file mode 100644
index 0000000000..868f249ccf
--- /dev/null
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringClientInterceptor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.MethodDescriptor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.common.grpc.stats.ClientStats.Factory;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A {@link ClientInterceptor} that sends stats about grpc calls to stats 
logger.
+ */
+public class MonitoringClientInterceptor implements ClientInterceptor {
+
+    /**
+     * Create a monitoring client interceptor with provided stats logger and 
configuration.
+     *
+     * @param statsLogger stats logger to collect grpc stats
+     * @param includeLatencyHistograms flag indicates whether to include 
latency histograms or not
+     * @return a monitoring client interceptor
+     */
+    public static MonitoringClientInterceptor create(StatsLogger statsLogger,
+                                                     boolean 
includeLatencyHistograms) {
+        return new MonitoringClientInterceptor(
+            new Factory(includeLatencyHistograms), statsLogger);
+    }
+
+    private final Factory statsFactory;
+    private final StatsLogger statsLogger;
+    private final ConcurrentMap<String, ClientStats> methods;
+
+    private MonitoringClientInterceptor(Factory statsFactory,
+                                        StatsLogger statsLogger) {
+        this.statsFactory = statsFactory;
+        this.statsLogger = statsLogger;
+        this.methods = new ConcurrentHashMap<>();
+    }
+
+    private ClientStats getMethodStats(MethodDescriptor<?, ?> method) {
+        ClientStats stats = methods.get(method.getFullMethodName());
+        if (null != stats) {
+            return stats;
+        }
+        ClientStats newStats = statsFactory.createMetricsForMethod(method, 
statsLogger);
+        ClientStats oldStats = methods.putIfAbsent(method.getFullMethodName(), 
newStats);
+        if (null != oldStats) {
+            return oldStats;
+        } else {
+            return newStats;
+        }
+    }
+
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel 
next) {
+        ClientStats stats = getMethodStats(method);
+        return new MonitoringClientCall<>(
+            next.newCall(method, callOptions),
+            stats
+        );
+    }
+}
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCall.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCall.java
new file mode 100644
index 0000000000..abb0969b07
--- /dev/null
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCall.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.Status;
+import org.apache.bookkeeper.common.util.MathUtils;
+
+/**
+ * A {@link SimpleForwardingServerCall} which increments counters for rpc 
calls.
+ */
+class MonitoringServerCall<ReqT, RespT> extends 
SimpleForwardingServerCall<ReqT, RespT> {
+
+    private final ServerStats stats;
+    private final long startNanos;
+
+    MonitoringServerCall(ServerCall<ReqT, RespT> delegate,
+                         ServerStats stats) {
+        super(delegate);
+        this.stats = stats;
+        this.startNanos = MathUtils.nowInNano();
+        stats.recordCallStarted();
+    }
+
+    @Override
+    public void sendMessage(RespT message) {
+        stats.recordStreamMessageSent();
+        super.sendMessage(message);
+    }
+
+    @Override
+    public void close(Status status, Metadata trailers) {
+        stats.recordServerHandled(status.getCode());
+        if (stats.shouldRecordLatency()) {
+            long latencyMicros = MathUtils.elapsedMicroSec(startNanos);
+            stats.recordLatency(Status.OK == status, latencyMicros);
+        }
+        super.close(status, trailers);
+    }
+}
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCallListener.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCallListener.java
new file mode 100644
index 0000000000..6eef140c20
--- /dev/null
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerCallListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.ForwardingServerCallListener;
+import io.grpc.ServerCall.Listener;
+
+/**
+ * A {@link ForwardingServerCallListener} that monitors stats on grpc clients.
+ */
+class MonitoringServerCallListener<RespT> extends 
ForwardingServerCallListener<RespT> {
+
+    private final Listener<RespT> delegate;
+    private final ServerStats stats;
+
+    MonitoringServerCallListener(Listener<RespT> delegate,
+                                 ServerStats stats) {
+        this.delegate = delegate;
+        this.stats = stats;
+    }
+
+    @Override
+    protected Listener<RespT> delegate() {
+        return delegate;
+    }
+
+    @Override
+    public void onMessage(RespT message) {
+        stats.recordStreamMessageReceived();
+        super.onMessage(message);
+    }
+
+    @Override
+    public String toString() {
+        return delegate.toString();
+    }
+}
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerInterceptor.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerInterceptor.java
new file mode 100644
index 0000000000..1c3ed25529
--- /dev/null
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/MonitoringServerInterceptor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.common.grpc.stats.ServerStats.Factory;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A {@link ServerInterceptor} that sends stats about grpc calls to stats 
logger.
+ */
+public class MonitoringServerInterceptor implements ServerInterceptor {
+
+    /**
+     * Create a monitoring client interceptor with provided stats logger and 
configuration.
+     *
+     * @param statsLogger stats logger to collect grpc stats
+     * @param includeLatencyHistograms flag indicates whether to include 
latency histograms or not
+     * @return a monitoring client interceptor
+     */
+    public static MonitoringServerInterceptor create(StatsLogger statsLogger,
+                                                     boolean 
includeLatencyHistograms) {
+        return new MonitoringServerInterceptor(
+            new Factory(includeLatencyHistograms), statsLogger);
+    }
+
+    private final Factory statsFactory;
+    private final StatsLogger statsLogger;
+    private final ConcurrentMap<String, ServerStats> methods;
+
+    private MonitoringServerInterceptor(Factory statsFactory,
+                                        StatsLogger statsLogger) {
+        this.statsFactory = statsFactory;
+        this.statsLogger = statsLogger;
+        this.methods = new ConcurrentHashMap<>();
+    }
+
+    private ServerStats getMethodStats(MethodDescriptor<?, ?> method) {
+        ServerStats stats = methods.get(method.getFullMethodName());
+        if (null != stats) {
+            return stats;
+        }
+        ServerStats newStats = statsFactory.createMetricsForMethod(method, 
statsLogger);
+        ServerStats oldStats = methods.putIfAbsent(method.getFullMethodName(), 
newStats);
+        if (null != oldStats) {
+            return oldStats;
+        } else {
+            return newStats;
+        }
+    }
+
+
+    @Override
+    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> 
call,
+                                                      Metadata headers,
+                                                      ServerCallHandler<ReqT, 
RespT> next) {
+        MethodDescriptor<ReqT, RespT> method = call.getMethodDescriptor();
+        ServerStats stats = getMethodStats(method);
+        ServerCall<ReqT, RespT> monitoringCall = new 
MonitoringServerCall<>(call, stats);
+        return new MonitoringServerCallListener<>(
+            next.startCall(monitoringCall, headers), stats
+        );
+    }
+}
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ServerStats.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ServerStats.java
new file mode 100644
index 0000000000..eae8348eae
--- /dev/null
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/ServerStats.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.MethodDescriptor.MethodType;
+import io.grpc.Status.Code;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Client side monitoring for grpc services.
+ */
+class ServerStats {
+
+    private final Counter rpcStarted;
+    private final Counter rpcCompleted;
+    private final Counter streamMessagesReceived;
+    private final Counter streamMessagesSent;
+    private final Optional<OpStatsLogger> completedLatencyMicros;
+
+    private ServerStats(StatsLogger rootStatsLogger,
+                        boolean includeLatencyHistograms,
+                        boolean streamRequests,
+                        boolean streamResponses) {
+        this.rpcStarted = rootStatsLogger.getCounter("grpc_started");
+        this.rpcCompleted = rootStatsLogger.getCounter("grpc_completed");
+        if (streamRequests) {
+            this.streamMessagesReceived = 
rootStatsLogger.getCounter("grpc_msg_received");
+        } else {
+            this.streamMessagesReceived = 
NullStatsLogger.INSTANCE.getCounter("grpc_msg_received");
+        }
+        if (streamResponses) {
+            this.streamMessagesSent = 
rootStatsLogger.getCounter("grpc_msg_sent");
+        } else {
+            this.streamMessagesSent = 
NullStatsLogger.INSTANCE.getCounter("grpc_msg_sent");
+        }
+        if (includeLatencyHistograms) {
+            this.completedLatencyMicros = Optional.of(
+                rootStatsLogger.getOpStatsLogger("grpc_latency_micros")
+            );
+        } else {
+            this.completedLatencyMicros = Optional.empty();
+        }
+    }
+
+    public void recordCallStarted() {
+        rpcStarted.inc();
+    }
+
+    public void recordServerHandled(Code code) {
+        rpcCompleted.inc();
+    }
+
+    public void recordStreamMessageSent() {
+        streamMessagesSent.inc();
+    }
+
+    public void recordStreamMessageReceived() {
+        streamMessagesReceived.inc();
+    }
+
+    public boolean shouldRecordLatency() {
+        return completedLatencyMicros.isPresent();
+    }
+
+    public void recordLatency(boolean success, long latencyMicros) {
+        completedLatencyMicros.ifPresent(latencyLogger -> {
+            if (success) {
+                latencyLogger.registerSuccessfulEvent(latencyMicros, 
TimeUnit.MICROSECONDS);
+            } else {
+                latencyLogger.registerFailedEvent(latencyMicros, 
TimeUnit.MICROSECONDS);
+            }
+        });
+    }
+
+    /**
+     * Knows how to produce {@link ServerStats} instances for individual 
methods.
+     */
+    static class Factory {
+
+        private final boolean includeLatencyHistograms;
+
+        Factory(boolean includeLatencyHistograms) {
+            this.includeLatencyHistograms = includeLatencyHistograms;
+        }
+
+        /** Creates a {@link ServerStats} for the supplied method. */
+        <ReqT, RespT> ServerStats 
createMetricsForMethod(MethodDescriptor<ReqT, RespT> methodDescriptor,
+                                                         StatsLogger 
statsLogger) {
+
+            String fullMethodName = methodDescriptor.getFullMethodName();
+            String serviceName = 
MethodDescriptor.extractFullServiceName(fullMethodName);
+            String methodName = fullMethodName.substring(serviceName.length() 
+ 1);
+
+            MethodType type = methodDescriptor.getType();
+            return new ServerStats(
+                statsLogger.scope(methodName),
+                includeLatencyHistograms,
+                type == MethodType.CLIENT_STREAMING || type == 
MethodType.BIDI_STREAMING,
+                type == MethodType.SERVER_STREAMING || type == 
MethodType.BIDI_STREAMING);
+        }
+    }
+
+}
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/package-info.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/package-info.java
new file mode 100644
index 0000000000..a3e2d3ee6e
--- /dev/null
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/stats/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Collecting grpc related stats.
+ */
+package org.apache.bookkeeper.common.grpc.stats;
\ No newline at end of file
diff --git 
a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ClientStatsTest.java
 
b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ClientStatsTest.java
new file mode 100644
index 0000000000..d5cf2d356a
--- /dev/null
+++ 
b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ClientStatsTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.grpc.stats.ClientStats.Factory;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ClientStats}.
+ */
+public class ClientStatsTest {
+
+    private Factory factoryWithHistograms;
+    private Factory factoryWithoutHistograms;
+    private TestStatsProvider statsProvider;
+
+    @Before
+    public void setup() {
+        this.statsProvider = new TestStatsProvider();
+        this.factoryWithHistograms = new Factory(true);
+        this.factoryWithoutHistograms = new Factory(false);
+    }
+
+    @Test
+    public void testClientStatsWithHistogram() {
+        testClientStats(factoryWithHistograms, true);
+    }
+
+    @Test
+    public void testClientStatsWithoutHistogram() {
+        testClientStats(factoryWithoutHistograms, false);
+    }
+
+    private void testClientStats(Factory clientStatsFactory,
+                                 boolean includeLatencyHistogram) {
+        // test unary method
+        MethodDescriptor<?, ?> unaryMethod = 
PingPongServiceGrpc.getPingPongMethod();
+        testClientStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            unaryMethod,
+            "PingPong",
+            "unary",
+            1,
+            1,
+            0,
+            0
+        );
+        // test client streaming
+        MethodDescriptor<?, ?> clientStreamingMethod = 
PingPongServiceGrpc.getLotsOfPingsMethod();
+        testClientStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            clientStreamingMethod,
+            "LotsOfPings",
+            "client_streaming",
+            1,
+            1,
+            1,
+            0
+        );
+        // test server streaming
+        MethodDescriptor<?, ?> serverStreamingMethod = 
PingPongServiceGrpc.getLotsOfPongsMethod();
+        testClientStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            serverStreamingMethod,
+            "LotsOfPongs",
+            "server_streaming",
+            1,
+            1,
+            0,
+            2
+        );
+        // test server streaming
+        MethodDescriptor<?, ?> biStreamingMethod = 
PingPongServiceGrpc.getBidiPingPongMethod();
+        testClientStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            biStreamingMethod,
+            "BidiPingPong",
+            "bidi_streaming",
+            1,
+            1,
+            1,
+            2
+        );
+    }
+
+    private void testClientStats(Factory clientStatsFactory,
+                                 boolean includeLatencyHistogram,
+                                 MethodDescriptor<?, ?> method,
+                                 String methodName,
+                                 String statsScope,
+                                 long expectedCallStarted,
+                                 long expectedCallCompleted,
+                                 long expectedStreamMsgsSent,
+                                 long expectedStreamMsgsReceived) {
+        StatsLogger statsLogger = statsProvider.getStatsLogger(statsScope);
+        ClientStats unaryStats = clientStatsFactory.createMetricsForMethod(
+            method,
+            statsLogger
+        );
+        unaryStats.recordCallStarted();
+        assertEquals(
+            expectedCallStarted,
+            
statsLogger.scope(methodName).getCounter("grpc_started").get().longValue());
+        unaryStats.recordClientHandled(Status.OK.getCode());
+        assertEquals(
+            expectedCallCompleted,
+            
statsLogger.scope(methodName).getCounter("grpc_completed").get().longValue());
+        unaryStats.recordStreamMessageSent();
+        assertEquals(
+            expectedStreamMsgsSent,
+            
statsLogger.scope(methodName).getCounter("grpc_msg_sent").get().longValue());
+        unaryStats.recordStreamMessageReceived();
+        unaryStats.recordStreamMessageReceived();
+        assertEquals(
+            expectedStreamMsgsReceived,
+            
statsLogger.scope(methodName).getCounter("grpc_msg_received").get().longValue());
+        long latencyMicros = 12345L;
+        unaryStats.recordLatency(true, latencyMicros);
+        TestOpStatsLogger opStatsLogger =
+            (TestOpStatsLogger) 
statsLogger.scope(methodName).getOpStatsLogger("grpc_latency_micros");
+        if (includeLatencyHistogram) {
+            assertEquals(1, opStatsLogger.getSuccessCount());
+            assertEquals(
+                TimeUnit.MICROSECONDS.toNanos(latencyMicros),
+                (long) opStatsLogger.getSuccessAverage());
+        } else {
+            assertEquals(0, opStatsLogger.getSuccessCount());
+            assertEquals(0, (long) opStatsLogger.getSuccessAverage());
+        }
+    }
+
+}
diff --git 
a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/GrpcStatsIntegrationTest.java
 
b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/GrpcStatsIntegrationTest.java
new file mode 100644
index 0000000000..e34f30e1d2
--- /dev/null
+++ 
b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/GrpcStatsIntegrationTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.ServerInterceptors;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.util.MutableHandlerRegistry;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
+import org.apache.bookkeeper.tests.rpc.PingPongService;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc;
+import 
org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceBlockingStub;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceStub;
+import org.bookkeeper.tests.proto.rpc.PingRequest;
+import org.bookkeeper.tests.proto.rpc.PongResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * End-to-end integration test on grpc stats.
+ */
+public class GrpcStatsIntegrationTest {
+
+    private static final int NUM_PONGS_PER_PING = 10;
+    private static final String SERVICE_NAME = "pingpong";
+
+    private Server server;
+    private PingPongService service;
+    private ManagedChannel channel;
+    private Channel monitoredChannel;
+    private PingPongServiceBlockingStub client;
+    private PingPongServiceStub clientNonBlocking;
+    private TestStatsProvider statsProvider;
+    private TestStatsLogger clientStatsLogger;
+    private TestStatsLogger serverStatsLogger;
+
+
+    @Before
+    public void setup() throws Exception {
+        statsProvider = new TestStatsProvider();
+        clientStatsLogger = statsProvider.getStatsLogger("client");
+        serverStatsLogger = statsProvider.getStatsLogger("server");
+        service = new PingPongService(NUM_PONGS_PER_PING);
+        ServerServiceDefinition monitoredService = 
ServerInterceptors.intercept(
+            service,
+            MonitoringServerInterceptor.create(serverStatsLogger, true)
+        );
+        MutableHandlerRegistry registry = new MutableHandlerRegistry();
+        server = InProcessServerBuilder
+            .forName(SERVICE_NAME)
+            .fallbackHandlerRegistry(registry)
+            .directExecutor()
+            .build()
+            .start();
+        registry.addService(monitoredService);
+
+        channel = InProcessChannelBuilder.forName(SERVICE_NAME)
+            .usePlaintext()
+            .build();
+        monitoredChannel = ClientInterceptors.intercept(
+            channel,
+            MonitoringClientInterceptor.create(clientStatsLogger, true)
+        );
+        client = PingPongServiceGrpc.newBlockingStub(monitoredChannel);
+        clientNonBlocking = PingPongServiceGrpc.newStub(monitoredChannel);
+    }
+
+    @After
+    public void teardown() {
+        if (null != channel) {
+            channel.shutdown();
+        }
+        if (null != server) {
+            server.shutdown();
+        }
+    }
+
+    private void assertStats(String methodName,
+                             long numCalls,
+                             long numClientMsgSent,
+                             long numClientMsgReceived,
+                             long numServerMsgSent,
+                             long numServerMsgReceived) {
+        // client stats
+        assertEquals(
+            numCalls,
+            
clientStatsLogger.scope(methodName).getCounter("grpc_started").get().longValue()
+        );
+        assertEquals(
+            numCalls,
+            
clientStatsLogger.scope(methodName).getCounter("grpc_completed").get().longValue()
+        );
+        assertEquals(
+            numClientMsgSent,
+            
clientStatsLogger.scope(methodName).getCounter("grpc_msg_sent").get().longValue()
+        );
+        assertEquals(
+            numClientMsgReceived,
+            
clientStatsLogger.scope(methodName).getCounter("grpc_msg_received").get().longValue()
+        );
+        TestOpStatsLogger opStatsLogger =
+            (TestOpStatsLogger) 
clientStatsLogger.scope(methodName).getOpStatsLogger("grpc_latency_micros");
+        assertEquals(
+            numCalls,
+            opStatsLogger.getSuccessCount()
+        );
+        // server stats
+        assertEquals(
+            numCalls,
+            
serverStatsLogger.scope(methodName).getCounter("grpc_started").get().longValue()
+        );
+        assertEquals(
+            numCalls,
+            
serverStatsLogger.scope(methodName).getCounter("grpc_completed").get().longValue()
+        );
+        assertEquals(
+            numServerMsgSent,
+            
serverStatsLogger.scope(methodName).getCounter("grpc_msg_sent").get().longValue()
+        );
+        assertEquals(
+            numServerMsgReceived,
+            
serverStatsLogger.scope(methodName).getCounter("grpc_msg_received").get().longValue()
+        );
+        opStatsLogger =
+            (TestOpStatsLogger) 
serverStatsLogger.scope(methodName).getOpStatsLogger("grpc_latency_micros");
+        assertEquals(
+            numCalls,
+            opStatsLogger.getSuccessCount()
+        );
+    }
+
+    @Test
+    public void testUnary() {
+        long sequence = ThreadLocalRandom.current().nextLong();
+        PingRequest request = PingRequest.newBuilder()
+            .setSequence(sequence)
+            .build();
+        PongResponse response = client.pingPong(request);
+        assertEquals(sequence, response.getLastSequence());
+        assertEquals(1, response.getNumPingReceived());
+        assertEquals(0, response.getSlotId());
+
+        // verify the stats
+        assertStats(
+            "PingPong",
+            1,
+            0,
+            0,
+            0,
+            0);
+    }
+
+    @Test
+    public void testServerStreaming() {
+        long sequence = ThreadLocalRandom.current().nextLong(100000);
+        PingRequest request = PingRequest.newBuilder()
+            .setSequence(sequence)
+            .build();
+        Iterator<PongResponse> respIter = client.lotsOfPongs(request);
+        int count = 0;
+        while (respIter.hasNext()) {
+            PongResponse resp = respIter.next();
+            assertEquals(sequence, resp.getLastSequence());
+            assertEquals(1, resp.getNumPingReceived());
+            assertEquals(count, resp.getSlotId());
+            ++count;
+        }
+
+        assertStats(
+            "LotsOfPongs",
+            1,
+            0,
+            NUM_PONGS_PER_PING,
+            NUM_PONGS_PER_PING,
+            0);
+    }
+
+    @Test
+    public void testClientStreaming() throws Exception {
+        final int numPings = 100;
+        final long sequence = ThreadLocalRandom.current().nextLong(100000);
+        final CompletableFuture<Void> respFuture = new CompletableFuture<>();
+        final LinkedBlockingQueue<PongResponse> respQueue = new 
LinkedBlockingQueue<>();
+        StreamObserver<PingRequest> pinger = clientNonBlocking.lotsOfPings(new 
StreamObserver<PongResponse>() {
+            @Override
+            public void onNext(PongResponse resp) {
+                respQueue.offer(resp);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                respFuture.completeExceptionally(t);
+            }
+
+            @Override
+            public void onCompleted() {
+                FutureUtils.complete(respFuture, null);
+            }
+        });
+
+        for (int i = 0; i < numPings; i++) {
+            PingRequest request = PingRequest.newBuilder()
+                .setSequence(sequence + i)
+                .build();
+            pinger.onNext(request);
+        }
+        pinger.onCompleted();
+
+        // wait for response to be received.
+        result(respFuture);
+
+        assertEquals(1, respQueue.size());
+
+        PongResponse resp = respQueue.take();
+        assertEquals(sequence + numPings - 1, resp.getLastSequence());
+        assertEquals(numPings, resp.getNumPingReceived());
+        assertEquals(0, resp.getSlotId());
+
+        assertStats(
+            "LotsOfPings",
+            1,
+            numPings,
+            0,
+            0,
+            numPings
+        );
+    }
+
+    @Test
+    public void testBidiStreaming() throws Exception {
+        final int numPings = 100;
+
+        final CompletableFuture<Void> respFuture = new CompletableFuture<>();
+        final LinkedBlockingQueue<PongResponse> respQueue = new 
LinkedBlockingQueue<>();
+        StreamObserver<PingRequest> pinger = 
clientNonBlocking.bidiPingPong(new StreamObserver<PongResponse>() {
+            @Override
+            public void onNext(PongResponse resp) {
+                respQueue.offer(resp);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                respFuture.completeExceptionally(t);
+            }
+
+            @Override
+            public void onCompleted() {
+                FutureUtils.complete(respFuture, null);
+            }
+        });
+
+        final LinkedBlockingQueue<PingRequest> reqQueue = new 
LinkedBlockingQueue<>();
+        for (int i = 0; i < numPings; i++) {
+            final long sequence = ThreadLocalRandom.current().nextLong(100000);
+            PingRequest request = PingRequest.newBuilder()
+                .setSequence(sequence)
+                .build();
+            reqQueue.put(request);
+            pinger.onNext(request);
+        }
+        pinger.onCompleted();
+
+        // wait for response to be received
+        result(respFuture);
+
+        assertEquals(numPings, respQueue.size());
+
+        int count = 0;
+        for (PingRequest request : reqQueue) {
+            PongResponse response = respQueue.take();
+
+            assertEquals(request.getSequence(), response.getLastSequence());
+            assertEquals(++count, response.getNumPingReceived());
+            assertEquals(0, response.getSlotId());
+        }
+        assertNull(respQueue.poll());
+        assertEquals(numPings, count);
+
+        assertStats(
+            "BidiPingPong",
+            1,
+            numPings,
+            numPings,
+            numPings,
+            numPings
+        );
+    }
+
+}
diff --git 
a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ServerStatsTest.java
 
b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ServerStatsTest.java
new file mode 100644
index 0000000000..8300aa6327
--- /dev/null
+++ 
b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/stats/ServerStatsTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.stats;
+
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.grpc.stats.ServerStats.Factory;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ServerStats}.
+ */
+public class ServerStatsTest {
+
+    private Factory factoryWithHistograms;
+    private Factory factoryWithoutHistograms;
+    private TestStatsProvider statsProvider;
+
+    @Before
+    public void setup() {
+        this.statsProvider = new TestStatsProvider();
+        this.factoryWithHistograms = new Factory(true);
+        this.factoryWithoutHistograms = new Factory(false);
+    }
+
+    @Test
+    public void testServerStatsWithHistogram() {
+        testServerStats(factoryWithHistograms, true);
+    }
+
+    @Test
+    public void testServerStatsWithoutHistogram() {
+        testServerStats(factoryWithoutHistograms, false);
+    }
+
+    private void testServerStats(Factory clientStatsFactory,
+                                 boolean includeLatencyHistogram) {
+        // test unary method
+        MethodDescriptor<?, ?> unaryMethod = 
PingPongServiceGrpc.getPingPongMethod();
+        testServerStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            unaryMethod,
+            "PingPong",
+            "unary",
+            1,
+            1,
+            0,
+            0
+        );
+        // test client streaming
+        MethodDescriptor<?, ?> clientStreamingMethod = 
PingPongServiceGrpc.getLotsOfPingsMethod();
+        testServerStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            clientStreamingMethod,
+            "LotsOfPings",
+            "client_streaming",
+            1,
+            1,
+            0,
+            2
+        );
+        // test server streaming
+        MethodDescriptor<?, ?> serverStreamingMethod = 
PingPongServiceGrpc.getLotsOfPongsMethod();
+        testServerStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            serverStreamingMethod,
+            "LotsOfPongs",
+            "server_streaming",
+            1,
+            1,
+            1,
+            0
+        );
+        // test server streaming
+        MethodDescriptor<?, ?> biStreamingMethod = 
PingPongServiceGrpc.getBidiPingPongMethod();
+        testServerStats(
+            clientStatsFactory,
+            includeLatencyHistogram,
+            biStreamingMethod,
+            "BidiPingPong",
+            "bidi_streaming",
+            1,
+            1,
+            1,
+            2
+        );
+    }
+
+    private void testServerStats(Factory clientStatsFactory,
+                                 boolean includeLatencyHistogram,
+                                 MethodDescriptor<?, ?> method,
+                                 String methodName,
+                                 String statsScope,
+                                 long expectedCallStarted,
+                                 long expectedCallCompleted,
+                                 long expectedStreamMsgsSent,
+                                 long expectedStreamMsgsReceived) {
+        StatsLogger statsLogger = statsProvider.getStatsLogger(statsScope);
+        ServerStats unaryStats = clientStatsFactory.createMetricsForMethod(
+            method,
+            statsLogger
+        );
+        unaryStats.recordCallStarted();
+        assertEquals(
+            expectedCallStarted,
+            
statsLogger.scope(methodName).getCounter("grpc_started").get().longValue());
+        unaryStats.recordServerHandled(Status.OK.getCode());
+        assertEquals(
+            expectedCallCompleted,
+            
statsLogger.scope(methodName).getCounter("grpc_completed").get().longValue());
+        unaryStats.recordStreamMessageSent();
+        assertEquals(
+            expectedStreamMsgsSent,
+            
statsLogger.scope(methodName).getCounter("grpc_msg_sent").get().longValue());
+        unaryStats.recordStreamMessageReceived();
+        unaryStats.recordStreamMessageReceived();
+        assertEquals(
+            expectedStreamMsgsReceived,
+            
statsLogger.scope(methodName).getCounter("grpc_msg_received").get().longValue());
+        long latencyMicros = 12345L;
+        unaryStats.recordLatency(true, latencyMicros);
+        TestOpStatsLogger opStatsLogger =
+            (TestOpStatsLogger) 
statsLogger.scope(methodName).getOpStatsLogger("grpc_latency_micros");
+        if (includeLatencyHistogram) {
+            assertEquals(1, opStatsLogger.getSuccessCount());
+            assertEquals(
+                TimeUnit.MICROSECONDS.toNanos(latencyMicros),
+                (long) opStatsLogger.getSuccessAverage());
+        } else {
+            assertEquals(0, opStatsLogger.getSuccessCount());
+            assertEquals(0, (long) opStatsLogger.getSuccessAverage());
+        }
+    }
+
+}
diff --git 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
index 7a5f16306e..1ef26423a5 100644
--- 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
+++ 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
@@ -18,12 +18,14 @@
 import io.grpc.HandlerRegistry;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptors;
 import io.grpc.ServerServiceDefinition;
 import io.grpc.inprocess.InProcessServerBuilder;
 import java.io.IOException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
 import org.apache.bookkeeper.common.grpc.proxy.ProxyHandlerRegistry;
+import org.apache.bookkeeper.common.grpc.stats.MonitoringServerInterceptor;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
@@ -75,13 +77,23 @@ public GrpcServer(StorageContainerStore 
storageContainerStore,
             }
             this.grpcServer = serverBuilder.build();
         } else {
+            MonitoringServerInterceptor monitoringInterceptor =
+                
MonitoringServerInterceptor.create(statsLogger.scope("services"), true);
             ProxyHandlerRegistry.Builder proxyRegistryBuilder = 
ProxyHandlerRegistry.newBuilder()
                 .setChannelFinder(storageContainerStore);
             for (ServerServiceDefinition definition : 
GrpcServices.create(null)) {
-                proxyRegistryBuilder = 
proxyRegistryBuilder.addService(definition);
+                ServerServiceDefinition monitoredService = 
ServerInterceptors.intercept(
+                    definition,
+                    monitoringInterceptor
+                );
+                proxyRegistryBuilder = 
proxyRegistryBuilder.addService(monitoredService);
             }
+            ServerServiceDefinition locationService = 
ServerInterceptors.intercept(
+                new GrpcStorageContainerService(storageContainerStore),
+                monitoringInterceptor
+            );
             this.grpcServer = ServerBuilder.forPort(this.myEndpoint.getPort())
-                .addService(new 
GrpcStorageContainerService(storageContainerStore))
+                .addService(locationService)
                 .fallbackHandlerRegistry(proxyRegistryBuilder.build())
                 .build();
         }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to