This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new fc64c7ca9 [#1348] improvement(metrics): Unify tags generation for 
shuffle-server metrics reporter (#1349)
fc64c7ca9 is described below

commit fc64c7ca9a92cd3dfd8b00d15119fbb839d34e7b
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Dec 13 16:58:08 2023 +0800

    [#1348] improvement(metrics): Unify tags generation for shuffle-server 
metrics reporter (#1349)
    
    ### What changes were proposed in this pull request?
    
    Currently, the tags label is built separately when registering metrics for 
shuffle-server. This is wrong, because the resulting label lacks the GRPC or GRPC_NETTY tag.
    
    ### Why are the changes needed?
    
    Fix: #1348
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Not needed.
---
 .../org/apache/uniffle/common/util/Constants.java  |  2 +-
 .../apache/uniffle/test/CoordinatorGrpcTest.java   |  5 +++
 .../org/apache/uniffle/server/ShuffleServer.java   | 42 ++++++++--------------
 .../uniffle/server/ShuffleServerMetricsTest.java   | 34 +++++++++---------
 4 files changed, 38 insertions(+), 45 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java 
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 5970bacc4..4b35463cc 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -23,7 +23,7 @@ public final class Constants {
 
   // the value is used for client/server compatible, eg, online upgrade
   public static final String SHUFFLE_SERVER_VERSION = "ss_v4";
-  public static final String METRICS_TAG_LABEL_NAME = "label";
+  public static final String METRICS_TAG_LABEL_NAME = "tags";
   public static final String COORDINATOR_TAG = "coordinator";
   public static final String SHUFFLE_DATA_FILE_SUFFIX = ".data";
   public static final String SHUFFLE_INDEX_FILE_SUFFIX = ".index";
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index 8b0693a62..0384c1f2a 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -45,6 +45,7 @@ import org.apache.uniffle.common.storage.StorageMedia;
 import org.apache.uniffle.common.storage.StorageStatus;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.CoordinatorServer;
 import org.apache.uniffle.coordinator.ServerNode;
 import org.apache.uniffle.coordinator.SimpleClusterManager;
 import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
@@ -153,6 +154,7 @@ public class CoordinatorGrpcTest extends 
CoordinatorTestBase {
     ((ConcurrentHashMap<Object, Object>) field.get(shuffleServerConf))
         .remove(ShuffleServerConf.NETTY_SERVER_PORT.key());
     String storageTypeJsonSource = String.format("{\"%s\": \"ssd\"}", baseDir);
+
     withEnvironmentVariables("RSS_ENV_KEY", storageTypeJsonSource)
         .execute(
             () -> {
@@ -162,6 +164,9 @@ public class CoordinatorGrpcTest extends 
CoordinatorTestBase {
               shuffleServers.set(0, ss);
             });
     Thread.sleep(5000);
+    CoordinatorServer coordinatorServer = coordinators.get(0);
+    ((SimpleClusterManager) 
(coordinatorServer.getClusterManager())).nodesCheckTest();
+
     // add tag when ClientType is `GRPC`
     RssGetShuffleAssignmentsRequest request =
         new RssGetShuffleAssignmentsRequest(
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 573af23ea..a5e230eb3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -29,13 +29,14 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import io.prometheus.client.CollectorRegistry;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import picocli.CommandLine;
 
 import org.apache.uniffle.common.Arguments;
-import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.exception.InvalidRequestException;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
@@ -208,6 +209,8 @@ public class ShuffleServer {
     grpcPort = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
     nettyPort = 
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
 
+    initServerTags();
+
     jettyServer = new JettyServer(shuffleServerConf);
     registerMetrics();
     // register packages and instances for jersey
@@ -261,44 +264,37 @@ public class ShuffleServer {
     shuffleTaskManager =
         new ShuffleTaskManager(
             shuffleServerConf, shuffleFlushManager, shuffleBufferManager, 
storageManager);
+
     nettyServerEnabled = 
shuffleServerConf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
     if (nettyServerEnabled) {
       streamServer = new StreamServer(this);
     }
 
     setServer();
-
-    initServerTags();
   }
 
   private void initServerTags() {
     // it's the system tag for server's version
     tags.add(Constants.SHUFFLE_SERVER_VERSION);
+    // the rpc service type bound into tags
+    tags.add(shuffleServerConf.get(RssBaseConf.RPC_SERVER_TYPE).name());
 
     List<String> configuredTags = 
shuffleServerConf.get(ShuffleServerConf.TAGS);
     if (CollectionUtils.isNotEmpty(configuredTags)) {
       tags.addAll(configuredTags);
     }
-    tagServer();
-    LOG.info("Server tags: {}", tags);
-  }
 
-  private void tagServer() {
-    if (nettyServerEnabled) {
-      tags.add(ClientType.GRPC_NETTY.name());
-    } else {
-      tags.add(ClientType.GRPC.name());
-    }
+    LOG.info("Server tags: {}", tags);
   }
 
   private void registerMetrics() {
     LOG.info("Register metrics");
     CollectorRegistry shuffleServerCollectorRegistry = new 
CollectorRegistry(true);
-    String tags = coverToString();
-    ShuffleServerMetrics.register(shuffleServerCollectorRegistry, tags);
-    grpcMetrics = new ShuffleServerGrpcMetrics(this.shuffleServerConf, tags);
+    String rawTags = getEncodedTags();
+    ShuffleServerMetrics.register(shuffleServerCollectorRegistry, rawTags);
+    grpcMetrics = new ShuffleServerGrpcMetrics(this.shuffleServerConf, 
rawTags);
     grpcMetrics.register(new CollectorRegistry(true));
-    nettyMetrics = new ShuffleServerNettyMetrics(shuffleServerConf, tags);
+    nettyMetrics = new ShuffleServerNettyMetrics(shuffleServerConf, rawTags);
     nettyMetrics.register(new CollectorRegistry(true));
     CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
     boolean verbose =
@@ -494,17 +490,7 @@ public class ShuffleServer {
     return nettyPort;
   }
 
-  public String coverToString() {
-    List<String> tags = shuffleServerConf.get(ShuffleServerConf.TAGS);
-    StringBuilder sb = new StringBuilder();
-    sb.append(Constants.SHUFFLE_SERVER_VERSION);
-    if (tags == null || tags.size() == 0) {
-      return sb.toString();
-    }
-    for (String tag : tags) {
-      sb.append(",");
-      sb.append(tag);
-    }
-    return sb.toString();
+  public String getEncodedTags() {
+    return StringUtils.join(tags, ",");
   }
 }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index f7b3a5715..6d59b03b0 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -34,7 +34,6 @@ import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.metrics.TestUtils;
-import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -52,6 +51,8 @@ public class ShuffleServerMetricsTest {
   private static final String STORAGE_HOST = "hdfs1";
   private static ShuffleServer shuffleServer;
 
+  private static String encodedTagsForMetricLabel;
+
   @BeforeAll
   public static void setUp() throws Exception {
     ShuffleServerConf ssc = new ShuffleServerConf();
@@ -69,6 +70,7 @@ public class ShuffleServerMetricsTest {
         .getStorageManager()
         .registerRemoteStorage("metricsTest", new 
RemoteStorageInfo(REMOTE_STORAGE_PATH));
     shuffleServer.start();
+    encodedTagsForMetricLabel = shuffleServer.getEncodedTags();
   }
 
   @AfterAll
@@ -88,16 +90,16 @@ public class ShuffleServerMetricsTest {
   @Test
   public void testServerMetrics() throws Exception {
     ShuffleServerMetrics.counterRemoteStorageFailedWrite
-        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+        .labels(encodedTagsForMetricLabel, STORAGE_HOST)
         .inc(0);
     ShuffleServerMetrics.counterRemoteStorageSuccessWrite
-        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+        .labels(encodedTagsForMetricLabel, STORAGE_HOST)
         .inc(0);
     ShuffleServerMetrics.counterRemoteStorageTotalWrite
-        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+        .labels(encodedTagsForMetricLabel, STORAGE_HOST)
         .inc(0);
     ShuffleServerMetrics.counterRemoteStorageRetryWrite
-        .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+        .labels(encodedTagsForMetricLabel, STORAGE_HOST)
         .inc(0);
     String content = TestUtils.httpGet(SERVER_METRICS_URL);
     ObjectMapper mapper = new ObjectMapper();
@@ -112,7 +114,7 @@ public class ShuffleServerMetricsTest {
             ShuffleServerMetrics.STORAGE_RETRY_WRITE_REMOTE);
     for (String expectMetricName : expectedMetricNames) {
       validateMetrics(
-          mapper, metricsNode, expectMetricName, 
Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST);
+          mapper, metricsNode, expectMetricName, encodedTagsForMetricLabel, 
STORAGE_HOST);
     }
   }
 
@@ -140,7 +142,7 @@ public class ShuffleServerMetricsTest {
     assertEquals(
         1000.0,
         ShuffleServerMetrics.counterTotalHadoopWriteDataSize
-            .labels(Constants.SHUFFLE_SERVER_VERSION, host1)
+            .labels(encodedTagsForMetricLabel, host1)
             .get());
 
     // case2
@@ -148,7 +150,7 @@ public class ShuffleServerMetricsTest {
     assertEquals(
         1500.0,
         ShuffleServerMetrics.counterTotalHadoopWriteDataSize
-            .labels(Constants.SHUFFLE_SERVER_VERSION, host1)
+            .labels(encodedTagsForMetricLabel, host1)
             .get());
 
     // case3
@@ -157,14 +159,14 @@ public class ShuffleServerMetricsTest {
     assertEquals(
         2000.0,
         ShuffleServerMetrics.counterTotalHadoopWriteDataSize
-            .labels(Constants.SHUFFLE_SERVER_VERSION, host2)
+            .labels(encodedTagsForMetricLabel, host2)
             .get());
 
     // case4
     assertEquals(
         3500.0,
         ShuffleServerMetrics.counterTotalHadoopWriteDataSize
-            .labels(Constants.SHUFFLE_SERVER_VERSION, 
ShuffleServerMetrics.STORAGE_HOST_LABEL_ALL)
+            .labels(encodedTagsForMetricLabel, 
ShuffleServerMetrics.STORAGE_HOST_LABEL_ALL)
             .get());
   }
 
@@ -186,39 +188,39 @@ public class ShuffleServerMetricsTest {
     assertEquals(
         1.0,
         ShuffleServerMetrics.counterRemoteStorageTotalWrite
-            .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+            .labels(encodedTagsForMetricLabel, STORAGE_HOST)
             .get(),
         0.5);
     assertEquals(
         1.0,
         ShuffleServerMetrics.counterRemoteStorageRetryWrite
-            .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+            .labels(encodedTagsForMetricLabel, STORAGE_HOST)
             .get(),
         0.5);
     ShuffleServerMetrics.incStorageSuccessCounter(STORAGE_HOST);
     assertEquals(
         2.0,
         ShuffleServerMetrics.counterRemoteStorageTotalWrite
-            .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+            .labels(encodedTagsForMetricLabel, STORAGE_HOST)
             .get(),
         0.5);
     assertEquals(
         1.0,
         ShuffleServerMetrics.counterRemoteStorageSuccessWrite
-            .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+            .labels(encodedTagsForMetricLabel, STORAGE_HOST)
             .get(),
         0.5);
     ShuffleServerMetrics.incStorageFailedCounter(STORAGE_HOST);
     assertEquals(
         3.0,
         ShuffleServerMetrics.counterRemoteStorageTotalWrite
-            .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+            .labels(encodedTagsForMetricLabel, STORAGE_HOST)
             .get(),
         0.5);
     assertEquals(
         1.0,
         ShuffleServerMetrics.counterRemoteStorageFailedWrite
-            .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+            .labels(encodedTagsForMetricLabel, STORAGE_HOST)
             .get(),
         0.5);
   }

Reply via email to