This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new a364f2aca [#2117] feat(server): Introduce memory buffer related
metrics(block/buffer/shuffle count) (#2118)
a364f2aca is described below
commit a364f2acad60a2807a3faef14d687ef5254e4166
Author: maobaolong <[email protected]>
AuthorDate: Thu Sep 26 14:47:45 2024 +0800
[#2117] feat(server): Introduce memory buffer related
metrics(block/buffer/shuffle count) (#2118)
### What changes were proposed in this pull request?
Introduce block, buffer, and shuffle count metrics for the server's memory buffer pool.
### Why are the changes needed?
Fix: #2117
### Does this PR introduce _any_ user-facing change?
Yes. The following server metrics are introduced:
- "block_count_in_buffer_pool"
- "buffer_count_in_buffer_pool"
- "shuffle_count_in_buffer_pool"
### How was this patch tested?
Verified manually via the server metrics popup page on the dashboard.
---
.../uniffle/common/metrics/MetricsManager.java | 8 ++++++-
.../uniffle/common/metrics/SupplierGauge.java | 23 ++++++++++++++++++--
.../uniffle/server/ShuffleServerMetrics.java | 12 +++++++++++
.../server/buffer/ShuffleBufferManager.java | 25 ++++++++++++++++++++++
4 files changed, 65 insertions(+), 3 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
index bf6a95bd7..efa216e0c 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
@@ -85,6 +85,11 @@ public class MetricsManager {
}
public <T extends Number> void addLabeledGauge(String name, Supplier<T>
supplier) {
+ addLabeledCacheGauge(name, supplier, 0);
+ }
+
+ public <T extends Number> void addLabeledCacheGauge(
+ String name, Supplier<T> supplier, long updateInterval) {
supplierGaugeMap.computeIfAbsent(
name,
metricName ->
@@ -93,7 +98,8 @@ public class MetricsManager {
"Gauge " + name,
supplier,
this.defaultLabelNames,
- this.defaultLabelValues)
+ this.defaultLabelValues,
+ updateInterval)
.register(collectorRegistry));
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
index eb7756ef6..6892a3afa 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
@@ -36,27 +36,46 @@ class SupplierGauge<T extends Number> extends Collector
implements Collector.Des
private Supplier<T> supplier;
private List<String> labelNames;
private List<String> labelValues;
+ private long updateInterval;
+ private long lastUpdateTime;
+ private T lastValue;
SupplierGauge(
String name, String help, Supplier<T> supplier, String[] labelNames,
String[] labelValues) {
+ this(name, help, supplier, labelNames, labelValues, 0);
+ }
+
+ SupplierGauge(
+ String name,
+ String help,
+ Supplier<T> supplier,
+ String[] labelNames,
+ String[] labelValues,
+ long updateInterval) {
this.name = name;
this.help = help;
this.supplier = supplier;
this.labelNames = Arrays.asList(labelNames);
this.labelValues = Arrays.asList(labelValues);
+ this.updateInterval = updateInterval;
+ this.lastUpdateTime = 0;
}
@Override
public List<MetricFamilySamples> collect() {
List<MetricFamilySamples.Sample> samples = new ArrayList<>();
- T lastValue = supplier.get();
+ long time = System.currentTimeMillis();
+ if (time - lastUpdateTime > updateInterval) {
+ this.lastValue = this.supplier.get();
+ this.lastUpdateTime = time;
+ }
if (lastValue == null) {
LOG.warn("SupplierGauge {} returned null value.", this.name);
return Collections.emptyList();
}
samples.add(
new MetricFamilySamples.Sample(
- this.name, this.labelNames, this.labelValues,
lastValue.doubleValue()));
+ this.name, this.labelNames, this.labelValues,
this.lastValue.doubleValue()));
MetricFamilySamples mfs = new MetricFamilySamples(this.name, Type.GAUGE,
this.help, samples);
List<MetricFamilySamples> mfsList = new ArrayList<MetricFamilySamples>(1);
mfsList.add(mfs);
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index d22cec0b3..82f784d90 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -159,6 +159,10 @@ public class ShuffleServerMetrics {
public static final String REQUIRE_BUFFER_COUNT = "require_buffer_count";
+ public static final String BLOCK_COUNT_IN_BUFFER_POOL =
"block_count_in_buffer_pool";
+ public static final String BUFFER_COUNT_IN_BUFFER_POOL =
"buffer_count_in_buffer_pool";
+ public static final String SHUFFLE_COUNT_IN_BUFFER_POOL =
"shuffle_count_in_buffer_pool";
+
public static Counter.Child counterTotalAppNum;
public static Counter.Child counterTotalAppWithHugePartitionNum;
public static Counter.Child counterTotalPartitionNum;
@@ -518,4 +522,12 @@ public class ShuffleServerMetrics {
public static <T extends Number> void addLabeledGauge(String name,
Supplier<T> supplier) {
metricsManager.addLabeledGauge(name, supplier);
}
+
+ public static <T extends Number> void addLabeledCacheGauge(
+ String name, Supplier<T> supplier, long updateInterval) {
+ if (!isRegister) {
+ return;
+ }
+ metricsManager.addLabeledCacheGauge(name, supplier, updateInterval);
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 39002a0dc..18dd94c19 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -54,6 +54,10 @@ import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.ShuffleTaskManager;
+import static
org.apache.uniffle.server.ShuffleServerMetrics.BLOCK_COUNT_IN_BUFFER_POOL;
+import static
org.apache.uniffle.server.ShuffleServerMetrics.BUFFER_COUNT_IN_BUFFER_POOL;
+import static
org.apache.uniffle.server.ShuffleServerMetrics.SHUFFLE_COUNT_IN_BUFFER_POOL;
+
public class ShuffleBufferManager {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleBufferManager.class);
@@ -141,6 +145,27 @@ public class ShuffleBufferManager {
appBlockSizeMetricEnabled =
conf.getBoolean(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED);
shuffleBufferType = conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE);
+
+ ShuffleServerMetrics.addLabeledCacheGauge(
+ BLOCK_COUNT_IN_BUFFER_POOL,
+ () ->
+ bufferPool.values().stream()
+ .flatMap(innerMap -> innerMap.values().stream())
+ .flatMap(rangeMap ->
rangeMap.asMapOfRanges().values().stream())
+ .mapToInt(shuffleBuffer -> shuffleBuffer.getBlockCount())
+ .sum(),
+ 2 * 60 * 1000L /* 2 minutes */);
+ ShuffleServerMetrics.addLabeledCacheGauge(
+ BUFFER_COUNT_IN_BUFFER_POOL,
+ () ->
+ bufferPool.values().stream()
+ .flatMap(innerMap -> innerMap.values().stream())
+ .mapToInt(rangeMap -> rangeMap.asMapOfRanges().size())
+ .sum(),
+ 2 * 60 * 1000L /* 2 minutes */);
+ ShuffleServerMetrics.addLabeledGauge(
+ SHUFFLE_COUNT_IN_BUFFER_POOL,
+ () -> bufferPool.values().stream().mapToInt(innerMap ->
innerMap.size()).sum());
}
public void setShuffleTaskManager(ShuffleTaskManager taskManager) {