This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new d1aa51b04 [#1991][FOLLOWUP] improvement: Refactor SupplierGauge to support generic type and add requireBufferCount metrics (#2113)
d1aa51b04 is described below

commit d1aa51b0495fc926e1305405c6147f0930f025f6
Author: maobaolong <[email protected]>
AuthorDate: Wed Sep 18 17:25:53 2024 +0800

    [#1991][FOLLOWUP] improvement: Refactor SupplierGauge to support generic type and add requireBufferCount metrics (#2113)
    
    ### What changes were proposed in this pull request?
    
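    Refactor `SupplierGauge` to accept suppliers of any `Number` subtype (not only `Double`), and add a `require_buffer_count` metric that reports the number of outstanding require-buffer IDs.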
    
    ### Why are the changes needed?
    
    Fix: #1991
    
    ### Does this PR introduce _any_ user-facing change?
    
    Adds a new metric named `require_buffer_count`.
    
    ### How was this patch tested?
    
    Tested manually via the server metrics popup page in the dashboard.
    
    <img width="945" alt="image" src="https://github.com/user-attachments/assets/2fbbb1d1-7f1f-41ac-ab41-adba55790005">
    
    
    ---------
    
    Co-authored-by: xianjingfeng <[email protected]>
---
 .../uniffle/common/metrics/MetricsManager.java      |  2 +-
 .../uniffle/common/metrics/SupplierGauge.java       | 21 +++++++++++++--------
 .../org/apache/uniffle/server/ShuffleServer.java    | 21 ++++++++-------------
 .../apache/uniffle/server/ShuffleServerMetrics.java |  4 +++-
 .../apache/uniffle/server/ShuffleTaskManager.java   |  3 +++
 5 files changed, 28 insertions(+), 23 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
index b26c055c7..bf6a95bd7 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
@@ -84,7 +84,7 @@ public class MetricsManager {
     return c.labels(this.defaultLabelValues);
   }
 
-  public void addLabeledGauge(String name, Supplier<Double> supplier) {
+  public <T extends Number> void addLabeledGauge(String name, Supplier<T> 
supplier) {
     supplierGaugeMap.computeIfAbsent(
         name,
         metricName ->
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
index 674980def..eb7756ef6 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
@@ -25,20 +25,20 @@ import java.util.function.Supplier;
 
 import io.prometheus.client.Collector;
 import io.prometheus.client.GaugeMetricFamily;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SupplierGauge<T extends Number> extends Collector implements 
Collector.Describable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SupplierGauge.class);
 
-class SupplierGauge extends Collector implements Collector.Describable {
   private String name;
   private String help;
-  private Supplier<Double> supplier;
+  private Supplier<T> supplier;
   private List<String> labelNames;
   private List<String> labelValues;
 
   SupplierGauge(
-      String name,
-      String help,
-      Supplier<Double> supplier,
-      String[] labelNames,
-      String[] labelValues) {
+      String name, String help, Supplier<T> supplier, String[] labelNames, 
String[] labelValues) {
     this.name = name;
     this.help = help;
     this.supplier = supplier;
@@ -49,9 +49,14 @@ class SupplierGauge extends Collector implements 
Collector.Describable {
   @Override
   public List<MetricFamilySamples> collect() {
     List<MetricFamilySamples.Sample> samples = new ArrayList<>();
+    T lastValue = supplier.get();
+    if (lastValue == null) {
+      LOG.warn("SupplierGauge {} returned null value.", this.name);
+      return Collections.emptyList();
+    }
     samples.add(
         new MetricFamilySamples.Sample(
-            this.name, this.labelNames, this.labelValues, 
this.supplier.get()));
+            this.name, this.labelNames, this.labelValues, 
lastValue.doubleValue()));
     MetricFamilySamples mfs = new MetricFamilySamples(this.name, Type.GAUGE, 
this.help, samples);
     List<MetricFamilySamples> mfsList = new ArrayList<MetricFamilySamples>(1);
     mfsList.add(mfs);
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 88ec8ea64..30ecacf3e 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -321,27 +321,22 @@ public class ShuffleServer {
             shuffleMergeManager);
     shuffleTaskManager.start();
     ShuffleServerMetrics.addLabeledGauge(
-        USED_DIRECT_MEMORY_SIZE_BY_NETTY, () -> (double) 
PlatformDependent.usedDirectMemory());
+        USED_DIRECT_MEMORY_SIZE_BY_NETTY, PlatformDependent::usedDirectMemory);
     ShuffleServerMetrics.addLabeledGauge(
         USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY,
-        () ->
-            (double)
-                
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.usedDirectMemory());
+        
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent::usedDirectMemory);
     ShuffleServerMetrics.addLabeledGauge(
         USED_DIRECT_MEMORY_SIZE,
         () ->
-            (double)
-                (PlatformDependent.usedDirectMemory()
-                    + 
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent
-                        .usedDirectMemory()));
+            (PlatformDependent.usedDirectMemory()
+                + io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent
+                    .usedDirectMemory()));
     ShuffleServerMetrics.addLabeledGauge(
-        JVM_PAUSE_TOTAL_EXTRA_TIME, () -> (double) 
jvmPauseMonitor.getTotalGcExtraSleepTime());
+        JVM_PAUSE_TOTAL_EXTRA_TIME, jvmPauseMonitor::getTotalGcExtraSleepTime);
     ShuffleServerMetrics.addLabeledGauge(
-        JVM_PAUSE_INFO_TIME_EXCEEDED,
-        () -> (double) jvmPauseMonitor.getNumGcInfoThresholdExceeded());
+        JVM_PAUSE_INFO_TIME_EXCEEDED, 
jvmPauseMonitor::getNumGcInfoThresholdExceeded);
     ShuffleServerMetrics.addLabeledGauge(
-        JVM_PAUSE_WARN_TIME_EXCEEDED,
-        () -> (double) jvmPauseMonitor.getNumGcWarnThresholdExceeded());
+        JVM_PAUSE_WARN_TIME_EXCEEDED, 
jvmPauseMonitor::getNumGcWarnThresholdExceeded);
 
     setServer();
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index b3e56c0b3..d22cec0b3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -157,6 +157,8 @@ public class ShuffleServerMetrics {
   public static final String TOPN_OF_ON_HADOOP_DATA_SIZE_FOR_APP =
       "topN_of_on_hadoop_data_size_for_app";
 
+  public static final String REQUIRE_BUFFER_COUNT = "require_buffer_count";
+
   public static Counter.Child counterTotalAppNum;
   public static Counter.Child counterTotalAppWithHugePartitionNum;
   public static Counter.Child counterTotalPartitionNum;
@@ -513,7 +515,7 @@ public class ShuffleServerMetrics {
             .register(metricsManager.getCollectorRegistry());
   }
 
-  public static void addLabeledGauge(String name, Supplier<Double> supplier) {
+  public static <T extends Number> void addLabeledGauge(String name, 
Supplier<T> supplier) {
     metricsManager.addLabeledGauge(name, supplier);
   }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index b48258423..d5e747071 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -88,6 +88,7 @@ import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
 import static 
org.apache.uniffle.server.ShuffleServerConf.CLIENT_MAX_CONCURRENCY_LIMITATION_OF_ONE_PARTITION;
 import static 
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
+import static 
org.apache.uniffle.server.ShuffleServerMetrics.REQUIRE_BUFFER_COUNT;
 
 public class ShuffleTaskManager {
 
@@ -240,6 +241,8 @@ public class ShuffleTaskManager {
 
     topNShuffleDataSizeOfAppCalcTask = new 
TopNShuffleDataSizeOfAppCalcTask(this, conf);
     topNShuffleDataSizeOfAppCalcTask.start();
+
+    ShuffleServerMetrics.addLabeledGauge(REQUIRE_BUFFER_COUNT, 
requireBufferIds::size);
   }
 
   public ReentrantReadWriteLock.WriteLock getAppWriteLock(String appId) {

Reply via email to