This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new a364f2aca [#2117] feat(server): Introduce memory buffer related 
metrics(block/buffer/shuffle count) (#2118)
a364f2aca is described below

commit a364f2acad60a2807a3faef14d687ef5254e4166
Author: maobaolong <[email protected]>
AuthorDate: Thu Sep 26 14:47:45 2024 +0800

    [#2117] feat(server): Introduce memory buffer related 
metrics(block/buffer/shuffle count) (#2118)
    
    ### What changes were proposed in this pull request?
    
    Introduce block, buffer, and shuffle count metrics for the buffer pool.
    
    ### Why are the changes needed?
    
    Fix: #2117
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. The following metrics are introduced, including `block_count_in_buffer_pool`:
    
    - "block_count_in_buffer_pool"
    - "buffer_count_in_buffer_pool"
    - "shuffle_count_in_buffer_pool"
    
    ### How was this patch tested?
    
    Verified by checking the server metrics popup page on the dashboard.
---
 .../uniffle/common/metrics/MetricsManager.java     |  8 ++++++-
 .../uniffle/common/metrics/SupplierGauge.java      | 23 ++++++++++++++++++--
 .../uniffle/server/ShuffleServerMetrics.java       | 12 +++++++++++
 .../server/buffer/ShuffleBufferManager.java        | 25 ++++++++++++++++++++++
 4 files changed, 65 insertions(+), 3 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
index bf6a95bd7..efa216e0c 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
@@ -85,6 +85,11 @@ public class MetricsManager {
   }
 
   public <T extends Number> void addLabeledGauge(String name, Supplier<T> 
supplier) {
+    addLabeledCacheGauge(name, supplier, 0);
+  }
+
+  public <T extends Number> void addLabeledCacheGauge(
+      String name, Supplier<T> supplier, long updateInterval) {
     supplierGaugeMap.computeIfAbsent(
         name,
         metricName ->
@@ -93,7 +98,8 @@ public class MetricsManager {
                     "Gauge " + name,
                     supplier,
                     this.defaultLabelNames,
-                    this.defaultLabelValues)
+                    this.defaultLabelValues,
+                    updateInterval)
                 .register(collectorRegistry));
   }
 
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
index eb7756ef6..6892a3afa 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
@@ -36,27 +36,46 @@ class SupplierGauge<T extends Number> extends Collector 
implements Collector.Des
   private Supplier<T> supplier;
   private List<String> labelNames;
   private List<String> labelValues;
+  private long updateInterval;
+  private long lastUpdateTime;
+  private T lastValue;
 
   SupplierGauge(
       String name, String help, Supplier<T> supplier, String[] labelNames, 
String[] labelValues) {
+    this(name, help, supplier, labelNames, labelValues, 0);
+  }
+
+  SupplierGauge(
+      String name,
+      String help,
+      Supplier<T> supplier,
+      String[] labelNames,
+      String[] labelValues,
+      long updateInterval) {
     this.name = name;
     this.help = help;
     this.supplier = supplier;
     this.labelNames = Arrays.asList(labelNames);
     this.labelValues = Arrays.asList(labelValues);
+    this.updateInterval = updateInterval;
+    this.lastUpdateTime = 0;
   }
 
   @Override
   public List<MetricFamilySamples> collect() {
     List<MetricFamilySamples.Sample> samples = new ArrayList<>();
-    T lastValue = supplier.get();
+    long time = System.currentTimeMillis();
+    if (time - lastUpdateTime > updateInterval) {
+      this.lastValue = this.supplier.get();
+      this.lastUpdateTime = time;
+    }
     if (lastValue == null) {
       LOG.warn("SupplierGauge {} returned null value.", this.name);
       return Collections.emptyList();
     }
     samples.add(
         new MetricFamilySamples.Sample(
-            this.name, this.labelNames, this.labelValues, 
lastValue.doubleValue()));
+            this.name, this.labelNames, this.labelValues, 
this.lastValue.doubleValue()));
     MetricFamilySamples mfs = new MetricFamilySamples(this.name, Type.GAUGE, 
this.help, samples);
     List<MetricFamilySamples> mfsList = new ArrayList<MetricFamilySamples>(1);
     mfsList.add(mfs);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index d22cec0b3..82f784d90 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -159,6 +159,10 @@ public class ShuffleServerMetrics {
 
   public static final String REQUIRE_BUFFER_COUNT = "require_buffer_count";
 
+  public static final String BLOCK_COUNT_IN_BUFFER_POOL = 
"block_count_in_buffer_pool";
+  public static final String BUFFER_COUNT_IN_BUFFER_POOL = 
"buffer_count_in_buffer_pool";
+  public static final String SHUFFLE_COUNT_IN_BUFFER_POOL = 
"shuffle_count_in_buffer_pool";
+
   public static Counter.Child counterTotalAppNum;
   public static Counter.Child counterTotalAppWithHugePartitionNum;
   public static Counter.Child counterTotalPartitionNum;
@@ -518,4 +522,12 @@ public class ShuffleServerMetrics {
   public static <T extends Number> void addLabeledGauge(String name, 
Supplier<T> supplier) {
     metricsManager.addLabeledGauge(name, supplier);
   }
+
+  public static <T extends Number> void addLabeledCacheGauge(
+      String name, Supplier<T> supplier, long updateInterval) {
+    if (!isRegister) {
+      return;
+    }
+    metricsManager.addLabeledCacheGauge(name, supplier, updateInterval);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 39002a0dc..18dd94c19 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -54,6 +54,10 @@ import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
 import org.apache.uniffle.server.ShuffleTaskManager;
 
+import static 
org.apache.uniffle.server.ShuffleServerMetrics.BLOCK_COUNT_IN_BUFFER_POOL;
+import static 
org.apache.uniffle.server.ShuffleServerMetrics.BUFFER_COUNT_IN_BUFFER_POOL;
+import static 
org.apache.uniffle.server.ShuffleServerMetrics.SHUFFLE_COUNT_IN_BUFFER_POOL;
+
 public class ShuffleBufferManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleBufferManager.class);
@@ -141,6 +145,27 @@ public class ShuffleBufferManager {
     appBlockSizeMetricEnabled =
         
conf.getBoolean(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED);
     shuffleBufferType = conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE);
+
+    ShuffleServerMetrics.addLabeledCacheGauge(
+        BLOCK_COUNT_IN_BUFFER_POOL,
+        () ->
+            bufferPool.values().stream()
+                .flatMap(innerMap -> innerMap.values().stream())
+                .flatMap(rangeMap -> 
rangeMap.asMapOfRanges().values().stream())
+                .mapToInt(shuffleBuffer -> shuffleBuffer.getBlockCount())
+                .sum(),
+        2 * 60 * 1000L /* 2 minutes */);
+    ShuffleServerMetrics.addLabeledCacheGauge(
+        BUFFER_COUNT_IN_BUFFER_POOL,
+        () ->
+            bufferPool.values().stream()
+                .flatMap(innerMap -> innerMap.values().stream())
+                .mapToInt(rangeMap -> rangeMap.asMapOfRanges().size())
+                .sum(),
+        2 * 60 * 1000L /* 2 minutes */);
+    ShuffleServerMetrics.addLabeledGauge(
+        SHUFFLE_COUNT_IN_BUFFER_POOL,
+        () -> bufferPool.values().stream().mapToInt(innerMap -> 
innerMap.size()).sum());
   }
 
   public void setShuffleTaskManager(ShuffleTaskManager taskManager) {

Reply via email to