This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f2f21e363 HDDS-9424. Add metric for number of concurrent requests RPC Server is handling (#5420)
1f2f21e363 is described below

commit 1f2f21e363621a5b2d05f8c235aa709774e8d1f1
Author: Duong Nguyen <[email protected]>
AuthorDate: Sun Oct 15 04:18:11 2023 -0700

    HDDS-9424. Add metric for number of concurrent requests RPC Server is handling (#5420)
---
 .../server/OzoneProtocolMessageDispatcher.java     | 12 ++++-----
 .../hadoop/hdds/utils/ProtocolMessageMetrics.java  | 29 +++++++++++++++++-----
 2 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
index b39707bfae..e462f455e3 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 
 import com.google.protobuf.ServiceException;
 import io.opentracing.Span;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
 
@@ -82,12 +83,11 @@ public class OzoneProtocolMessageDispatcher<REQUEST, RESPONSE, TYPE> {
             serviceName, type);
       }
 
-      long startTime = System.currentTimeMillis();
-
-      RESPONSE response = methodCall.apply(request);
-
-      protocolMessageMetrics.increment(type,
-          System.currentTimeMillis() - startTime);
+      RESPONSE response;
+      try (UncheckedAutoCloseable ignored =
+               protocolMessageMetrics.measure(type)) {
+        response = methodCall.apply(request);
+      }
 
       if (logger.isTraceEnabled()) {
         logger.trace(
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/ProtocolMessageMetrics.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/ProtocolMessageMetrics.java
index aa50eab829..7d72b3aac2 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/ProtocolMessageMetrics.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/ProtocolMessageMetrics.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.utils;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.metrics2.MetricsCollector;
@@ -28,22 +29,25 @@ import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.MetricsTag;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 
 /**
  * Metrics to count all the subtypes of a specific message.
  */
 public class ProtocolMessageMetrics<KEY> implements MetricsSource {
 
-  private String name;
+  private final String name;
 
-  private String description;
+  private final String description;
 
-  private Map<KEY, AtomicLong> counters =
+  private final Map<KEY, AtomicLong> counters =
       new ConcurrentHashMap<>();
 
-  private Map<KEY, AtomicLong> elapsedTimes =
+  private final Map<KEY, AtomicLong> elapsedTimes =
       new ConcurrentHashMap<>();
 
+  private final AtomicInteger concurrency = new AtomicInteger(0);
+
   public static <KEY> ProtocolMessageMetrics<KEY> create(String name,
       String description, KEY[] types) {
     return new ProtocolMessageMetrics<KEY>(name, description, types);
@@ -64,6 +68,16 @@ public class ProtocolMessageMetrics<KEY> implements MetricsSource {
     elapsedTimes.get(key).addAndGet(duration);
   }
 
+  public UncheckedAutoCloseable measure(KEY key) {
+    final long startTime = System.currentTimeMillis();
+    concurrency.incrementAndGet();
+    return () -> {
+      concurrency.decrementAndGet();
+      counters.get(key).incrementAndGet();
+      elapsedTimes.get(key).addAndGet(System.currentTimeMillis() - startTime);
+    };
+  }
+
   public void register() {
     DefaultMetricsSystem.instance()
         .register(name, description, this);
@@ -88,14 +102,17 @@ public class ProtocolMessageMetrics<KEY> implements MetricsSource {
       builder.endRecord();
 
     });
+    MetricsRecordBuilder builder = collector.addRecord(name);
+    builder.addCounter(new MetricName("concurrency",
+            "Number of requests processed concurrently"), concurrency.get());
   }
 
   /**
    * Simple metrics info implementation.
    */
   public static class MetricName implements MetricsInfo {
-    private String name;
-    private String description;
+    private final String name;
+    private final String description;
 
     public MetricName(String name, String description) {
       this.name = name;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to