This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new d1aa51b04 [#1991][FOLLOWUP] improvement: Refactor SupplierGauge to
support generic type and add requireBufferCount metrics (#2113)
d1aa51b04 is described below
commit d1aa51b0495fc926e1305405c6147f0930f025f6
Author: maobaolong <[email protected]>
AuthorDate: Wed Sep 18 17:25:53 2024 +0800
[#1991][FOLLOWUP] improvement: Refactor SupplierGauge to support generic
type and add requireBufferCount metrics (#2113)
### What changes were proposed in this pull request?
Refactor SupplierGauge to support a generic numeric type (`T extends Number`)
and add the `require_buffer_count` metric.
### Why are the changes needed?
Fix: #1991
### Does this PR introduce _any_ user-facing change?
Yes. Adds a new gauge metric named `require_buffer_count`, which reports the size of `requireBufferIds` in ShuffleTaskManager.
### How was this patch tested?
Tested through the dashboard's server metrics popup page.
<img width="945" alt="image"
src="https://github.com/user-attachments/assets/2fbbb1d1-7f1f-41ac-ab41-adba55790005">
---------
Co-authored-by: xianjingfeng <[email protected]>
---
.../uniffle/common/metrics/MetricsManager.java | 2 +-
.../uniffle/common/metrics/SupplierGauge.java | 21 +++++++++++++--------
.../org/apache/uniffle/server/ShuffleServer.java | 21 ++++++++-------------
.../apache/uniffle/server/ShuffleServerMetrics.java | 4 +++-
.../apache/uniffle/server/ShuffleTaskManager.java | 3 +++
5 files changed, 28 insertions(+), 23 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
index b26c055c7..bf6a95bd7 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
@@ -84,7 +84,7 @@ public class MetricsManager {
return c.labels(this.defaultLabelValues);
}
- public void addLabeledGauge(String name, Supplier<Double> supplier) {
+ public <T extends Number> void addLabeledGauge(String name, Supplier<T>
supplier) {
supplierGaugeMap.computeIfAbsent(
name,
metricName ->
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
index 674980def..eb7756ef6 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
@@ -25,20 +25,20 @@ import java.util.function.Supplier;
import io.prometheus.client.Collector;
import io.prometheus.client.GaugeMetricFamily;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SupplierGauge<T extends Number> extends Collector implements
Collector.Describable {
+ private static final Logger LOG =
LoggerFactory.getLogger(SupplierGauge.class);
-class SupplierGauge extends Collector implements Collector.Describable {
private String name;
private String help;
- private Supplier<Double> supplier;
+ private Supplier<T> supplier;
private List<String> labelNames;
private List<String> labelValues;
SupplierGauge(
- String name,
- String help,
- Supplier<Double> supplier,
- String[] labelNames,
- String[] labelValues) {
+ String name, String help, Supplier<T> supplier, String[] labelNames,
String[] labelValues) {
this.name = name;
this.help = help;
this.supplier = supplier;
@@ -49,9 +49,14 @@ class SupplierGauge extends Collector implements
Collector.Describable {
@Override
public List<MetricFamilySamples> collect() {
List<MetricFamilySamples.Sample> samples = new ArrayList<>();
+ T lastValue = supplier.get();
+ if (lastValue == null) {
+ LOG.warn("SupplierGauge {} returned null value.", this.name);
+ return Collections.emptyList();
+ }
samples.add(
new MetricFamilySamples.Sample(
- this.name, this.labelNames, this.labelValues,
this.supplier.get()));
+ this.name, this.labelNames, this.labelValues,
lastValue.doubleValue()));
MetricFamilySamples mfs = new MetricFamilySamples(this.name, Type.GAUGE,
this.help, samples);
List<MetricFamilySamples> mfsList = new ArrayList<MetricFamilySamples>(1);
mfsList.add(mfs);
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 88ec8ea64..30ecacf3e 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -321,27 +321,22 @@ public class ShuffleServer {
shuffleMergeManager);
shuffleTaskManager.start();
ShuffleServerMetrics.addLabeledGauge(
- USED_DIRECT_MEMORY_SIZE_BY_NETTY, () -> (double)
PlatformDependent.usedDirectMemory());
+ USED_DIRECT_MEMORY_SIZE_BY_NETTY, PlatformDependent::usedDirectMemory);
ShuffleServerMetrics.addLabeledGauge(
USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY,
- () ->
- (double)
-
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.usedDirectMemory());
+
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent::usedDirectMemory);
ShuffleServerMetrics.addLabeledGauge(
USED_DIRECT_MEMORY_SIZE,
() ->
- (double)
- (PlatformDependent.usedDirectMemory()
- +
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent
- .usedDirectMemory()));
+ (PlatformDependent.usedDirectMemory()
+ + io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent
+ .usedDirectMemory()));
ShuffleServerMetrics.addLabeledGauge(
- JVM_PAUSE_TOTAL_EXTRA_TIME, () -> (double)
jvmPauseMonitor.getTotalGcExtraSleepTime());
+ JVM_PAUSE_TOTAL_EXTRA_TIME, jvmPauseMonitor::getTotalGcExtraSleepTime);
ShuffleServerMetrics.addLabeledGauge(
- JVM_PAUSE_INFO_TIME_EXCEEDED,
- () -> (double) jvmPauseMonitor.getNumGcInfoThresholdExceeded());
+ JVM_PAUSE_INFO_TIME_EXCEEDED,
jvmPauseMonitor::getNumGcInfoThresholdExceeded);
ShuffleServerMetrics.addLabeledGauge(
- JVM_PAUSE_WARN_TIME_EXCEEDED,
- () -> (double) jvmPauseMonitor.getNumGcWarnThresholdExceeded());
+ JVM_PAUSE_WARN_TIME_EXCEEDED,
jvmPauseMonitor::getNumGcWarnThresholdExceeded);
setServer();
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index b3e56c0b3..d22cec0b3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -157,6 +157,8 @@ public class ShuffleServerMetrics {
public static final String TOPN_OF_ON_HADOOP_DATA_SIZE_FOR_APP =
"topN_of_on_hadoop_data_size_for_app";
+ public static final String REQUIRE_BUFFER_COUNT = "require_buffer_count";
+
public static Counter.Child counterTotalAppNum;
public static Counter.Child counterTotalAppWithHugePartitionNum;
public static Counter.Child counterTotalPartitionNum;
@@ -513,7 +515,7 @@ public class ShuffleServerMetrics {
.register(metricsManager.getCollectorRegistry());
}
- public static void addLabeledGauge(String name, Supplier<Double> supplier) {
+ public static <T extends Number> void addLabeledGauge(String name,
Supplier<T> supplier) {
metricsManager.addLabeledGauge(name, supplier);
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index b48258423..d5e747071 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -88,6 +88,7 @@ import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import static
org.apache.uniffle.server.ShuffleServerConf.CLIENT_MAX_CONCURRENCY_LIMITATION_OF_ONE_PARTITION;
import static
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
+import static
org.apache.uniffle.server.ShuffleServerMetrics.REQUIRE_BUFFER_COUNT;
public class ShuffleTaskManager {
@@ -240,6 +241,8 @@ public class ShuffleTaskManager {
topNShuffleDataSizeOfAppCalcTask = new
TopNShuffleDataSizeOfAppCalcTask(this, conf);
topNShuffleDataSizeOfAppCalcTask.start();
+
+ ShuffleServerMetrics.addLabeledGauge(REQUIRE_BUFFER_COUNT,
requireBufferIds::size);
}
public ReentrantReadWriteLock.WriteLock getAppWriteLock(String appId) {