This is an automated email from the ASF dual-hosted git repository.

smallzhongfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new f361ac07 [#778] feat: Separate `ShuffleServer` metrics through tags 
(#812)
f361ac07 is described below

commit f361ac079a6d61a40e9ad384f77e5b354e875282
Author: jokercurry <[email protected]>
AuthorDate: Mon Apr 17 19:09:48 2023 +0800

    [#778] feat: Separate `ShuffleServer` metrics through tags (#812)
    
    <!--
    1. Title: [#<issue>] <type>(<scope>): <subject>
       Examples:
         - "[#123] feat(operator): support xxx"
         - "[#233] fix: check null before access result in xxx"
         - "[MINOR] refactor: fix typo in variable name"
         - "[MINOR] docs: fix typo in README"
         - "[#255] test: fix flaky test NameOfTheTest"
       Reference: https://www.conventionalcommits.org/en/v1.0.0/
    2. Contributor guidelines:
       https://github.com/apache/incubator-uniffle/blob/master/CONTRIBUTING.md
    3. If the PR is unfinished, please mark this PR as draft.
    -->
    
    ### What changes were proposed in this pull request?
    Add `tag` as a label in the metrics.
    
    This allows us to separate the metrics in Grafana.
    
    ### Why are the changes needed?
    Fix: #778
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Fixed the existing unit tests (UTs).
---
 .../uniffle/common/metrics/EmptyGRPCMetrics.java   |   5 +
 .../apache/uniffle/common/metrics/GRPCMetrics.java |  71 +++---
 .../uniffle/common/metrics/MetricsManager.java     |  35 ++-
 .../org/apache/uniffle/common/util/Constants.java  |   2 +
 .../uniffle/common/metrics/MetricsManagerTest.java |   7 +-
 .../coordinator/metric/CoordinatorGrpcMetrics.java |  16 +-
 .../coordinator/metric/CoordinatorMetrics.java     |   6 +-
 .../org/apache/uniffle/server/ShuffleServer.java   |  19 +-
 .../uniffle/server/ShuffleServerGrpcMetrics.java   |  60 +++---
 .../uniffle/server/ShuffleServerMetrics.java       | 239 +++++++++++----------
 .../uniffle/server/ShuffleFlushManagerTest.java    |  21 +-
 .../server/ShuffleServerGrpcMetricsTest.java       |   8 +-
 .../uniffle/server/ShuffleServerMetricsTest.java   |  33 ++-
 13 files changed, 311 insertions(+), 211 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
index 40750ecc..7b500fba 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
@@ -18,6 +18,11 @@
 package org.apache.uniffle.common.metrics;
 
 public class EmptyGRPCMetrics extends GRPCMetrics {
+
+  public EmptyGRPCMetrics(String tags) {
+    super(tags);
+  }
+
   @Override
   public void registerMetrics() {
   }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
index de3faad1..eaac241f 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.common.metrics;
 
 import java.util.Map;
 
+import com.google.common.collect.Maps;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
@@ -39,19 +40,26 @@ public abstract class GRPCMetrics {
   private static final String GRPC_TOTAL = "grpc_total";
 
   private boolean isRegistered = false;
-  protected Map<String, Counter> counterMap = JavaUtils.newConcurrentMap();
-  protected Map<String, Gauge> gaugeMap = JavaUtils.newConcurrentMap();
-  protected Map<String, Summary> transportTimeSummaryMap = 
JavaUtils.newConcurrentMap();
-  protected Map<String, Summary> processTimeSummaryMap = 
JavaUtils.newConcurrentMap();
-  private Gauge gaugeGrpcOpen;
-  private Counter counterGrpcTotal;
+  protected Map<String, Counter.Child> counterMap = 
JavaUtils.newConcurrentMap();
+  protected Map<String, Gauge.Child> gaugeMap = JavaUtils.newConcurrentMap();
+  protected Map<String, Summary.Child> transportTimeSummaryMap = 
JavaUtils.newConcurrentMap();
+  protected Map<String, Summary.Child> processTimeSummaryMap = 
JavaUtils.newConcurrentMap();
+  protected Gauge.Child gaugeGrpcOpen;
+  protected Counter.Child counterGrpcTotal;
   protected MetricsManager metricsManager;
+  protected String tags;
+
+  public GRPCMetrics(String tags) {
+    this.tags = tags;
+  }
 
   public abstract void registerMetrics();
 
   public void register(CollectorRegistry collectorRegistry) {
     if (!isRegistered) {
-      metricsManager = new MetricsManager(collectorRegistry);
+      Map<String, String> labels = Maps.newHashMap();
+      labels.put(Constants.METRICS_TAG_LABEL_NAME, tags);
+      metricsManager = new MetricsManager(collectorRegistry, labels);
       registerGeneralMetrics();
       registerMetrics();
       isRegistered = true;
@@ -59,25 +67,22 @@ public abstract class GRPCMetrics {
   }
 
   private void registerGeneralMetrics() {
-    gaugeGrpcOpen = metricsManager.addGauge(GRPC_OPEN);
-    counterGrpcTotal = metricsManager.addCounter(GRPC_TOTAL);
-    gaugeMap.putIfAbsent(
-        GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY,
-        metricsManager.addGauge(GRPC_SERVER_EXECUTOR_ACTIVE_THREADS)
+    gaugeGrpcOpen = metricsManager.addLabeledGauge(GRPC_OPEN);
+    counterGrpcTotal = metricsManager.addLabeledCounter(GRPC_TOTAL);
+    gaugeMap.putIfAbsent(GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY,
+        metricsManager.addLabeledGauge(GRPC_SERVER_EXECUTOR_ACTIVE_THREADS)
     );
-    gaugeMap.putIfAbsent(
-        GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY,
-        metricsManager.addGauge(GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE)
+    gaugeMap.putIfAbsent(GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY,
+        
metricsManager.addLabeledGauge(GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE)
     );
-    gaugeMap.putIfAbsent(
-            GRPC_SERVER_CONNECTION_NUMBER_KEY,
-        metricsManager.addGauge(GRPC_SERVER_CONNECTION_NUMBER)
+    gaugeMap.putIfAbsent(GRPC_SERVER_CONNECTION_NUMBER_KEY,
+        metricsManager.addLabeledGauge(GRPC_SERVER_CONNECTION_NUMBER)
     );
   }
 
   public void setGauge(String tag, double value) {
     if (isRegistered) {
-      Gauge gauge = gaugeMap.get(tag);
+      Gauge.Child gauge = gaugeMap.get(tag);
       if (gauge != null) {
         gauge.set(value);
       }
@@ -90,7 +95,7 @@ public abstract class GRPCMetrics {
 
   public void incGauge(String tag, double value) {
     if (isRegistered) {
-      Gauge gauge = gaugeMap.get(tag);
+      Gauge.Child gauge = gaugeMap.get(tag);
       if (gauge != null) {
         gauge.inc(value);
       }
@@ -103,7 +108,7 @@ public abstract class GRPCMetrics {
 
   public void decGauge(String tag, double value) {
     if (isRegistered) {
-      Gauge gauge = gaugeMap.get(tag);
+      Gauge.Child gauge = gaugeMap.get(tag);
       if (gauge != null) {
         gauge.dec(value);
       }
@@ -112,11 +117,11 @@ public abstract class GRPCMetrics {
 
   public void incCounter(String methodName) {
     if (isRegistered) {
-      Gauge gauge = gaugeMap.get(methodName);
+      Gauge.Child gauge = gaugeMap.get(methodName);
       if (gauge != null) {
         gauge.inc();
       }
-      Counter counter = counterMap.get(methodName);
+      Counter.Child counter = counterMap.get(methodName);
       if (counter != null) {
         counter.inc();
       }
@@ -127,7 +132,7 @@ public abstract class GRPCMetrics {
 
   public void decCounter(String methodName) {
     if (isRegistered) {
-      Gauge gauge = gaugeMap.get(methodName);
+      Gauge.Child gauge = gaugeMap.get(methodName);
       if (gauge != null) {
         gauge.dec();
       }
@@ -136,14 +141,14 @@ public abstract class GRPCMetrics {
   }
 
   public void recordTransportTime(String methodName, long 
transportTimeInMillionSecond) {
-    Summary summary = transportTimeSummaryMap.get(methodName);
+    Summary.Child summary = transportTimeSummaryMap.get(methodName);
     if (summary != null) {
       summary.observe(transportTimeInMillionSecond / 
Constants.MILLION_SECONDS_PER_SECOND);
     }
   }
 
   public void recordProcessTime(String methodName, long 
processTimeInMillionSecond) {
-    Summary summary = processTimeSummaryMap.get(methodName);
+    Summary.Child summary = processTimeSummaryMap.get(methodName);
     if (summary != null) {
       summary.observe(processTimeInMillionSecond / 
Constants.MILLION_SECONDS_PER_SECOND);
     }
@@ -153,31 +158,31 @@ public abstract class GRPCMetrics {
     return metricsManager.getCollectorRegistry();
   }
 
-  public Map<String, Counter> getCounterMap() {
+  public Map<String, Counter.Child> getCounterMap() {
     return counterMap;
   }
 
-  public Map<String, Gauge> getGaugeMap() {
+  public Map<String, Gauge.Child> getGaugeMap() {
     return gaugeMap;
   }
 
-  public Gauge getGaugeGrpcOpen() {
+  public Gauge.Child getGaugeGrpcOpen() {
     return gaugeGrpcOpen;
   }
 
-  public Counter getCounterGrpcTotal() {
+  public Counter.Child getCounterGrpcTotal() {
     return counterGrpcTotal;
   }
 
-  public Map<String, Summary> getTransportTimeSummaryMap() {
+  public Map<String, Summary.Child> getTransportTimeSummaryMap() {
     return transportTimeSummaryMap;
   }
 
-  public Map<String, Summary> getProcessTimeSummaryMap() {
+  public Map<String, Summary.Child> getProcessTimeSummaryMap() {
     return processTimeSummaryMap;
   }
 
   public static GRPCMetrics getEmptyGRPCMetrics() {
-    return new EmptyGRPCMetrics();
+    return new EmptyGRPCMetrics(Constants.SHUFFLE_SERVER_VERSION);
   }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
index 2981ca22..8366df4f 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
@@ -17,6 +17,10 @@
 
 package org.apache.uniffle.common.metrics;
 
+import java.util.Arrays;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
@@ -24,20 +28,26 @@ import io.prometheus.client.Histogram;
 import io.prometheus.client.Summary;
 
 public class MetricsManager {
-  private CollectorRegistry collectorRegistry;
+  private final CollectorRegistry collectorRegistry;
+  private final String[] defaultLabelNames;
+  private final String[] defaultLabelValues;
   private static final double[] QUANTILES = {0.50, 0.75, 0.90, 0.95, 0.99};
   private static final double QUANTILE_ERROR = 0.01;
 
   public MetricsManager() {
-    this(null);
+    this(null, Maps.newHashMap());
   }
 
-  public MetricsManager(CollectorRegistry collectorRegistry) {
+  public MetricsManager(CollectorRegistry collectorRegistry, Map<String, 
String> defaultLabels) {
     if (collectorRegistry == null) {
       this.collectorRegistry = CollectorRegistry.defaultRegistry;
     } else {
       this.collectorRegistry = collectorRegistry;
     }
+    this.defaultLabelNames = defaultLabels.keySet().toArray(new String[0]);
+    this.defaultLabelValues = Arrays.stream(defaultLabelNames)
+        .map(defaultLabels::get)
+        .toArray(String[]::new);
   }
 
   public CollectorRegistry getCollectorRegistry() {
@@ -52,6 +62,11 @@ public class MetricsManager {
     return 
Counter.build().name(name).labelNames(labels).help(help).register(collectorRegistry);
   }
 
+  public Counter.Child addLabeledCounter(String name) {
+    Counter c = addCounter(name, this.defaultLabelNames);
+    return c.labels(this.defaultLabelValues);
+  }
+
   public Gauge addGauge(String name, String... labels) {
     return addGauge(name, "Gauge " + name, labels);
   }
@@ -60,6 +75,11 @@ public class MetricsManager {
     return 
Gauge.build().name(name).labelNames(labels).help(help).register(collectorRegistry);
   }
 
+  public Gauge.Child addLabeledGauge(String name) {
+    Gauge c = addGauge(name, this.defaultLabelNames);
+    return c.labels(this.defaultLabelValues);
+  }
+
   public Histogram addHistogram(String name, double[] buckets, String... 
labels) {
     return addHistogram(name, "Histogram " + name, buckets, labels);
   }
@@ -75,4 +95,13 @@ public class MetricsManager {
     }
     return builder.register(collectorRegistry);
   }
+
+  public Summary.Child addLabeledSummary(String name) {
+    Summary.Builder builder =
+        Summary.build().name(name).labelNames(defaultLabelNames).help("Summary 
" + name);
+    for (int i = 0; i < QUANTILES.length; i++) {
+      builder = builder.quantile(QUANTILES[i], QUANTILE_ERROR);
+    }
+    return builder.register(collectorRegistry).labels(defaultLabelValues);
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java 
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 514cf0e7..af959c5b 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -24,6 +24,8 @@ public final class Constants {
 
   // the value is used for client/server compatible, eg, online upgrade
   public static final String SHUFFLE_SERVER_VERSION = "ss_v4";
+  public static final String METRICS_TAG_LABEL_NAME = "label";
+  public static final String COORDINATOR_TAG = "coordinator";
   public static final String SHUFFLE_DATA_FILE_SUFFIX = ".data";
   public static final String SHUFFLE_INDEX_FILE_SUFFIX = ".index";
   // BlockId is long and consist of partitionId, taskAttemptId, atomicInt
diff --git 
a/common/src/test/java/org/apache/uniffle/common/metrics/MetricsManagerTest.java
 
b/common/src/test/java/org/apache/uniffle/common/metrics/MetricsManagerTest.java
index 8c7fd591..d6019254 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/metrics/MetricsManagerTest.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/metrics/MetricsManagerTest.java
@@ -28,6 +28,8 @@ import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import org.junit.jupiter.api.Test;
 
+import org.apache.uniffle.common.util.Constants;
+
 import static io.prometheus.client.Collector.MetricFamilySamples;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -38,9 +40,10 @@ public class MetricsManagerTest {
   public void testMetricsManager() {
     MetricsManager metricsManager = new MetricsManager();
     assertEquals(CollectorRegistry.defaultRegistry, 
metricsManager.getCollectorRegistry());
-
+    Map<String, String> labels = new HashMap<>();
+    labels.put(Constants.METRICS_TAG_LABEL_NAME, 
Constants.SHUFFLE_SERVER_VERSION);
     CollectorRegistry expectedRegistry = new CollectorRegistry();
-    metricsManager = new MetricsManager(expectedRegistry);
+    metricsManager = new MetricsManager(expectedRegistry, labels);
     assertEquals(expectedRegistry, metricsManager.getCollectorRegistry());
 
     String expectedName1 = "counter";
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
index 8a9999bf..922f4c68 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.coordinator.metric;
 
 import org.apache.uniffle.common.metrics.GRPCMetrics;
+import org.apache.uniffle.common.util.Constants;
 
 public class CoordinatorGrpcMetrics extends GRPCMetrics {
 
@@ -31,15 +32,16 @@ public class CoordinatorGrpcMetrics extends GRPCMetrics {
   private static final String GRPC_HEARTBEAT_TOTAL =
       "grpc_heartbeat_total";
 
+  public CoordinatorGrpcMetrics() {
+    super(Constants.COORDINATOR_TAG);
+  }
+
   @Override
   public void registerMetrics() {
-    gaugeMap.putIfAbsent(HEARTBEAT_METHOD,
-        metricsManager.addGauge(GRPC_HEARTBEAT));
-    gaugeMap.putIfAbsent(GET_SHUFFLE_ASSIGNMENTS_METHOD,
-        metricsManager.addGauge(GRPC_GET_SHUFFLE_ASSIGNMENTS));
-    counterMap.putIfAbsent(HEARTBEAT_METHOD,
-        metricsManager.addCounter(GRPC_HEARTBEAT_TOTAL));
+    gaugeMap.putIfAbsent(HEARTBEAT_METHOD, 
metricsManager.addLabeledGauge(GRPC_HEARTBEAT));
+    gaugeMap.putIfAbsent(GET_SHUFFLE_ASSIGNMENTS_METHOD, 
metricsManager.addLabeledGauge(GRPC_GET_SHUFFLE_ASSIGNMENTS));
+    counterMap.putIfAbsent(HEARTBEAT_METHOD, 
metricsManager.addLabeledCounter(GRPC_HEARTBEAT_TOTAL));
     counterMap.putIfAbsent(GET_SHUFFLE_ASSIGNMENTS_METHOD,
-        metricsManager.addCounter(GRPC_GET_SHUFFLE_ASSIGNMENTS_TOTAL));
+        metricsManager.addLabeledCounter(GRPC_GET_SHUFFLE_ASSIGNMENTS_TOTAL));
   }
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
index f06a339b..a4595b60 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
@@ -20,12 +20,14 @@ package org.apache.uniffle.coordinator.metric;
 import java.util.Map;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.uniffle.common.metrics.MetricsManager;
+import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
 
@@ -61,7 +63,9 @@ public class CoordinatorMetrics {
 
   public static synchronized void register(CollectorRegistry 
collectorRegistry) {
     if (!isRegister) {
-      metricsManager = new MetricsManager(collectorRegistry);
+      Map<String, String> labels = Maps.newHashMap();
+      labels.put(Constants.METRICS_TAG_LABEL_NAME, Constants.COORDINATOR_TAG);
+      metricsManager = new MetricsManager(collectorRegistry, labels);
       isRegister = true;
       setUpMetrics();
     }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index c23dbb0a..cb9c7484 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -268,8 +268,9 @@ public class ShuffleServer {
   private void registerMetrics() {
     LOG.info("Register metrics");
     CollectorRegistry shuffleServerCollectorRegistry = new 
CollectorRegistry(true);
-    ShuffleServerMetrics.register(shuffleServerCollectorRegistry);
-    grpcMetrics = new ShuffleServerGrpcMetrics();
+    String tags = coverToString();
+    ShuffleServerMetrics.register(shuffleServerCollectorRegistry, tags);
+    grpcMetrics = new ShuffleServerGrpcMetrics(tags);
     grpcMetrics.register(new CollectorRegistry(true));
     CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
     boolean verbose = 
shuffleServerConf.getBoolean(ShuffleServerConf.RSS_JVM_METRICS_VERBOSE_ENABLE);
@@ -473,4 +474,18 @@ public class ShuffleServer {
   public int getNettyPort() {
     return nettyPort;
   }
+
+  public String coverToString() {
+    List<String> tags = shuffleServerConf.get(ShuffleServerConf.TAGS);
+    StringBuilder sb = new StringBuilder();
+    sb.append(Constants.SHUFFLE_SERVER_VERSION);
+    if (tags == null || tags.size() == 0) {
+      return sb.toString();
+    }
+    for (String tag : tags) {
+      sb.append(",");
+      sb.append(tag);
+    }
+    return sb.toString();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
index f4e616e5..eb60fd90 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
@@ -70,67 +70,71 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
   private static final String GRPC_GET_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY =
       "grpc_get_memory_shuffle_data_process_latency";
 
+  public ShuffleServerGrpcMetrics(String tags) {
+    super(tags);
+  }
+
   @Override
   public void registerMetrics() {
     gaugeMap.putIfAbsent(REGISTER_SHUFFLE_METHOD,
-        metricsManager.addGauge(GRPC_REGISTERED_SHUFFLE));
+        metricsManager.addLabeledGauge(GRPC_REGISTERED_SHUFFLE));
     gaugeMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD,
-        metricsManager.addGauge(GRPC_SEND_SHUFFLE_DATA));
+        metricsManager.addLabeledGauge(GRPC_SEND_SHUFFLE_DATA));
     gaugeMap.putIfAbsent(COMMIT_SHUFFLE_TASK_METHOD,
-        metricsManager.addGauge(GRPC_COMMIT_SHUFFLE_TASK));
+        metricsManager.addLabeledGauge(GRPC_COMMIT_SHUFFLE_TASK));
     gaugeMap.putIfAbsent(FINISH_SHUFFLE_METHOD,
-        metricsManager.addGauge(GRPC_FINISH_SHUFFLE));
+        metricsManager.addLabeledGauge(GRPC_FINISH_SHUFFLE));
     gaugeMap.putIfAbsent(REQUIRE_BUFFER_METHOD,
-        metricsManager.addGauge(GRPC_REQUIRE_BUFFER));
+        metricsManager.addLabeledGauge(GRPC_REQUIRE_BUFFER));
     gaugeMap.putIfAbsent(APP_HEARTBEAT_METHOD,
-        metricsManager.addGauge(GRPC_APP_HEARTBEAT));
+        metricsManager.addLabeledGauge(GRPC_APP_HEARTBEAT));
     gaugeMap.putIfAbsent(REPORT_SHUFFLE_RESULT_METHOD,
-        metricsManager.addGauge(GRPC_REPORT_SHUFFLE_RESULT));
+        metricsManager.addLabeledGauge(GRPC_REPORT_SHUFFLE_RESULT));
     gaugeMap.putIfAbsent(GET_SHUFFLE_RESULT_METHOD,
-        metricsManager.addGauge(GRPC_GET_SHUFFLE_RESULT));
+        metricsManager.addLabeledGauge(GRPC_GET_SHUFFLE_RESULT));
     gaugeMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD,
-        metricsManager.addGauge(GRPC_GET_SHUFFLE_DATA));
+        metricsManager.addLabeledGauge(GRPC_GET_SHUFFLE_DATA));
     gaugeMap.putIfAbsent(GET_MEMORY_SHUFFLE_DATA_METHOD,
-        metricsManager.addGauge(GRPC_GET_MEMORY_SHUFFLE_DATA));
+        metricsManager.addLabeledGauge(GRPC_GET_MEMORY_SHUFFLE_DATA));
     gaugeMap.putIfAbsent(GET_SHUFFLE_INDEX_METHOD,
-        metricsManager.addGauge(GRPC_GET_SHUFFLE_INDEX));
+        metricsManager.addLabeledGauge(GRPC_GET_SHUFFLE_INDEX));
 
     counterMap.putIfAbsent(REGISTER_SHUFFLE_METHOD,
-        metricsManager.addCounter(GRPC_REGISTERED_SHUFFLE_TOTAL));
+        metricsManager.addLabeledCounter(GRPC_REGISTERED_SHUFFLE_TOTAL));
     counterMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD,
-        metricsManager.addCounter(GRPC_SEND_SHUFFLE_DATA_TOTAL));
+        metricsManager.addLabeledCounter(GRPC_SEND_SHUFFLE_DATA_TOTAL));
     counterMap.putIfAbsent(COMMIT_SHUFFLE_TASK_METHOD,
-        metricsManager.addCounter(GRPC_COMMIT_SHUFFLE_TASK_TOTAL));
+        metricsManager.addLabeledCounter(GRPC_COMMIT_SHUFFLE_TASK_TOTAL));
     counterMap.putIfAbsent(FINISH_SHUFFLE_METHOD,
-        metricsManager.addCounter(GRPC_FINISH_SHUFFLE_TOTAL));
+        metricsManager.addLabeledCounter(GRPC_FINISH_SHUFFLE_TOTAL));
     counterMap.putIfAbsent(REQUIRE_BUFFER_METHOD,
-        metricsManager.addCounter(GRPC_REQUIRE_BUFFER_TOTAL));
+        metricsManager.addLabeledCounter(GRPC_REQUIRE_BUFFER_TOTAL));
     counterMap.putIfAbsent(APP_HEARTBEAT_METHOD,
-        metricsManager.addCounter(GRPC_APP_HEARTBEAT_TOTAL));
+        metricsManager.addLabeledCounter(GRPC_APP_HEARTBEAT_TOTAL));
     counterMap.putIfAbsent(REPORT_SHUFFLE_RESULT_METHOD,
-        metricsManager.addCounter(GRPC_REPORT_SHUFFLE_RESULT_TOTAL));
+        metricsManager.addLabeledCounter(GRPC_REPORT_SHUFFLE_RESULT_TOTAL));
     counterMap.putIfAbsent(GET_SHUFFLE_RESULT_METHOD,
-        metricsManager.addCounter(GRPC_GET_SHUFFLE_RESULT_TOTAL));
+        metricsManager.addLabeledCounter(GRPC_GET_SHUFFLE_RESULT_TOTAL));
     counterMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD,
-        metricsManager.addCounter(GRPC_GET_SHUFFLE_DATA_TOTAL));
+        metricsManager.addLabeledCounter(GRPC_GET_SHUFFLE_DATA_TOTAL));
     counterMap.putIfAbsent(GET_MEMORY_SHUFFLE_DATA_METHOD,
-        metricsManager.addCounter(GRPC_GET_MEMORY_SHUFFLE_DATA_TOTAL));
+        metricsManager.addLabeledCounter(GRPC_GET_MEMORY_SHUFFLE_DATA_TOTAL));
     counterMap.putIfAbsent(GET_SHUFFLE_INDEX_METHOD,
-        metricsManager.addCounter(GRPC_GET_SHUFFLE_INDEX_TOTAL));
+        metricsManager.addLabeledCounter(GRPC_GET_SHUFFLE_INDEX_TOTAL));
 
     transportTimeSummaryMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD,
-        metricsManager.addSummary(GRPC_SEND_SHUFFLE_DATA_TRANSPORT_LATENCY));
+        
metricsManager.addLabeledSummary(GRPC_SEND_SHUFFLE_DATA_TRANSPORT_LATENCY));
     transportTimeSummaryMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD,
-        metricsManager.addSummary(GRPC_GET_SHUFFLE_DATA_TRANSPORT_LATENCY));
+        
metricsManager.addLabeledSummary(GRPC_GET_SHUFFLE_DATA_TRANSPORT_LATENCY));
     transportTimeSummaryMap.putIfAbsent(GET_MEMORY_SHUFFLE_DATA_METHOD,
-        
metricsManager.addSummary(GRPC_GET_MEMORY_SHUFFLE_DATA_TRANSPORT_LATENCY));
+        
metricsManager.addLabeledSummary(GRPC_GET_MEMORY_SHUFFLE_DATA_TRANSPORT_LATENCY));
 
     processTimeSummaryMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD,
-        metricsManager.addSummary(GRPC_SEND_SHUFFLE_DATA_PROCESS_LATENCY));
+        
metricsManager.addLabeledSummary(GRPC_SEND_SHUFFLE_DATA_PROCESS_LATENCY));
     processTimeSummaryMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD,
-        metricsManager.addSummary(GRPC_GET_SHUFFLE_DATA_PROCESS_LATENCY));
+        
metricsManager.addLabeledSummary(GRPC_GET_SHUFFLE_DATA_PROCESS_LATENCY));
     processTimeSummaryMap.putIfAbsent(GET_MEMORY_SHUFFLE_DATA_METHOD,
-        
metricsManager.addSummary(GRPC_GET_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY));
+        
metricsManager.addLabeledSummary(GRPC_GET_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY));
   }
 
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index a5b09ff1..131394dc 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -17,13 +17,17 @@
 
 package org.apache.uniffle.server;
 
+import java.util.Map;
+
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.uniffle.common.metrics.MetricsManager;
+import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.common.LocalStorage;
 
 public class ShuffleServerMetrics {
@@ -90,72 +94,76 @@ public class ShuffleServerMetrics {
   private static final String HUGE_PARTITION_NUM = "huge_partition_num";
   private static final String APP_WITH_HUGE_PARTITION_NUM = 
"app_with_huge_partition_num";
 
-  public static Counter counterTotalAppNum;
-  public static Counter counterTotalAppWithHugePartitionNum;
-  public static Counter counterTotalPartitionNum;
-  public static Counter counterTotalHugePartitionNum;
+  public static Counter.Child counterTotalAppNum;
+  public static Counter.Child counterTotalAppWithHugePartitionNum;
+  public static Counter.Child counterTotalPartitionNum;
+  public static Counter.Child counterTotalHugePartitionNum;
 
-  public static Counter counterTotalReceivedDataSize;
-  public static Counter counterTotalWriteDataSize;
-  public static Counter counterTotalWriteBlockSize;
-  public static Counter counterTotalWriteTime;
-  public static Counter counterWriteException;
-  public static Counter counterWriteSlow;
-  public static Counter counterWriteTotal;
-  public static Counter counterEventSizeThresholdLevel1;
-  public static Counter counterEventSizeThresholdLevel2;
-  public static Counter counterEventSizeThresholdLevel3;
-  public static Counter counterEventSizeThresholdLevel4;
-  public static Counter counterTotalReadDataSize;
-  public static Counter counterTotalReadLocalDataFileSize;
-  public static Counter counterTotalReadLocalIndexFileSize;
-  public static Counter counterTotalReadMemoryDataSize;
-  public static Counter counterTotalReadTime;
-  public static Counter counterTotalFailedWrittenEventNum;
-  public static Counter counterTotalDroppedEventNum;
-  public static Counter counterTotalHdfsWriteDataSize;
-  public static Counter counterTotalLocalFileWriteDataSize;
-  public static Counter counterTotalRequireBufferFailed;
-  public static Counter counterTotalRequireBufferFailedForHugePartition;
-  public static Counter counterTotalRequireBufferFailedForRegularPartition;
+  public static Counter.Child  counterTotalReceivedDataSize;
+  public static Counter.Child  counterTotalWriteDataSize;
+  public static Counter.Child  counterTotalWriteBlockSize;
+  public static Counter.Child  counterTotalWriteTime;
+  public static Counter.Child  counterWriteException;
+  public static Counter.Child  counterWriteSlow;
+  public static Counter.Child  counterWriteTotal;
+  public static Counter.Child  counterEventSizeThresholdLevel1;
+  public static Counter.Child  counterEventSizeThresholdLevel2;
+  public static Counter.Child  counterEventSizeThresholdLevel3;
+  public static Counter.Child  counterEventSizeThresholdLevel4;
+  public static Counter.Child  counterTotalReadDataSize;
+  public static Counter.Child  counterTotalReadLocalDataFileSize;
+  public static Counter.Child  counterTotalReadLocalIndexFileSize;
+  public static Counter.Child  counterTotalReadMemoryDataSize;
+  public static Counter.Child  counterTotalReadTime;
+  public static Counter.Child  counterTotalFailedWrittenEventNum;
+  public static Counter.Child  counterTotalDroppedEventNum;
+  public static Counter.Child  counterTotalHdfsWriteDataSize;
+  public static Counter.Child  counterTotalLocalFileWriteDataSize;
+  public static Counter.Child  counterTotalRequireBufferFailed;
+  public static Counter.Child  counterTotalRequireBufferFailedForHugePartition;
+  public static Counter.Child  
counterTotalRequireBufferFailedForRegularPartition;
 
-  public static Counter counterLocalStorageTotalWrite;
-  public static Counter counterLocalStorageRetryWrite;
-  public static Counter counterLocalStorageFailedWrite;
-  public static Counter counterLocalStorageSuccessWrite;
-  public static Counter counterTotalRequireReadMemoryNum;
-  public static Counter counterTotalRequireReadMemoryRetryNum;
-  public static Counter counterTotalRequireReadMemoryFailedNum;
+  public static Counter.Child  counterLocalStorageTotalWrite;
+  public static Counter.Child  counterLocalStorageRetryWrite;
+  public static Counter.Child  counterLocalStorageFailedWrite;
+  public static Counter.Child  counterLocalStorageSuccessWrite;
+  public static Counter.Child  counterTotalRequireReadMemoryNum;
+  public static Counter.Child  counterTotalRequireReadMemoryRetryNum;
+  public static Counter.Child  counterTotalRequireReadMemoryFailedNum;
 
-  public static Gauge gaugeHugePartitionNum;
-  public static Gauge gaugeAppWithHugePartitionNum;
+  public static Gauge.Child gaugeHugePartitionNum;
+  public static Gauge.Child gaugeAppWithHugePartitionNum;
 
-  public static Gauge gaugeLocalStorageTotalDirsNum;
-  public static Gauge gaugeLocalStorageCorruptedDirsNum;
-  public static Gauge gaugeLocalStorageTotalSpace;
-  public static Gauge gaugeLocalStorageUsedSpace;
-  public static Gauge gaugeLocalStorageUsedSpaceRatio;
+  public static Gauge.Child gaugeLocalStorageTotalDirsNum;
+  public static Gauge.Child gaugeLocalStorageCorruptedDirsNum;
+  public static Gauge.Child gaugeLocalStorageTotalSpace;
+  public static Gauge.Child gaugeLocalStorageUsedSpace;
+  public static Gauge.Child gaugeLocalStorageUsedSpaceRatio;
 
-  public static Gauge gaugeIsHealthy;
-  public static Gauge gaugeAllocatedBufferSize;
-  public static Gauge gaugeInFlushBufferSize;
-  public static Gauge gaugeUsedBufferSize;
-  public static Gauge gaugeReadBufferUsedSize;
-  public static Gauge gaugeWriteHandler;
-  public static Gauge gaugeEventQueueSize;
-  public static Gauge gaugeAppNum;
-  public static Gauge gaugeTotalPartitionNum;
+  public static Gauge.Child gaugeIsHealthy;
+  public static Gauge.Child gaugeAllocatedBufferSize;
+  public static Gauge.Child gaugeInFlushBufferSize;
+  public static Gauge.Child gaugeUsedBufferSize;
+  public static Gauge.Child gaugeReadBufferUsedSize;
+  public static Gauge.Child gaugeWriteHandler;
+  public static Gauge.Child gaugeEventQueueSize;
+  public static Gauge.Child gaugeAppNum;
+  public static Gauge.Child gaugeTotalPartitionNum;
   public static Counter counterRemoteStorageTotalWrite;
   public static Counter counterRemoteStorageRetryWrite;
   public static Counter counterRemoteStorageFailedWrite;
   public static Counter counterRemoteStorageSuccessWrite;
+  private static String tags;
 
   private static MetricsManager metricsManager;
   private static boolean isRegister = false;
 
-  public static synchronized void register(CollectorRegistry 
collectorRegistry) {
+  public static synchronized void register(CollectorRegistry 
collectorRegistry, String tags) {
     if (!isRegister) {
-      metricsManager = new MetricsManager(collectorRegistry);
+      ShuffleServerMetrics.tags = tags;
+      Map<String, String> labels = Maps.newHashMap();
+      labels.put(Constants.METRICS_TAG_LABEL_NAME, ShuffleServerMetrics.tags);
+      metricsManager = new MetricsManager(collectorRegistry, labels);
       isRegister = true;
       setUpMetrics();
     }
@@ -163,7 +171,7 @@ public class ShuffleServerMetrics {
 
   @VisibleForTesting
   public static void register() {
-    register(CollectorRegistry.defaultRegistry);
+    register(CollectorRegistry.defaultRegistry, 
Constants.SHUFFLE_SERVER_VERSION);
   }
 
   @VisibleForTesting
@@ -182,8 +190,8 @@ public class ShuffleServerMetrics {
       counterLocalStorageRetryWrite.inc();
     } else {
       if (!StringUtils.isEmpty(storageHost)) {
-        counterRemoteStorageTotalWrite.labels(storageHost).inc();
-        counterRemoteStorageRetryWrite.labels(storageHost).inc();
+        counterRemoteStorageTotalWrite.labels(tags, storageHost).inc();
+        counterRemoteStorageRetryWrite.labels(tags, storageHost).inc();
       }
     }
   }
@@ -194,8 +202,8 @@ public class ShuffleServerMetrics {
       counterLocalStorageSuccessWrite.inc();
     } else {
       if (!StringUtils.isEmpty(storageHost)) {
-        counterRemoteStorageTotalWrite.labels(storageHost).inc();
-        counterRemoteStorageSuccessWrite.labels(storageHost).inc();
+        counterRemoteStorageTotalWrite.labels(tags, storageHost).inc();
+        counterRemoteStorageSuccessWrite.labels(tags, storageHost).inc();
       }
     }
   }
@@ -206,73 +214,76 @@ public class ShuffleServerMetrics {
       counterLocalStorageFailedWrite.inc();
     } else {
       if (!StringUtils.isEmpty(storageHost)) {
-        counterRemoteStorageTotalWrite.labels(storageHost).inc();
-        counterRemoteStorageFailedWrite.labels(storageHost).inc();
+        counterRemoteStorageTotalWrite.labels(tags, storageHost).inc();
+        counterRemoteStorageFailedWrite.labels(tags, storageHost).inc();
       }
     }
   }
 
   private static void setUpMetrics() {
-    counterTotalReceivedDataSize = 
metricsManager.addCounter(TOTAL_RECEIVED_DATA);
-    counterTotalWriteDataSize = metricsManager.addCounter(TOTAL_WRITE_DATA);
-    counterTotalWriteBlockSize = metricsManager.addCounter(TOTAL_WRITE_BLOCK);
-    counterTotalWriteTime = metricsManager.addCounter(TOTAL_WRITE_TIME);
-    counterWriteException = metricsManager.addCounter(TOTAL_WRITE_EXCEPTION);
-    counterWriteSlow = metricsManager.addCounter(TOTAL_WRITE_SLOW);
-    counterWriteTotal = metricsManager.addCounter(TOTAL_WRITE_NUM);
-    counterEventSizeThresholdLevel1 = 
metricsManager.addCounter(EVENT_SIZE_THRESHOLD_LEVEL1);
-    counterEventSizeThresholdLevel2 = 
metricsManager.addCounter(EVENT_SIZE_THRESHOLD_LEVEL2);
-    counterEventSizeThresholdLevel3 = 
metricsManager.addCounter(EVENT_SIZE_THRESHOLD_LEVEL3);
-    counterEventSizeThresholdLevel4 = 
metricsManager.addCounter(EVENT_SIZE_THRESHOLD_LEVEL4);
-    counterTotalReadDataSize = metricsManager.addCounter(TOTAL_READ_DATA);
-    counterTotalReadLocalDataFileSize = 
metricsManager.addCounter(TOTAL_READ_LOCAL_DATA_FILE);
-    counterTotalReadLocalIndexFileSize = 
metricsManager.addCounter(TOTAL_READ_LOCAL_INDEX_FILE);
-    counterTotalReadMemoryDataSize = 
metricsManager.addCounter(TOTAL_READ_MEMORY_DATA);
-    counterTotalReadTime = metricsManager.addCounter(TOTAL_READ_TIME);
-    counterTotalDroppedEventNum = 
metricsManager.addCounter(TOTAL_DROPPED_EVENT_NUM);
-    counterTotalFailedWrittenEventNum = 
metricsManager.addCounter(TOTAL_FAILED_WRITTEN_EVENT_NUM);
-    counterTotalHdfsWriteDataSize = 
metricsManager.addCounter(TOTAL_HDFS_WRITE_DATA);
-    counterTotalLocalFileWriteDataSize = 
metricsManager.addCounter(TOTAL_LOCALFILE_WRITE_DATA);
-    counterTotalRequireBufferFailed = 
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED);
+    counterTotalReceivedDataSize = 
metricsManager.addLabeledCounter(TOTAL_RECEIVED_DATA);
+    counterTotalWriteDataSize = 
metricsManager.addLabeledCounter(TOTAL_WRITE_DATA);
+    counterTotalWriteBlockSize = 
metricsManager.addLabeledCounter(TOTAL_WRITE_BLOCK);
+    counterTotalWriteTime = metricsManager.addLabeledCounter(TOTAL_WRITE_TIME);
+    counterWriteException = 
metricsManager.addLabeledCounter(TOTAL_WRITE_EXCEPTION);
+    counterWriteSlow = metricsManager.addLabeledCounter(TOTAL_WRITE_SLOW);
+    counterWriteTotal = metricsManager.addLabeledCounter(TOTAL_WRITE_NUM);
+    counterEventSizeThresholdLevel1 = 
metricsManager.addLabeledCounter(EVENT_SIZE_THRESHOLD_LEVEL1);
+    counterEventSizeThresholdLevel2 = 
metricsManager.addLabeledCounter(EVENT_SIZE_THRESHOLD_LEVEL2);
+    counterEventSizeThresholdLevel3 = 
metricsManager.addLabeledCounter(EVENT_SIZE_THRESHOLD_LEVEL3);
+    counterEventSizeThresholdLevel4 = 
metricsManager.addLabeledCounter(EVENT_SIZE_THRESHOLD_LEVEL4);
+    counterTotalReadDataSize = 
metricsManager.addLabeledCounter(TOTAL_READ_DATA);
+    counterTotalReadLocalDataFileSize = 
metricsManager.addLabeledCounter(TOTAL_READ_LOCAL_DATA_FILE);
+    counterTotalReadLocalIndexFileSize = 
metricsManager.addLabeledCounter(TOTAL_READ_LOCAL_INDEX_FILE);
+    counterTotalReadMemoryDataSize = 
metricsManager.addLabeledCounter(TOTAL_READ_MEMORY_DATA);
+    counterTotalReadTime = metricsManager.addLabeledCounter(TOTAL_READ_TIME);
+    counterTotalDroppedEventNum = 
metricsManager.addLabeledCounter(TOTAL_DROPPED_EVENT_NUM);
+    counterTotalFailedWrittenEventNum = 
metricsManager.addLabeledCounter(TOTAL_FAILED_WRITTEN_EVENT_NUM);
+    counterTotalHdfsWriteDataSize = 
metricsManager.addLabeledCounter(TOTAL_HDFS_WRITE_DATA);
+    counterTotalLocalFileWriteDataSize = 
metricsManager.addLabeledCounter(TOTAL_LOCALFILE_WRITE_DATA);
+    counterTotalRequireBufferFailed = 
metricsManager.addLabeledCounter(TOTAL_REQUIRE_BUFFER_FAILED);
     counterTotalRequireBufferFailedForRegularPartition =
-        
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_REGULAR_PARTITION);
+        
metricsManager.addLabeledCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_REGULAR_PARTITION);
     counterTotalRequireBufferFailedForHugePartition =
-        
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_HUGE_PARTITION);
-    counterLocalStorageTotalWrite = 
metricsManager.addCounter(STORAGE_TOTAL_WRITE_LOCAL);
-    counterLocalStorageRetryWrite = 
metricsManager.addCounter(STORAGE_RETRY_WRITE_LOCAL);
-    counterLocalStorageFailedWrite = 
metricsManager.addCounter(STORAGE_FAILED_WRITE_LOCAL);
-    counterLocalStorageSuccessWrite = 
metricsManager.addCounter(STORAGE_SUCCESS_WRITE_LOCAL);
-    counterRemoteStorageTotalWrite = 
metricsManager.addCounter(STORAGE_TOTAL_WRITE_REMOTE, STORAGE_HOST_LABEL);
-    counterRemoteStorageRetryWrite = 
metricsManager.addCounter(STORAGE_RETRY_WRITE_REMOTE, STORAGE_HOST_LABEL);
-    counterRemoteStorageFailedWrite = 
metricsManager.addCounter(STORAGE_FAILED_WRITE_REMOTE, STORAGE_HOST_LABEL);
-    counterRemoteStorageSuccessWrite = 
metricsManager.addCounter(STORAGE_SUCCESS_WRITE_REMOTE, STORAGE_HOST_LABEL);
-    counterTotalRequireReadMemoryNum = 
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY);
-    counterTotalRequireReadMemoryRetryNum = 
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY_RETRY);
-    counterTotalRequireReadMemoryFailedNum = 
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY_FAILED);
+        
metricsManager.addLabeledCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_HUGE_PARTITION);
+    counterLocalStorageTotalWrite = 
metricsManager.addLabeledCounter(STORAGE_TOTAL_WRITE_LOCAL);
+    counterLocalStorageRetryWrite = 
metricsManager.addLabeledCounter(STORAGE_RETRY_WRITE_LOCAL);
+    counterLocalStorageFailedWrite = 
metricsManager.addLabeledCounter(STORAGE_FAILED_WRITE_LOCAL);
+    counterLocalStorageSuccessWrite = 
metricsManager.addLabeledCounter(STORAGE_SUCCESS_WRITE_LOCAL);
+    counterRemoteStorageTotalWrite = metricsManager.addCounter(
+        STORAGE_TOTAL_WRITE_REMOTE, Constants.METRICS_TAG_LABEL_NAME, 
STORAGE_HOST_LABEL);
+    counterRemoteStorageRetryWrite = metricsManager.addCounter(
+        STORAGE_RETRY_WRITE_REMOTE, Constants.METRICS_TAG_LABEL_NAME, 
STORAGE_HOST_LABEL);
+    counterRemoteStorageFailedWrite = metricsManager.addCounter(
+        STORAGE_FAILED_WRITE_REMOTE, Constants.METRICS_TAG_LABEL_NAME, 
STORAGE_HOST_LABEL);
+    counterRemoteStorageSuccessWrite = metricsManager.addCounter(
+        STORAGE_SUCCESS_WRITE_REMOTE, Constants.METRICS_TAG_LABEL_NAME, 
STORAGE_HOST_LABEL);
+    counterTotalRequireReadMemoryNum = 
metricsManager.addLabeledCounter(TOTAL_REQUIRE_READ_MEMORY);
+    counterTotalRequireReadMemoryRetryNum = 
metricsManager.addLabeledCounter(TOTAL_REQUIRE_READ_MEMORY_RETRY);
+    counterTotalRequireReadMemoryFailedNum = 
metricsManager.addLabeledCounter(TOTAL_REQUIRE_READ_MEMORY_FAILED);
 
-    counterTotalAppNum = metricsManager.addCounter(TOTAL_APP_NUM);
-    counterTotalAppWithHugePartitionNum = 
metricsManager.addCounter(TOTAL_APP_WITH_HUGE_PARTITION_NUM);
-    counterTotalPartitionNum = metricsManager.addCounter(TOTAL_PARTITION_NUM);
-    counterTotalHugePartitionNum = 
metricsManager.addCounter(TOTAL_HUGE_PARTITION_NUM);
+    counterTotalAppNum = metricsManager.addLabeledCounter(TOTAL_APP_NUM);
+    counterTotalAppWithHugePartitionNum = 
metricsManager.addLabeledCounter(TOTAL_APP_WITH_HUGE_PARTITION_NUM);
+    counterTotalPartitionNum = 
metricsManager.addLabeledCounter(TOTAL_PARTITION_NUM);
+    counterTotalHugePartitionNum = 
metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_NUM);
 
-    gaugeLocalStorageTotalDirsNum = 
metricsManager.addGauge(LOCAL_STORAGE_TOTAL_DIRS_NUM);
-    gaugeLocalStorageCorruptedDirsNum = 
metricsManager.addGauge(LOCAL_STORAGE_CORRUPTED_DIRS_NUM);
-    gaugeLocalStorageTotalSpace = 
metricsManager.addGauge(LOCAL_STORAGE_TOTAL_SPACE);
-    gaugeLocalStorageUsedSpace = 
metricsManager.addGauge(LOCAL_STORAGE_USED_SPACE);
-    gaugeLocalStorageUsedSpaceRatio = 
metricsManager.addGauge(LOCAL_STORAGE_USED_SPACE_RATIO);
+    gaugeLocalStorageTotalDirsNum = 
metricsManager.addLabeledGauge(LOCAL_STORAGE_TOTAL_DIRS_NUM);
+    gaugeLocalStorageCorruptedDirsNum = 
metricsManager.addLabeledGauge(LOCAL_STORAGE_CORRUPTED_DIRS_NUM);
+    gaugeLocalStorageTotalSpace = 
metricsManager.addLabeledGauge(LOCAL_STORAGE_TOTAL_SPACE);
+    gaugeLocalStorageUsedSpace = 
metricsManager.addLabeledGauge(LOCAL_STORAGE_USED_SPACE);
+    gaugeLocalStorageUsedSpaceRatio = 
metricsManager.addLabeledGauge(LOCAL_STORAGE_USED_SPACE_RATIO);
 
-    gaugeIsHealthy = metricsManager.addGauge(IS_HEALTHY);
-    gaugeAllocatedBufferSize = metricsManager.addGauge(ALLOCATED_BUFFER_SIZE);
-    gaugeInFlushBufferSize = metricsManager.addGauge(IN_FLUSH_BUFFER_SIZE);
-    gaugeUsedBufferSize = metricsManager.addGauge(USED_BUFFER_SIZE);
-    gaugeReadBufferUsedSize = metricsManager.addGauge(READ_USED_BUFFER_SIZE);
-    gaugeWriteHandler = metricsManager.addGauge(TOTAL_WRITE_HANDLER);
-    gaugeEventQueueSize = metricsManager.addGauge(EVENT_QUEUE_SIZE);
-    gaugeAppNum = metricsManager.addGauge(APP_NUM_WITH_NODE);
-    gaugeTotalPartitionNum = metricsManager.addGauge(PARTITION_NUM_WITH_NODE);
+    gaugeIsHealthy = metricsManager.addLabeledGauge(IS_HEALTHY);
+    gaugeAllocatedBufferSize = 
metricsManager.addLabeledGauge(ALLOCATED_BUFFER_SIZE);
+    gaugeInFlushBufferSize = 
metricsManager.addLabeledGauge(IN_FLUSH_BUFFER_SIZE);
+    gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE);
+    gaugeReadBufferUsedSize = 
metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE);
+    gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER);
+    gaugeEventQueueSize = metricsManager.addLabeledGauge(EVENT_QUEUE_SIZE);
+    gaugeAppNum = metricsManager.addLabeledGauge(APP_NUM_WITH_NODE);
+    gaugeTotalPartitionNum = 
metricsManager.addLabeledGauge(PARTITION_NUM_WITH_NODE);
 
-    gaugeHugePartitionNum = metricsManager.addGauge(HUGE_PARTITION_NUM);
-    gaugeAppWithHugePartitionNum = 
metricsManager.addGauge(APP_WITH_HUGE_PARTITION_NUM);
+    gaugeHugePartitionNum = metricsManager.addLabeledGauge(HUGE_PARTITION_NUM);
+    gaugeAppWithHugePartitionNum = 
metricsManager.addLabeledGauge(APP_WITH_HUGE_PARTITION_NUM);
   }
