This is an automated email from the ASF dual-hosted git repository.
smallzhongfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f361ac07 [#778] feat: Separate `ShuffleServer` metrics through tags
(#812)
f361ac07 is described below
commit f361ac079a6d61a40e9ad384f77e5b354e875282
Author: jokercurry <[email protected]>
AuthorDate: Mon Apr 17 19:09:48 2023 +0800
[#778] feat: Separate `ShuffleServer` metrics through tags (#812)
<!--
1. Title: [#<issue>] <type>(<scope>): <subject>
Examples:
- "[#123] feat(operator): support xxx"
- "[#233] fix: check null before access result in xxx"
- "[MINOR] refactor: fix typo in variable name"
- "[MINOR] docs: fix typo in README"
- "[#255] test: fix flaky test NameOfTheTest"
Reference: https://www.conventionalcommits.org/en/v1.0.0/
2. Contributor guidelines:
https://github.com/apache/incubator-uniffle/blob/master/CONTRIBUTING.md
3. If the PR is unfinished, please mark this PR as draft.
-->
### What changes were proposed in this pull request?
Add `tag` as a label in the metrics, so that we can separate the metrics
in Grafana.
### Why are the changes needed?
Fix: #778
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Fixed the unit tests (UTs).
---
.../uniffle/common/metrics/EmptyGRPCMetrics.java | 5 +
.../apache/uniffle/common/metrics/GRPCMetrics.java | 71 +++---
.../uniffle/common/metrics/MetricsManager.java | 35 ++-
.../org/apache/uniffle/common/util/Constants.java | 2 +
.../uniffle/common/metrics/MetricsManagerTest.java | 7 +-
.../coordinator/metric/CoordinatorGrpcMetrics.java | 16 +-
.../coordinator/metric/CoordinatorMetrics.java | 6 +-
.../org/apache/uniffle/server/ShuffleServer.java | 19 +-
.../uniffle/server/ShuffleServerGrpcMetrics.java | 60 +++---
.../uniffle/server/ShuffleServerMetrics.java | 239 +++++++++++----------
.../uniffle/server/ShuffleFlushManagerTest.java | 21 +-
.../server/ShuffleServerGrpcMetricsTest.java | 8 +-
.../uniffle/server/ShuffleServerMetricsTest.java | 33 ++-
13 files changed, 311 insertions(+), 211 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
index 40750ecc..7b500fba 100644
---
a/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
+++
b/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
@@ -18,6 +18,11 @@
package org.apache.uniffle.common.metrics;
public class EmptyGRPCMetrics extends GRPCMetrics {
+
+ public EmptyGRPCMetrics(String tags) {
+ super(tags);
+ }
+
@Override
public void registerMetrics() {
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
index de3faad1..eaac241f 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.common.metrics;
import java.util.Map;
+import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
@@ -39,19 +40,26 @@ public abstract class GRPCMetrics {
private static final String GRPC_TOTAL = "grpc_total";
private boolean isRegistered = false;
- protected Map<String, Counter> counterMap = JavaUtils.newConcurrentMap();
- protected Map<String, Gauge> gaugeMap = JavaUtils.newConcurrentMap();
- protected Map<String, Summary> transportTimeSummaryMap =
JavaUtils.newConcurrentMap();
- protected Map<String, Summary> processTimeSummaryMap =
JavaUtils.newConcurrentMap();
- private Gauge gaugeGrpcOpen;
- private Counter counterGrpcTotal;
+ protected Map<String, Counter.Child> counterMap =
JavaUtils.newConcurrentMap();
+ protected Map<String, Gauge.Child> gaugeMap = JavaUtils.newConcurrentMap();
+ protected Map<String, Summary.Child> transportTimeSummaryMap =
JavaUtils.newConcurrentMap();
+ protected Map<String, Summary.Child> processTimeSummaryMap =
JavaUtils.newConcurrentMap();
+ protected Gauge.Child gaugeGrpcOpen;
+ protected Counter.Child counterGrpcTotal;
protected MetricsManager metricsManager;
+ protected String tags;
+
+ public GRPCMetrics(String tags) {
+ this.tags = tags;
+ }
public abstract void registerMetrics();
public void register(CollectorRegistry collectorRegistry) {
if (!isRegistered) {
- metricsManager = new MetricsManager(collectorRegistry);
+ Map<String, String> labels = Maps.newHashMap();
+ labels.put(Constants.METRICS_TAG_LABEL_NAME, tags);
+ metricsManager = new MetricsManager(collectorRegistry, labels);
registerGeneralMetrics();
registerMetrics();
isRegistered = true;
@@ -59,25 +67,22 @@ public abstract class GRPCMetrics {
}
private void registerGeneralMetrics() {
- gaugeGrpcOpen = metricsManager.addGauge(GRPC_OPEN);
- counterGrpcTotal = metricsManager.addCounter(GRPC_TOTAL);
- gaugeMap.putIfAbsent(
- GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY,
- metricsManager.addGauge(GRPC_SERVER_EXECUTOR_ACTIVE_THREADS)
+ gaugeGrpcOpen = metricsManager.addLabeledGauge(GRPC_OPEN);
+ counterGrpcTotal = metricsManager.addLabeledCounter(GRPC_TOTAL);
+ gaugeMap.putIfAbsent(GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY,
+ metricsManager.addLabeledGauge(GRPC_SERVER_EXECUTOR_ACTIVE_THREADS)
);
- gaugeMap.putIfAbsent(
- GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY,
- metricsManager.addGauge(GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE)
+ gaugeMap.putIfAbsent(GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY,
+
metricsManager.addLabeledGauge(GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE)
);
- gaugeMap.putIfAbsent(
- GRPC_SERVER_CONNECTION_NUMBER_KEY,
- metricsManager.addGauge(GRPC_SERVER_CONNECTION_NUMBER)
+ gaugeMap.putIfAbsent(GRPC_SERVER_CONNECTION_NUMBER_KEY,
+ metricsManager.addLabeledGauge(GRPC_SERVER_CONNECTION_NUMBER)
);
}
public void setGauge(String tag, double value) {
if (isRegistered) {
- Gauge gauge = gaugeMap.get(tag);
+ Gauge.Child gauge = gaugeMap.get(tag);
if (gauge != null) {
gauge.set(value);
}
@@ -90,7 +95,7 @@ public abstract class GRPCMetrics {
public void incGauge(String tag, double value) {
if (isRegistered) {
- Gauge gauge = gaugeMap.get(tag);
+ Gauge.Child gauge = gaugeMap.get(tag);
if (gauge != null) {
gauge.inc(value);
}
@@ -103,7 +108,7 @@ public abstract class GRPCMetrics {
public void decGauge(String tag, double value) {
if (isRegistered) {
- Gauge gauge = gaugeMap.get(tag);
+ Gauge.Child gauge = gaugeMap.get(tag);
if (gauge != null) {
gauge.dec(value);
}
@@ -112,11 +117,11 @@ public abstract class GRPCMetrics {
public void incCounter(String methodName) {
if (isRegistered) {
- Gauge gauge = gaugeMap.get(methodName);
+ Gauge.Child gauge = gaugeMap.get(methodName);
if (gauge != null) {
gauge.inc();
}
- Counter counter = counterMap.get(methodName);
+ Counter.Child counter = counterMap.get(methodName);
if (counter != null) {
counter.inc();
}
@@ -127,7 +132,7 @@ public abstract class GRPCMetrics {
public void decCounter(String methodName) {
if (isRegistered) {
- Gauge gauge = gaugeMap.get(methodName);
+ Gauge.Child gauge = gaugeMap.get(methodName);
if (gauge != null) {
gauge.dec();
}
@@ -136,14 +141,14 @@ public abstract class GRPCMetrics {
}
public void recordTransportTime(String methodName, long
transportTimeInMillionSecond) {
- Summary summary = transportTimeSummaryMap.get(methodName);
+ Summary.Child summary = transportTimeSummaryMap.get(methodName);
if (summary != null) {
summary.observe(transportTimeInMillionSecond /
Constants.MILLION_SECONDS_PER_SECOND);
}
}
public void recordProcessTime(String methodName, long
processTimeInMillionSecond) {
- Summary summary = processTimeSummaryMap.get(methodName);
+ Summary.Child summary = processTimeSummaryMap.get(methodName);
if (summary != null) {
summary.observe(processTimeInMillionSecond /
Constants.MILLION_SECONDS_PER_SECOND);
}
@@ -153,31 +158,31 @@ public abstract class GRPCMetrics {
return metricsManager.getCollectorRegistry();
}
- public Map<String, Counter> getCounterMap() {
+ public Map<String, Counter.Child> getCounterMap() {
return counterMap;
}
- public Map<String, Gauge> getGaugeMap() {
+ public Map<String, Gauge.Child> getGaugeMap() {
return gaugeMap;
}
- public Gauge getGaugeGrpcOpen() {
+ public Gauge.Child getGaugeGrpcOpen() {
return gaugeGrpcOpen;
}
- public Counter getCounterGrpcTotal() {
+ public Counter.Child getCounterGrpcTotal() {
return counterGrpcTotal;
}
- public Map<String, Summary> getTransportTimeSummaryMap() {
+ public Map<String, Summary.Child> getTransportTimeSummaryMap() {
return transportTimeSummaryMap;
}
- public Map<String, Summary> getProcessTimeSummaryMap() {
+ public Map<String, Summary.Child> getProcessTimeSummaryMap() {
return processTimeSummaryMap;
}
public static GRPCMetrics getEmptyGRPCMetrics() {
- return new EmptyGRPCMetrics();
+ return new EmptyGRPCMetrics(Constants.SHUFFLE_SERVER_VERSION);
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
index 2981ca22..8366df4f 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
@@ -17,6 +17,10 @@
package org.apache.uniffle.common.metrics;
+import java.util.Arrays;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
@@ -24,20 +28,26 @@ import io.prometheus.client.Histogram;
import io.prometheus.client.Summary;
public class MetricsManager {
- private CollectorRegistry collectorRegistry;
+ private final CollectorRegistry collectorRegistry;
+ private final String[] defaultLabelNames;
+ private final String[] defaultLabelValues;
private static final double[] QUANTILES = {0.50, 0.75, 0.90, 0.95, 0.99};
private static final double QUANTILE_ERROR = 0.01;
public MetricsManager() {
- this(null);
+ this(null, Maps.newHashMap());
}
- public MetricsManager(CollectorRegistry collectorRegistry) {
+ public MetricsManager(CollectorRegistry collectorRegistry, Map<String,
String> defaultLabels) {
if (collectorRegistry == null) {
this.collectorRegistry = CollectorRegistry.defaultRegistry;
} else {
this.collectorRegistry = collectorRegistry;
}
+ this.defaultLabelNames = defaultLabels.keySet().toArray(new String[0]);
+ this.defaultLabelValues = Arrays.stream(defaultLabelNames)
+ .map(defaultLabels::get)
+ .toArray(String[]::new);
}
public CollectorRegistry getCollectorRegistry() {
@@ -52,6 +62,11 @@ public class MetricsManager {
return
Counter.build().name(name).labelNames(labels).help(help).register(collectorRegistry);
}
+ public Counter.Child addLabeledCounter(String name) {
+ Counter c = addCounter(name, this.defaultLabelNames);
+ return c.labels(this.defaultLabelValues);
+ }
+
public Gauge addGauge(String name, String... labels) {
return addGauge(name, "Gauge " + name, labels);
}
@@ -60,6 +75,11 @@ public class MetricsManager {
return
Gauge.build().name(name).labelNames(labels).help(help).register(collectorRegistry);
}
+ public Gauge.Child addLabeledGauge(String name) {
+ Gauge c = addGauge(name, this.defaultLabelNames);
+ return c.labels(this.defaultLabelValues);
+ }
+
public Histogram addHistogram(String name, double[] buckets, String...
labels) {
return addHistogram(name, "Histogram " + name, buckets, labels);
}
@@ -75,4 +95,13 @@ public class MetricsManager {
}
return builder.register(collectorRegistry);
}
+
+ public Summary.Child addLabeledSummary(String name) {
+ Summary.Builder builder =
+ Summary.build().name(name).labelNames(defaultLabelNames).help("Summary
" + name);
+ for (int i = 0; i < QUANTILES.length; i++) {
+ builder = builder.quantile(QUANTILES[i], QUANTILE_ERROR);
+ }
+ return builder.register(collectorRegistry).labels(defaultLabelValues);
+ }
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 514cf0e7..af959c5b 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -24,6 +24,8 @@ public final class Constants {
// the value is used for client/server compatible, eg, online upgrade
public static final String SHUFFLE_SERVER_VERSION = "ss_v4";
+ public static final String METRICS_TAG_LABEL_NAME = "label";
+ public static final String COORDINATOR_TAG = "coordinator";
public static final String SHUFFLE_DATA_FILE_SUFFIX = ".data";
public static final String SHUFFLE_INDEX_FILE_SUFFIX = ".index";
// BlockId is long and consist of partitionId, taskAttemptId, atomicInt
diff --git
a/common/src/test/java/org/apache/uniffle/common/metrics/MetricsManagerTest.java
b/common/src/test/java/org/apache/uniffle/common/metrics/MetricsManagerTest.java
index 8c7fd591..d6019254 100644
---
a/common/src/test/java/org/apache/uniffle/common/metrics/MetricsManagerTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/metrics/MetricsManagerTest.java
@@ -28,6 +28,8 @@ import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.common.util.Constants;
+
import static io.prometheus.client.Collector.MetricFamilySamples;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -38,9 +40,10 @@ public class MetricsManagerTest {
public void testMetricsManager() {
MetricsManager metricsManager = new MetricsManager();
assertEquals(CollectorRegistry.defaultRegistry,
metricsManager.getCollectorRegistry());
-
+ Map<String, String> labels = new HashMap<>();
+ labels.put(Constants.METRICS_TAG_LABEL_NAME,
Constants.SHUFFLE_SERVER_VERSION);
CollectorRegistry expectedRegistry = new CollectorRegistry();
- metricsManager = new MetricsManager(expectedRegistry);
+ metricsManager = new MetricsManager(expectedRegistry, labels);
assertEquals(expectedRegistry, metricsManager.getCollectorRegistry());
String expectedName1 = "counter";
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
index 8a9999bf..922f4c68 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.coordinator.metric;
import org.apache.uniffle.common.metrics.GRPCMetrics;
+import org.apache.uniffle.common.util.Constants;
public class CoordinatorGrpcMetrics extends GRPCMetrics {
@@ -31,15 +32,16 @@ public class CoordinatorGrpcMetrics extends GRPCMetrics {
private static final String GRPC_HEARTBEAT_TOTAL =
"grpc_heartbeat_total";
+ public CoordinatorGrpcMetrics() {
+ super(Constants.COORDINATOR_TAG);
+ }
+
@Override
public void registerMetrics() {
- gaugeMap.putIfAbsent(HEARTBEAT_METHOD,
- metricsManager.addGauge(GRPC_HEARTBEAT));
- gaugeMap.putIfAbsent(GET_SHUFFLE_ASSIGNMENTS_METHOD,
- metricsManager.addGauge(GRPC_GET_SHUFFLE_ASSIGNMENTS));
- counterMap.putIfAbsent(HEARTBEAT_METHOD,
- metricsManager.addCounter(GRPC_HEARTBEAT_TOTAL));
+ gaugeMap.putIfAbsent(HEARTBEAT_METHOD,
metricsManager.addLabeledGauge(GRPC_HEARTBEAT));
+ gaugeMap.putIfAbsent(GET_SHUFFLE_ASSIGNMENTS_METHOD,
metricsManager.addLabeledGauge(GRPC_GET_SHUFFLE_ASSIGNMENTS));
+ counterMap.putIfAbsent(HEARTBEAT_METHOD,
metricsManager.addLabeledCounter(GRPC_HEARTBEAT_TOTAL));
counterMap.putIfAbsent(GET_SHUFFLE_ASSIGNMENTS_METHOD,
- metricsManager.addCounter(GRPC_GET_SHUFFLE_ASSIGNMENTS_TOTAL));
+ metricsManager.addLabeledCounter(GRPC_GET_SHUFFLE_ASSIGNMENTS_TOTAL));
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
index f06a339b..a4595b60 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
@@ -20,12 +20,14 @@ package org.apache.uniffle.coordinator.metric;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import org.apache.commons.lang3.StringUtils;
import org.apache.uniffle.common.metrics.MetricsManager;
+import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
@@ -61,7 +63,9 @@ public class CoordinatorMetrics {
public static synchronized void register(CollectorRegistry
collectorRegistry) {
if (!isRegister) {
- metricsManager = new MetricsManager(collectorRegistry);
+ Map<String, String> labels = Maps.newHashMap();
+ labels.put(Constants.METRICS_TAG_LABEL_NAME, Constants.COORDINATOR_TAG);
+ metricsManager = new MetricsManager(collectorRegistry, labels);
isRegister = true;
setUpMetrics();
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index c23dbb0a..cb9c7484 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -268,8 +268,9 @@ public class ShuffleServer {
private void registerMetrics() {
LOG.info("Register metrics");
CollectorRegistry shuffleServerCollectorRegistry = new
CollectorRegistry(true);
- ShuffleServerMetrics.register(shuffleServerCollectorRegistry);
- grpcMetrics = new ShuffleServerGrpcMetrics();
+ String tags = coverToString();
+ ShuffleServerMetrics.register(shuffleServerCollectorRegistry, tags);
+ grpcMetrics = new ShuffleServerGrpcMetrics(tags);
grpcMetrics.register(new CollectorRegistry(true));
CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
boolean verbose =
shuffleServerConf.getBoolean(ShuffleServerConf.RSS_JVM_METRICS_VERBOSE_ENABLE);
@@ -473,4 +474,18 @@ public class ShuffleServer {
public int getNettyPort() {
return nettyPort;
}
+
+ public String coverToString() {
+ List<String> tags = shuffleServerConf.get(ShuffleServerConf.TAGS);
+ StringBuilder sb = new StringBuilder();
+ sb.append(Constants.SHUFFLE_SERVER_VERSION);
+ if (tags == null || tags.size() == 0) {
+ return sb.toString();
+ }
+ for (String tag : tags) {
+ sb.append(",");
+ sb.append(tag);
+ }
+ return sb.toString();
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
index f4e616e5..eb60fd90 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
@@ -70,67 +70,71 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
private static final String GRPC_GET_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY =
"grpc_get_memory_shuffle_data_process_latency";
+ public ShuffleServerGrpcMetrics(String tags) {
+ super(tags);
+ }
+
@Override
public void registerMetrics() {
gaugeMap.putIfAbsent(REGISTER_SHUFFLE_METHOD,
- metricsManager.addGauge(GRPC_REGISTERED_SHUFFLE));
+ metricsManager.addLabeledGauge(GRPC_REGISTERED_SHUFFLE));
gaugeMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD,
- metricsManager.addGauge(GRPC_SEND_SHUFFLE_DATA));
+ metricsManager.addLabeledGauge(GRPC_SEND_SHUFFLE_DATA));
gaugeMap.putIfAbsent(COMMIT_SHUFFLE_TASK_METHOD,
- metricsManager.addGauge(GRPC_COMMIT_SHUFFLE_TASK));
+ metricsManager.addLabeledGauge(GRPC_COMMIT_SHUFFLE_TASK));
gaugeMap.putIfAbsent(FINISH_SHUFFLE_METHOD,
- metricsManager.addGauge(GRPC_FINISH_SHUFFLE));
+ metricsManager.addLabeledGauge(GRPC_FINISH_SHUFFLE));
gaugeMap.putIfAbsent(REQUIRE_BUFFER_METHOD,
- metricsManager.addGauge(GRPC_REQUIRE_BUFFER));
+ metricsManager.addLabeledGauge(GRPC_REQUIRE_BUFFER));
gaugeMap.putIfAbsent(APP_HEARTBEAT_METHOD,
- metricsManager.addGauge(GRPC_APP_HEARTBEAT));
+ metricsManager.addLabeledGauge(GRPC_APP_HEARTBEAT));
gaugeMap.putIfAbsent(REPORT_SHUFFLE_RESULT_METHOD,
- metricsManager.addGauge(GRPC_REPORT_SHUFFLE_RESULT));
+ metricsManager.addLabeledGauge(GRPC_REPORT_SHUFFLE_RESULT));
gaugeMap.putIfAbsent(GET_SHUFFLE_RESULT_METHOD,
- metricsManager.addGauge(GRPC_GET_SHUFFLE_RESULT));
+ metricsManager.addLabeledGauge(GRPC_GET_SHUFFLE_RESULT));
gaugeMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD,
- metricsManager.addGauge(GRPC_GET_SHUFFLE_DATA));
+ metricsManager.addLabeledGauge(GRPC_GET_SHUFFLE_DATA));
gaugeMap.putIfAbsent(GET_MEMORY_SHUFFLE_DATA_METHOD,
- metricsManager.addGauge(GRPC_GET_MEMORY_SHUFFLE_DATA));
+ metricsManager.addLabeledGauge(GRPC_GET_MEMORY_SHUFFLE_DATA));
gaugeMap.putIfAbsent(GET_SHUFFLE_INDEX_METHOD,
- metricsManager.addGauge(GRPC_GET_SHUFFLE_INDEX));
+ metricsManager.addLabeledGauge(GRPC_GET_SHUFFLE_INDEX));
counterMap.putIfAbsent(REGISTER_SHUFFLE_METHOD,
- metricsManager.addCounter(GRPC_REGISTERED_SHUFFLE_TOTAL));
+ metricsManager.addLabeledCounter(GRPC_REGISTERED_SHUFFLE_TOTAL));
counterMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD,
- metricsManager.addCounter(GRPC_SEND_SHUFFLE_DATA_TOTAL));
+ metricsManager.addLabeledCounter(GRPC_SEND_SHUFFLE_DATA_TOTAL));
counterMap.putIfAbsent(COMMIT_SHUFFLE_TASK_METHOD,
- metricsManager.addCounter(GRPC_COMMIT_SHUFFLE_TASK_TOTAL));
+ metricsManager.addLabeledCounter(GRPC_COMMIT_SHUFFLE_TASK_TOTAL));
counterMap.putIfAbsent(FINISH_SHUFFLE_METHOD,
- metricsManager.addCounter(GRPC_FINISH_SHUFFLE_TOTAL));
+ metricsManager.addLabeledCounter(GRPC_FINISH_SHUFFLE_TOTAL));
counterMap.putIfAbsent(REQUIRE_BUFFER_METHOD,
- metricsManager.addCounter(GRPC_REQUIRE_BUFFER_TOTAL));
+ metricsManager.addLabeledCounter(GRPC_REQUIRE_BUFFER_TOTAL));
counterMap.putIfAbsent(APP_HEARTBEAT_METHOD,
- metricsManager.addCounter(GRPC_APP_HEARTBEAT_TOTAL));
+ metricsManager.addLabeledCounter(GRPC_APP_HEARTBEAT_TOTAL));
counterMap.putIfAbsent(REPORT_SHUFFLE_RESULT_METHOD,
- metricsManager.addCounter(GRPC_REPORT_SHUFFLE_RESULT_TOTAL));
+ metricsManager.addLabeledCounter(GRPC_REPORT_SHUFFLE_RESULT_TOTAL));
counterMap.putIfAbsent(GET_SHUFFLE_RESULT_METHOD,
- metricsManager.addCounter(GRPC_GET_SHUFFLE_RESULT_TOTAL));
+ metricsManager.addLabeledCounter(GRPC_GET_SHUFFLE_RESULT_TOTAL));
counterMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD,
- metricsManager.addCounter(GRPC_GET_SHUFFLE_DATA_TOTAL));
+ metricsManager.addLabeledCounter(GRPC_GET_SHUFFLE_DATA_TOTAL));
counterMap.putIfAbsent(GET_MEMORY_SHUFFLE_DATA_METHOD,
- metricsManager.addCounter(GRPC_GET_MEMORY_SHUFFLE_DATA_TOTAL));
+ metricsManager.addLabeledCounter(GRPC_GET_MEMORY_SHUFFLE_DATA_TOTAL));
counterMap.putIfAbsent(GET_SHUFFLE_INDEX_METHOD,
- metricsManager.addCounter(GRPC_GET_SHUFFLE_INDEX_TOTAL));
+ metricsManager.addLabeledCounter(GRPC_GET_SHUFFLE_INDEX_TOTAL));
transportTimeSummaryMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD,
- metricsManager.addSummary(GRPC_SEND_SHUFFLE_DATA_TRANSPORT_LATENCY));
+
metricsManager.addLabeledSummary(GRPC_SEND_SHUFFLE_DATA_TRANSPORT_LATENCY));
transportTimeSummaryMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD,
- metricsManager.addSummary(GRPC_GET_SHUFFLE_DATA_TRANSPORT_LATENCY));
+
metricsManager.addLabeledSummary(GRPC_GET_SHUFFLE_DATA_TRANSPORT_LATENCY));
transportTimeSummaryMap.putIfAbsent(GET_MEMORY_SHUFFLE_DATA_METHOD,
-
metricsManager.addSummary(GRPC_GET_MEMORY_SHUFFLE_DATA_TRANSPORT_LATENCY));
+
metricsManager.addLabeledSummary(GRPC_GET_MEMORY_SHUFFLE_DATA_TRANSPORT_LATENCY));
processTimeSummaryMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD,
- metricsManager.addSummary(GRPC_SEND_SHUFFLE_DATA_PROCESS_LATENCY));
+
metricsManager.addLabeledSummary(GRPC_SEND_SHUFFLE_DATA_PROCESS_LATENCY));
processTimeSummaryMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD,
- metricsManager.addSummary(GRPC_GET_SHUFFLE_DATA_PROCESS_LATENCY));
+
metricsManager.addLabeledSummary(GRPC_GET_SHUFFLE_DATA_PROCESS_LATENCY));
processTimeSummaryMap.putIfAbsent(GET_MEMORY_SHUFFLE_DATA_METHOD,
-
metricsManager.addSummary(GRPC_GET_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY));
+
metricsManager.addLabeledSummary(GRPC_GET_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY));
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index a5b09ff1..131394dc 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -17,13 +17,17 @@
package org.apache.uniffle.server;
+import java.util.Map;
+
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import org.apache.commons.lang3.StringUtils;
import org.apache.uniffle.common.metrics.MetricsManager;
+import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.LocalStorage;
public class ShuffleServerMetrics {
@@ -90,72 +94,76 @@ public class ShuffleServerMetrics {
private static final String HUGE_PARTITION_NUM = "huge_partition_num";
private static final String APP_WITH_HUGE_PARTITION_NUM =
"app_with_huge_partition_num";
- public static Counter counterTotalAppNum;
- public static Counter counterTotalAppWithHugePartitionNum;
- public static Counter counterTotalPartitionNum;
- public static Counter counterTotalHugePartitionNum;
+ public static Counter.Child counterTotalAppNum;
+ public static Counter.Child counterTotalAppWithHugePartitionNum;
+ public static Counter.Child counterTotalPartitionNum;
+ public static Counter.Child counterTotalHugePartitionNum;
- public static Counter counterTotalReceivedDataSize;
- public static Counter counterTotalWriteDataSize;
- public static Counter counterTotalWriteBlockSize;
- public static Counter counterTotalWriteTime;
- public static Counter counterWriteException;
- public static Counter counterWriteSlow;
- public static Counter counterWriteTotal;
- public static Counter counterEventSizeThresholdLevel1;
- public static Counter counterEventSizeThresholdLevel2;
- public static Counter counterEventSizeThresholdLevel3;
- public static Counter counterEventSizeThresholdLevel4;
- public static Counter counterTotalReadDataSize;
- public static Counter counterTotalReadLocalDataFileSize;
- public static Counter counterTotalReadLocalIndexFileSize;
- public static Counter counterTotalReadMemoryDataSize;
- public static Counter counterTotalReadTime;
- public static Counter counterTotalFailedWrittenEventNum;
- public static Counter counterTotalDroppedEventNum;
- public static Counter counterTotalHdfsWriteDataSize;
- public static Counter counterTotalLocalFileWriteDataSize;
- public static Counter counterTotalRequireBufferFailed;
- public static Counter counterTotalRequireBufferFailedForHugePartition;
- public static Counter counterTotalRequireBufferFailedForRegularPartition;
+ public static Counter.Child counterTotalReceivedDataSize;
+ public static Counter.Child counterTotalWriteDataSize;
+ public static Counter.Child counterTotalWriteBlockSize;
+ public static Counter.Child counterTotalWriteTime;
+ public static Counter.Child counterWriteException;
+ public static Counter.Child counterWriteSlow;
+ public static Counter.Child counterWriteTotal;
+ public static Counter.Child counterEventSizeThresholdLevel1;
+ public static Counter.Child counterEventSizeThresholdLevel2;
+ public static Counter.Child counterEventSizeThresholdLevel3;
+ public static Counter.Child counterEventSizeThresholdLevel4;
+ public static Counter.Child counterTotalReadDataSize;
+ public static Counter.Child counterTotalReadLocalDataFileSize;
+ public static Counter.Child counterTotalReadLocalIndexFileSize;
+ public static Counter.Child counterTotalReadMemoryDataSize;
+ public static Counter.Child counterTotalReadTime;
+ public static Counter.Child counterTotalFailedWrittenEventNum;
+ public static Counter.Child counterTotalDroppedEventNum;
+ public static Counter.Child counterTotalHdfsWriteDataSize;
+ public static Counter.Child counterTotalLocalFileWriteDataSize;
+ public static Counter.Child counterTotalRequireBufferFailed;
+ public static Counter.Child counterTotalRequireBufferFailedForHugePartition;
+ public static Counter.Child
counterTotalRequireBufferFailedForRegularPartition;
- public static Counter counterLocalStorageTotalWrite;
- public static Counter counterLocalStorageRetryWrite;
- public static Counter counterLocalStorageFailedWrite;
- public static Counter counterLocalStorageSuccessWrite;
- public static Counter counterTotalRequireReadMemoryNum;
- public static Counter counterTotalRequireReadMemoryRetryNum;
- public static Counter counterTotalRequireReadMemoryFailedNum;
+ public static Counter.Child counterLocalStorageTotalWrite;
+ public static Counter.Child counterLocalStorageRetryWrite;
+ public static Counter.Child counterLocalStorageFailedWrite;
+ public static Counter.Child counterLocalStorageSuccessWrite;
+ public static Counter.Child counterTotalRequireReadMemoryNum;
+ public static Counter.Child counterTotalRequireReadMemoryRetryNum;
+ public static Counter.Child counterTotalRequireReadMemoryFailedNum;
- public static Gauge gaugeHugePartitionNum;
- public static Gauge gaugeAppWithHugePartitionNum;
+ public static Gauge.Child gaugeHugePartitionNum;
+ public static Gauge.Child gaugeAppWithHugePartitionNum;
- public static Gauge gaugeLocalStorageTotalDirsNum;
- public static Gauge gaugeLocalStorageCorruptedDirsNum;
- public static Gauge gaugeLocalStorageTotalSpace;
- public static Gauge gaugeLocalStorageUsedSpace;
- public static Gauge gaugeLocalStorageUsedSpaceRatio;
+ public static Gauge.Child gaugeLocalStorageTotalDirsNum;
+ public static Gauge.Child gaugeLocalStorageCorruptedDirsNum;
+ public static Gauge.Child gaugeLocalStorageTotalSpace;
+ public static Gauge.Child gaugeLocalStorageUsedSpace;
+ public static Gauge.Child gaugeLocalStorageUsedSpaceRatio;
- public static Gauge gaugeIsHealthy;
- public static Gauge gaugeAllocatedBufferSize;
- public static Gauge gaugeInFlushBufferSize;
- public static Gauge gaugeUsedBufferSize;
- public static Gauge gaugeReadBufferUsedSize;
- public static Gauge gaugeWriteHandler;
- public static Gauge gaugeEventQueueSize;
- public static Gauge gaugeAppNum;
- public static Gauge gaugeTotalPartitionNum;
+ public static Gauge.Child gaugeIsHealthy;
+ public static Gauge.Child gaugeAllocatedBufferSize;
+ public static Gauge.Child gaugeInFlushBufferSize;
+ public static Gauge.Child gaugeUsedBufferSize;
+ public static Gauge.Child gaugeReadBufferUsedSize;
+ public static Gauge.Child gaugeWriteHandler;
+ public static Gauge.Child gaugeEventQueueSize;
+ public static Gauge.Child gaugeAppNum;
+ public static Gauge.Child gaugeTotalPartitionNum;
public static Counter counterRemoteStorageTotalWrite;
public static Counter counterRemoteStorageRetryWrite;
public static Counter counterRemoteStorageFailedWrite;
public static Counter counterRemoteStorageSuccessWrite;
+ private static String tags;
private static MetricsManager metricsManager;
private static boolean isRegister = false;
- public static synchronized void register(CollectorRegistry
collectorRegistry) {
+ public static synchronized void register(CollectorRegistry
collectorRegistry, String tags) {
if (!isRegister) {
- metricsManager = new MetricsManager(collectorRegistry);
+ ShuffleServerMetrics.tags = tags;
+ Map<String, String> labels = Maps.newHashMap();
+ labels.put(Constants.METRICS_TAG_LABEL_NAME, ShuffleServerMetrics.tags);
+ metricsManager = new MetricsManager(collectorRegistry, labels);
isRegister = true;
setUpMetrics();
}
@@ -163,7 +171,7 @@ public class ShuffleServerMetrics {
@VisibleForTesting
public static void register() {
- register(CollectorRegistry.defaultRegistry);
+ register(CollectorRegistry.defaultRegistry,
Constants.SHUFFLE_SERVER_VERSION);
}
@VisibleForTesting
@@ -182,8 +190,8 @@ public class ShuffleServerMetrics {
counterLocalStorageRetryWrite.inc();
} else {
if (!StringUtils.isEmpty(storageHost)) {
- counterRemoteStorageTotalWrite.labels(storageHost).inc();
- counterRemoteStorageRetryWrite.labels(storageHost).inc();
+ counterRemoteStorageTotalWrite.labels(tags, storageHost).inc();
+ counterRemoteStorageRetryWrite.labels(tags, storageHost).inc();
}
}
}
@@ -194,8 +202,8 @@ public class ShuffleServerMetrics {
counterLocalStorageSuccessWrite.inc();
} else {
if (!StringUtils.isEmpty(storageHost)) {
- counterRemoteStorageTotalWrite.labels(storageHost).inc();
- counterRemoteStorageSuccessWrite.labels(storageHost).inc();
+ counterRemoteStorageTotalWrite.labels(tags, storageHost).inc();
+ counterRemoteStorageSuccessWrite.labels(tags, storageHost).inc();
}
}
}
@@ -206,73 +214,76 @@ public class ShuffleServerMetrics {
counterLocalStorageFailedWrite.inc();
} else {
if (!StringUtils.isEmpty(storageHost)) {
- counterRemoteStorageTotalWrite.labels(storageHost).inc();
- counterRemoteStorageFailedWrite.labels(storageHost).inc();
+ counterRemoteStorageTotalWrite.labels(tags, storageHost).inc();
+ counterRemoteStorageFailedWrite.labels(tags, storageHost).inc();
}
}
}
private static void setUpMetrics() {
- counterTotalReceivedDataSize =
metricsManager.addCounter(TOTAL_RECEIVED_DATA);
- counterTotalWriteDataSize = metricsManager.addCounter(TOTAL_WRITE_DATA);
- counterTotalWriteBlockSize = metricsManager.addCounter(TOTAL_WRITE_BLOCK);
- counterTotalWriteTime = metricsManager.addCounter(TOTAL_WRITE_TIME);
- counterWriteException = metricsManager.addCounter(TOTAL_WRITE_EXCEPTION);
- counterWriteSlow = metricsManager.addCounter(TOTAL_WRITE_SLOW);
- counterWriteTotal = metricsManager.addCounter(TOTAL_WRITE_NUM);
- counterEventSizeThresholdLevel1 =
metricsManager.addCounter(EVENT_SIZE_THRESHOLD_LEVEL1);
- counterEventSizeThresholdLevel2 =
metricsManager.addCounter(EVENT_SIZE_THRESHOLD_LEVEL2);
- counterEventSizeThresholdLevel3 =
metricsManager.addCounter(EVENT_SIZE_THRESHOLD_LEVEL3);
- counterEventSizeThresholdLevel4 =
metricsManager.addCounter(EVENT_SIZE_THRESHOLD_LEVEL4);
- counterTotalReadDataSize = metricsManager.addCounter(TOTAL_READ_DATA);
- counterTotalReadLocalDataFileSize =
metricsManager.addCounter(TOTAL_READ_LOCAL_DATA_FILE);
- counterTotalReadLocalIndexFileSize =
metricsManager.addCounter(TOTAL_READ_LOCAL_INDEX_FILE);
- counterTotalReadMemoryDataSize =
metricsManager.addCounter(TOTAL_READ_MEMORY_DATA);
- counterTotalReadTime = metricsManager.addCounter(TOTAL_READ_TIME);
- counterTotalDroppedEventNum =
metricsManager.addCounter(TOTAL_DROPPED_EVENT_NUM);
- counterTotalFailedWrittenEventNum =
metricsManager.addCounter(TOTAL_FAILED_WRITTEN_EVENT_NUM);
- counterTotalHdfsWriteDataSize =
metricsManager.addCounter(TOTAL_HDFS_WRITE_DATA);
- counterTotalLocalFileWriteDataSize =
metricsManager.addCounter(TOTAL_LOCALFILE_WRITE_DATA);
- counterTotalRequireBufferFailed =
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED);
+ counterTotalReceivedDataSize =
metricsManager.addLabeledCounter(TOTAL_RECEIVED_DATA);
+ counterTotalWriteDataSize =
metricsManager.addLabeledCounter(TOTAL_WRITE_DATA);
+ counterTotalWriteBlockSize =
metricsManager.addLabeledCounter(TOTAL_WRITE_BLOCK);
+ counterTotalWriteTime = metricsManager.addLabeledCounter(TOTAL_WRITE_TIME);
+ counterWriteException =
metricsManager.addLabeledCounter(TOTAL_WRITE_EXCEPTION);
+ counterWriteSlow = metricsManager.addLabeledCounter(TOTAL_WRITE_SLOW);
+ counterWriteTotal = metricsManager.addLabeledCounter(TOTAL_WRITE_NUM);
+ counterEventSizeThresholdLevel1 =
metricsManager.addLabeledCounter(EVENT_SIZE_THRESHOLD_LEVEL1);
+ counterEventSizeThresholdLevel2 =
metricsManager.addLabeledCounter(EVENT_SIZE_THRESHOLD_LEVEL2);
+ counterEventSizeThresholdLevel3 =
metricsManager.addLabeledCounter(EVENT_SIZE_THRESHOLD_LEVEL3);
+ counterEventSizeThresholdLevel4 =
metricsManager.addLabeledCounter(EVENT_SIZE_THRESHOLD_LEVEL4);
+ counterTotalReadDataSize =
metricsManager.addLabeledCounter(TOTAL_READ_DATA);
+ counterTotalReadLocalDataFileSize =
metricsManager.addLabeledCounter(TOTAL_READ_LOCAL_DATA_FILE);
+ counterTotalReadLocalIndexFileSize =
metricsManager.addLabeledCounter(TOTAL_READ_LOCAL_INDEX_FILE);
+ counterTotalReadMemoryDataSize =
metricsManager.addLabeledCounter(TOTAL_READ_MEMORY_DATA);
+ counterTotalReadTime = metricsManager.addLabeledCounter(TOTAL_READ_TIME);
+ counterTotalDroppedEventNum =
metricsManager.addLabeledCounter(TOTAL_DROPPED_EVENT_NUM);
+ counterTotalFailedWrittenEventNum =
metricsManager.addLabeledCounter(TOTAL_FAILED_WRITTEN_EVENT_NUM);
+ counterTotalHdfsWriteDataSize =
metricsManager.addLabeledCounter(TOTAL_HDFS_WRITE_DATA);
+ counterTotalLocalFileWriteDataSize =
metricsManager.addLabeledCounter(TOTAL_LOCALFILE_WRITE_DATA);
+ counterTotalRequireBufferFailed =
metricsManager.addLabeledCounter(TOTAL_REQUIRE_BUFFER_FAILED);
counterTotalRequireBufferFailedForRegularPartition =
-
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_REGULAR_PARTITION);
+
metricsManager.addLabeledCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_REGULAR_PARTITION);
counterTotalRequireBufferFailedForHugePartition =
-
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_HUGE_PARTITION);
- counterLocalStorageTotalWrite =
metricsManager.addCounter(STORAGE_TOTAL_WRITE_LOCAL);
- counterLocalStorageRetryWrite =
metricsManager.addCounter(STORAGE_RETRY_WRITE_LOCAL);
- counterLocalStorageFailedWrite =
metricsManager.addCounter(STORAGE_FAILED_WRITE_LOCAL);
- counterLocalStorageSuccessWrite =
metricsManager.addCounter(STORAGE_SUCCESS_WRITE_LOCAL);
- counterRemoteStorageTotalWrite =
metricsManager.addCounter(STORAGE_TOTAL_WRITE_REMOTE, STORAGE_HOST_LABEL);
- counterRemoteStorageRetryWrite =
metricsManager.addCounter(STORAGE_RETRY_WRITE_REMOTE, STORAGE_HOST_LABEL);
- counterRemoteStorageFailedWrite =
metricsManager.addCounter(STORAGE_FAILED_WRITE_REMOTE, STORAGE_HOST_LABEL);
- counterRemoteStorageSuccessWrite =
metricsManager.addCounter(STORAGE_SUCCESS_WRITE_REMOTE, STORAGE_HOST_LABEL);
- counterTotalRequireReadMemoryNum =
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY);
- counterTotalRequireReadMemoryRetryNum =
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY_RETRY);
- counterTotalRequireReadMemoryFailedNum =
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY_FAILED);
+
metricsManager.addLabeledCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_HUGE_PARTITION);
+ counterLocalStorageTotalWrite =
metricsManager.addLabeledCounter(STORAGE_TOTAL_WRITE_LOCAL);
+ counterLocalStorageRetryWrite =
metricsManager.addLabeledCounter(STORAGE_RETRY_WRITE_LOCAL);
+ counterLocalStorageFailedWrite =
metricsManager.addLabeledCounter(STORAGE_FAILED_WRITE_LOCAL);
+ counterLocalStorageSuccessWrite =
metricsManager.addLabeledCounter(STORAGE_SUCCESS_WRITE_LOCAL);
+ counterRemoteStorageTotalWrite = metricsManager.addCounter(
+ STORAGE_TOTAL_WRITE_REMOTE, Constants.METRICS_TAG_LABEL_NAME,
STORAGE_HOST_LABEL);
+ counterRemoteStorageRetryWrite = metricsManager.addCounter(
+ STORAGE_RETRY_WRITE_REMOTE, Constants.METRICS_TAG_LABEL_NAME,
STORAGE_HOST_LABEL);
+ counterRemoteStorageFailedWrite = metricsManager.addCounter(
+ STORAGE_FAILED_WRITE_REMOTE, Constants.METRICS_TAG_LABEL_NAME,
STORAGE_HOST_LABEL);
+ counterRemoteStorageSuccessWrite = metricsManager.addCounter(
+ STORAGE_SUCCESS_WRITE_REMOTE, Constants.METRICS_TAG_LABEL_NAME,
STORAGE_HOST_LABEL);
+ counterTotalRequireReadMemoryNum =
metricsManager.addLabeledCounter(TOTAL_REQUIRE_READ_MEMORY);
+ counterTotalRequireReadMemoryRetryNum =
metricsManager.addLabeledCounter(TOTAL_REQUIRE_READ_MEMORY_RETRY);
+ counterTotalRequireReadMemoryFailedNum =
metricsManager.addLabeledCounter(TOTAL_REQUIRE_READ_MEMORY_FAILED);
- counterTotalAppNum = metricsManager.addCounter(TOTAL_APP_NUM);
- counterTotalAppWithHugePartitionNum =
metricsManager.addCounter(TOTAL_APP_WITH_HUGE_PARTITION_NUM);
- counterTotalPartitionNum = metricsManager.addCounter(TOTAL_PARTITION_NUM);
- counterTotalHugePartitionNum =
metricsManager.addCounter(TOTAL_HUGE_PARTITION_NUM);
+ counterTotalAppNum = metricsManager.addLabeledCounter(TOTAL_APP_NUM);
+ counterTotalAppWithHugePartitionNum =
metricsManager.addLabeledCounter(TOTAL_APP_WITH_HUGE_PARTITION_NUM);
+ counterTotalPartitionNum =
metricsManager.addLabeledCounter(TOTAL_PARTITION_NUM);
+ counterTotalHugePartitionNum =
metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_NUM);
- gaugeLocalStorageTotalDirsNum =
metricsManager.addGauge(LOCAL_STORAGE_TOTAL_DIRS_NUM);
- gaugeLocalStorageCorruptedDirsNum =
metricsManager.addGauge(LOCAL_STORAGE_CORRUPTED_DIRS_NUM);
- gaugeLocalStorageTotalSpace =
metricsManager.addGauge(LOCAL_STORAGE_TOTAL_SPACE);
- gaugeLocalStorageUsedSpace =
metricsManager.addGauge(LOCAL_STORAGE_USED_SPACE);
- gaugeLocalStorageUsedSpaceRatio =
metricsManager.addGauge(LOCAL_STORAGE_USED_SPACE_RATIO);
+ gaugeLocalStorageTotalDirsNum =
metricsManager.addLabeledGauge(LOCAL_STORAGE_TOTAL_DIRS_NUM);
+ gaugeLocalStorageCorruptedDirsNum =
metricsManager.addLabeledGauge(LOCAL_STORAGE_CORRUPTED_DIRS_NUM);
+ gaugeLocalStorageTotalSpace =
metricsManager.addLabeledGauge(LOCAL_STORAGE_TOTAL_SPACE);
+ gaugeLocalStorageUsedSpace =
metricsManager.addLabeledGauge(LOCAL_STORAGE_USED_SPACE);
+ gaugeLocalStorageUsedSpaceRatio =
metricsManager.addLabeledGauge(LOCAL_STORAGE_USED_SPACE_RATIO);
- gaugeIsHealthy = metricsManager.addGauge(IS_HEALTHY);
- gaugeAllocatedBufferSize = metricsManager.addGauge(ALLOCATED_BUFFER_SIZE);
- gaugeInFlushBufferSize = metricsManager.addGauge(IN_FLUSH_BUFFER_SIZE);
- gaugeUsedBufferSize = metricsManager.addGauge(USED_BUFFER_SIZE);
- gaugeReadBufferUsedSize = metricsManager.addGauge(READ_USED_BUFFER_SIZE);
- gaugeWriteHandler = metricsManager.addGauge(TOTAL_WRITE_HANDLER);
- gaugeEventQueueSize = metricsManager.addGauge(EVENT_QUEUE_SIZE);
- gaugeAppNum = metricsManager.addGauge(APP_NUM_WITH_NODE);
- gaugeTotalPartitionNum = metricsManager.addGauge(PARTITION_NUM_WITH_NODE);
+ gaugeIsHealthy = metricsManager.addLabeledGauge(IS_HEALTHY);
+ gaugeAllocatedBufferSize =
metricsManager.addLabeledGauge(ALLOCATED_BUFFER_SIZE);
+ gaugeInFlushBufferSize =
metricsManager.addLabeledGauge(IN_FLUSH_BUFFER_SIZE);
+ gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE);
+ gaugeReadBufferUsedSize =
metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE);
+ gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER);
+ gaugeEventQueueSize = metricsManager.addLabeledGauge(EVENT_QUEUE_SIZE);
+ gaugeAppNum = metricsManager.addLabeledGauge(APP_NUM_WITH_NODE);
+ gaugeTotalPartitionNum =
metricsManager.addLabeledGauge(PARTITION_NUM_WITH_NODE);
- gaugeHugePartitionNum = metricsManager.addGauge(HUGE_PARTITION_NUM);
- gaugeAppWithHugePartitionNum =
metricsManager.addGauge(APP_WITH_HUGE_PARTITION_NUM);
+ gaugeHugePartitionNum = metricsManager.addLabeledGauge(HUGE_PARTITION_NUM);
+ gaugeAppWithHugePartitionNum =
metricsManager.addLabeledGauge(APP_WITH_HUGE_PARTITION_NUM);
}
-
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 54f4a0b5..53eb4ef8 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -52,6 +52,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.event.AppPurgeEvent;
import org.apache.uniffle.server.storage.HdfsStorageManager;
@@ -186,10 +187,14 @@ public class ShuffleFlushManagerTest extends HdfsTestBase
{
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
storageManager.registerRemoteStorage(appId, remoteStorage);
String storageHost = cluster.getURI().getHost();
- assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(storageHost).get(),
0.5);
- assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageRetryWrite.labels(storageHost).get(),
0.5);
- assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageFailedWrite.labels(storageHost).get(),
0.5);
- assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(storageHost).get(),
0.5);
+ assertEquals(0.0, ShuffleServerMetrics.counterRemoteStorageTotalWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, storageHost).get(), 0.5);
+ assertEquals(0.0, ShuffleServerMetrics.counterRemoteStorageRetryWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, storageHost).get(), 0.5);
+ assertEquals(0.0, ShuffleServerMetrics.counterRemoteStorageFailedWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, storageHost).get(), 0.5);
+ assertEquals(0.0, ShuffleServerMetrics.counterRemoteStorageSuccessWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, storageHost).get(), 0.5);
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
ShuffleDataFlushEvent event1 =
@@ -214,8 +219,10 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
validate(appId, 2, 2, blocks21, 1, remoteStorage.getPath());
assertEquals(blocks21.size(), manager.getCommittedBlockIds(appId,
2).getLongCardinality());
- assertEquals(3.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(storageHost).get(),
0.5);
- assertEquals(3.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(storageHost).get(),
0.5);
+ assertEquals(3.0, ShuffleServerMetrics.counterRemoteStorageTotalWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, storageHost).get(), 0.5);
+ assertEquals(3.0, ShuffleServerMetrics.counterRemoteStorageSuccessWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, storageHost).get(), 0.5);
// test case for process event whose related app was cleared already
assertEquals(0, ShuffleServerMetrics.gaugeWriteHandler.get(), 0.5);
@@ -378,7 +385,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
assertEquals(0, storage.getHandlerSize());
}
- private void waitForMetrics(Gauge gauge, double expected, double delta)
throws Exception {
+ private void waitForMetrics(Gauge.Child gauge, double expected, double
delta) throws Exception {
int retry = 0;
boolean match = false;
do {
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
index 6f5c39db..67b750fc 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
@@ -23,12 +23,14 @@ import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Summary;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.common.util.Constants;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ShuffleServerGrpcMetricsTest {
@Test
public void testLatencyMetrics() {
- ShuffleServerGrpcMetrics metrics = new ShuffleServerGrpcMetrics();
+ ShuffleServerGrpcMetrics metrics = new
ShuffleServerGrpcMetrics(Constants.SHUFFLE_SERVER_VERSION);
metrics.register(new CollectorRegistry(true));
metrics.recordTransportTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD,
1000);
metrics.recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD,
500);
@@ -36,8 +38,8 @@ public class ShuffleServerGrpcMetricsTest {
metrics.recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD,
1000);
metrics.recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD,
500);
metrics.recordProcessTime(ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD,
200);
- Map<String, Summary> sendTimeSummaryTime =
metrics.getTransportTimeSummaryMap();
- Map<String, Summary> processTimeSummaryTime =
metrics.getProcessTimeSummaryMap();
+ Map<String, Summary.Child> sendTimeSummaryTime =
metrics.getTransportTimeSummaryMap();
+ Map<String, Summary.Child> processTimeSummaryTime =
metrics.getProcessTimeSummaryMap();
assertEquals(3, sendTimeSummaryTime.size());
assertEquals(3, processTimeSummaryTime.size());
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index 93f48f8a..540184a3 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.metrics.TestUtils;
+import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.util.StorageType;
@@ -86,10 +87,14 @@ public class ShuffleServerMetricsTest {
@Test
public void testServerMetrics() throws Exception {
-
ShuffleServerMetrics.counterRemoteStorageFailedWrite.labels(STORAGE_HOST).inc(0);
-
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(STORAGE_HOST).inc(0);
-
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).inc(0);
-
ShuffleServerMetrics.counterRemoteStorageRetryWrite.labels(STORAGE_HOST).inc(0);
+ ShuffleServerMetrics.counterRemoteStorageFailedWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).inc(0);
+ ShuffleServerMetrics.counterRemoteStorageSuccessWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).inc(0);
+ ShuffleServerMetrics.counterRemoteStorageTotalWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).inc(0);
+ ShuffleServerMetrics.counterRemoteStorageRetryWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).inc(0);
String content = TestUtils.httpGet(SERVER_METRICS_URL);
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
@@ -101,7 +106,7 @@ public class ShuffleServerMetricsTest {
ShuffleServerMetrics.STORAGE_FAILED_WRITE_REMOTE,
ShuffleServerMetrics.STORAGE_RETRY_WRITE_REMOTE);
for (String expectMetricName : expectedMetricNames) {
- validateMetrics(mapper, metricsNode, expectMetricName, STORAGE_HOST);
+ validateMetrics(mapper, metricsNode, expectMetricName,
Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST);
}
}
@@ -134,14 +139,20 @@ public class ShuffleServerMetricsTest {
// test for remote storage
ShuffleServerMetrics.incStorageRetryCounter(STORAGE_HOST);
- assertEquals(1.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).get(),
0.5);
- assertEquals(1.0,
ShuffleServerMetrics.counterRemoteStorageRetryWrite.labels(STORAGE_HOST).get(),
0.5);
+ assertEquals(1.0, ShuffleServerMetrics.counterRemoteStorageTotalWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).get(), 0.5);
+ assertEquals(1.0, ShuffleServerMetrics.counterRemoteStorageRetryWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).get(), 0.5);
ShuffleServerMetrics.incStorageSuccessCounter(STORAGE_HOST);
- assertEquals(2.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).get(),
0.5);
- assertEquals(1.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(STORAGE_HOST).get(),
0.5);
+ assertEquals(2.0, ShuffleServerMetrics.counterRemoteStorageTotalWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).get(), 0.5);
+ assertEquals(1.0, ShuffleServerMetrics.counterRemoteStorageSuccessWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).get(), 0.5);
ShuffleServerMetrics.incStorageFailedCounter(STORAGE_HOST);
- assertEquals(3.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).get(),
0.5);
- assertEquals(1.0,
ShuffleServerMetrics.counterRemoteStorageFailedWrite.labels(STORAGE_HOST).get(),
0.5);
+ assertEquals(3.0, ShuffleServerMetrics.counterRemoteStorageTotalWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).get(), 0.5);
+ assertEquals(1.0, ShuffleServerMetrics.counterRemoteStorageFailedWrite
+ .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).get(), 0.5);
}
@Test