-
 }
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 54f4a0b5..53eb4ef8 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -52,6 +52,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.event.AppPurgeEvent;
 import org.apache.uniffle.server.storage.HdfsStorageManager;
@@ -186,10 +187,14 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
         
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
     storageManager.registerRemoteStorage(appId, remoteStorage);
     String storageHost = cluster.getURI().getHost();
-    assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(storageHost).get(), 
0.5);
-    assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageRetryWrite.labels(storageHost).get(), 
0.5);
-    assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageFailedWrite.labels(storageHost).get(), 
0.5);
-    assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(storageHost).get(),
 0.5);
+    assertEquals(0.0, ShuffleServerMetrics.counterRemoteStorageTotalWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, storageHost).get(), 0.5);
+    assertEquals(0.0, ShuffleServerMetrics.counterRemoteStorageRetryWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, storageHost).get(), 0.5);
+    assertEquals(0.0, ShuffleServerMetrics.counterRemoteStorageFailedWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, storageHost).get(), 0.5);
+    assertEquals(0.0, ShuffleServerMetrics.counterRemoteStorageSuccessWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, storageHost).get(), 0.5);
     ShuffleFlushManager manager =
         new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
     ShuffleDataFlushEvent event1 =
@@ -214,8 +219,10 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     validate(appId, 2, 2, blocks21, 1, remoteStorage.getPath());
     assertEquals(blocks21.size(), manager.getCommittedBlockIds(appId, 
2).getLongCardinality());
 
-    assertEquals(3.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(storageHost).get(), 
0.5);
-    assertEquals(3.0, 
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(storageHost).get(),
 0.5);
+    assertEquals(3.0, ShuffleServerMetrics.counterRemoteStorageTotalWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, storageHost).get(), 0.5);
+    assertEquals(3.0, ShuffleServerMetrics.counterRemoteStorageSuccessWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, storageHost).get(), 0.5);
 
     // test case for process event whose related app was cleared already
     assertEquals(0, ShuffleServerMetrics.gaugeWriteHandler.get(), 0.5);
@@ -378,7 +385,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     assertEquals(0, storage.getHandlerSize());
   }
 
-  private void waitForMetrics(Gauge gauge, double expected, double delta) 
throws Exception {
+  private void waitForMetrics(Gauge.Child gauge, double expected, double 
delta) throws Exception {
     int retry = 0;
     boolean match = false;
     do {
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
index 6f5c39db..67b750fc 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
@@ -23,12 +23,14 @@ import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Summary;
 import org.junit.jupiter.api.Test;
 
+import org.apache.uniffle.common.util.Constants;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ShuffleServerGrpcMetricsTest {
   @Test
   public void testLatencyMetrics() {
-    ShuffleServerGrpcMetrics metrics = new ShuffleServerGrpcMetrics();
+    ShuffleServerGrpcMetrics metrics = new 
ShuffleServerGrpcMetrics(Constants.SHUFFLE_SERVER_VERSION);
     metrics.register(new CollectorRegistry(true));
     
metrics.recordTransportTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, 
1000);
     
metrics.recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, 
500);
@@ -36,8 +38,8 @@ public class ShuffleServerGrpcMetricsTest {
     
metrics.recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, 
1000);
     
metrics.recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, 
500);
     
metrics.recordProcessTime(ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD,
 200);
-    Map<String, Summary> sendTimeSummaryTime = 
metrics.getTransportTimeSummaryMap();
-    Map<String, Summary> processTimeSummaryTime = 
metrics.getProcessTimeSummaryMap();
+    Map<String, Summary.Child> sendTimeSummaryTime = 
metrics.getTransportTimeSummaryMap();
+    Map<String, Summary.Child> processTimeSummaryTime = 
metrics.getProcessTimeSummaryMap();
     assertEquals(3, sendTimeSummaryTime.size());
     assertEquals(3, processTimeSummaryTime.size());
 
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index 93f48f8a..540184a3 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.metrics.TestUtils;
+import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -86,10 +87,14 @@ public class ShuffleServerMetricsTest {
 
   @Test
   public void testServerMetrics() throws Exception {
-    
ShuffleServerMetrics.counterRemoteStorageFailedWrite.labels(STORAGE_HOST).inc(0);
-    
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(STORAGE_HOST).inc(0);
-    
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).inc(0);
-    
ShuffleServerMetrics.counterRemoteStorageRetryWrite.labels(STORAGE_HOST).inc(0);
+    ShuffleServerMetrics.counterRemoteStorageFailedWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).inc(0);
+    ShuffleServerMetrics.counterRemoteStorageSuccessWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).inc(0);
+    ShuffleServerMetrics.counterRemoteStorageTotalWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).inc(0);
+    ShuffleServerMetrics.counterRemoteStorageRetryWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).inc(0);
     String content = TestUtils.httpGet(SERVER_METRICS_URL);
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
@@ -101,7 +106,7 @@ public class ShuffleServerMetricsTest {
         ShuffleServerMetrics.STORAGE_FAILED_WRITE_REMOTE,
         ShuffleServerMetrics.STORAGE_RETRY_WRITE_REMOTE);
     for (String expectMetricName : expectedMetricNames) {
-      validateMetrics(mapper, metricsNode, expectMetricName, STORAGE_HOST);
+      validateMetrics(mapper, metricsNode, expectMetricName, 
Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST);
     }
   }
 
@@ -134,14 +139,20 @@ public class ShuffleServerMetricsTest {
 
     // test for remote storage
     ShuffleServerMetrics.incStorageRetryCounter(STORAGE_HOST);
-    assertEquals(1.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).get(), 
0.5);
-    assertEquals(1.0, 
ShuffleServerMetrics.counterRemoteStorageRetryWrite.labels(STORAGE_HOST).get(), 
0.5);
+    assertEquals(1.0, ShuffleServerMetrics.counterRemoteStorageTotalWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).get(), 0.5);
+    assertEquals(1.0, ShuffleServerMetrics.counterRemoteStorageRetryWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).get(), 0.5);
     ShuffleServerMetrics.incStorageSuccessCounter(STORAGE_HOST);
-    assertEquals(2.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).get(), 
0.5);
-    assertEquals(1.0, 
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(STORAGE_HOST).get(),
 0.5);
+    assertEquals(2.0, ShuffleServerMetrics.counterRemoteStorageTotalWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).get(), 0.5);
+    assertEquals(1.0, ShuffleServerMetrics.counterRemoteStorageSuccessWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).get(), 0.5);
     ShuffleServerMetrics.incStorageFailedCounter(STORAGE_HOST);
-    assertEquals(3.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).get(), 
0.5);
-    assertEquals(1.0, 
ShuffleServerMetrics.counterRemoteStorageFailedWrite.labels(STORAGE_HOST).get(),
 0.5);
+    assertEquals(3.0, ShuffleServerMetrics.counterRemoteStorageTotalWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).get(), 0.5);
+    assertEquals(1.0, ShuffleServerMetrics.counterRemoteStorageFailedWrite
+        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST).get(), 0.5);
   }
 
   @Test

Reply via email to