This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new fc64c7ca9 [#1348] improvement(metrics): Unify tags generation for
shuffle-server metrics reporter (#1349)
fc64c7ca9 is described below
commit fc64c7ca9a92cd3dfd8b00d15119fbb839d34e7b
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Dec 13 16:58:08 2023 +0800
[#1348] improvement(metrics): Unify tags generation for shuffle-server
metrics reporter (#1349)
### What changes were proposed in this pull request?
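Currently, the tags label is generated separately when registering metrics for the
shuffle-server. This is wrong because the resulting label lacks the GRPC or GRPC_NETTY tag.
This PR builds the metrics label from the server's unified tag set (server version,
RPC server type, and configured tags).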
### Why are the changes needed?
Fix: #1348
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed.
---
.../org/apache/uniffle/common/util/Constants.java | 2 +-
.../apache/uniffle/test/CoordinatorGrpcTest.java | 5 +++
.../org/apache/uniffle/server/ShuffleServer.java | 42 ++++++++--------------
.../uniffle/server/ShuffleServerMetricsTest.java | 34 +++++++++---------
4 files changed, 38 insertions(+), 45 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 5970bacc4..4b35463cc 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -23,7 +23,7 @@ public final class Constants {
// the value is used for client/server compatible, eg, online upgrade
public static final String SHUFFLE_SERVER_VERSION = "ss_v4";
- public static final String METRICS_TAG_LABEL_NAME = "label";
+ public static final String METRICS_TAG_LABEL_NAME = "tags";
public static final String COORDINATOR_TAG = "coordinator";
public static final String SHUFFLE_DATA_FILE_SUFFIX = ".data";
public static final String SHUFFLE_INDEX_FILE_SUFFIX = ".index";
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index 8b0693a62..0384c1f2a 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -45,6 +45,7 @@ import org.apache.uniffle.common.storage.StorageMedia;
import org.apache.uniffle.common.storage.StorageStatus;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.CoordinatorServer;
import org.apache.uniffle.coordinator.ServerNode;
import org.apache.uniffle.coordinator.SimpleClusterManager;
import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
@@ -153,6 +154,7 @@ public class CoordinatorGrpcTest extends
CoordinatorTestBase {
((ConcurrentHashMap<Object, Object>) field.get(shuffleServerConf))
.remove(ShuffleServerConf.NETTY_SERVER_PORT.key());
String storageTypeJsonSource = String.format("{\"%s\": \"ssd\"}", baseDir);
+
withEnvironmentVariables("RSS_ENV_KEY", storageTypeJsonSource)
.execute(
() -> {
@@ -162,6 +164,9 @@ public class CoordinatorGrpcTest extends
CoordinatorTestBase {
shuffleServers.set(0, ss);
});
Thread.sleep(5000);
+ CoordinatorServer coordinatorServer = coordinators.get(0);
+ ((SimpleClusterManager)
(coordinatorServer.getClusterManager())).nodesCheckTest();
+
// add tag when ClientType is `GRPC`
RssGetShuffleAssignmentsRequest request =
new RssGetShuffleAssignmentsRequest(
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 573af23ea..a5e230eb3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -29,13 +29,14 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.prometheus.client.CollectorRegistry;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import org.apache.uniffle.common.Arguments;
-import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.InvalidRequestException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.metrics.GRPCMetrics;
@@ -208,6 +209,8 @@ public class ShuffleServer {
grpcPort = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
nettyPort =
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
+ initServerTags();
+
jettyServer = new JettyServer(shuffleServerConf);
registerMetrics();
// register packages and instances for jersey
@@ -261,44 +264,37 @@ public class ShuffleServer {
shuffleTaskManager =
new ShuffleTaskManager(
shuffleServerConf, shuffleFlushManager, shuffleBufferManager,
storageManager);
+
nettyServerEnabled =
shuffleServerConf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
if (nettyServerEnabled) {
streamServer = new StreamServer(this);
}
setServer();
-
- initServerTags();
}
private void initServerTags() {
// it's the system tag for server's version
tags.add(Constants.SHUFFLE_SERVER_VERSION);
+ // the rpc service type bound into tags
+ tags.add(shuffleServerConf.get(RssBaseConf.RPC_SERVER_TYPE).name());
List<String> configuredTags =
shuffleServerConf.get(ShuffleServerConf.TAGS);
if (CollectionUtils.isNotEmpty(configuredTags)) {
tags.addAll(configuredTags);
}
- tagServer();
- LOG.info("Server tags: {}", tags);
- }
- private void tagServer() {
- if (nettyServerEnabled) {
- tags.add(ClientType.GRPC_NETTY.name());
- } else {
- tags.add(ClientType.GRPC.name());
- }
+ LOG.info("Server tags: {}", tags);
}
private void registerMetrics() {
LOG.info("Register metrics");
CollectorRegistry shuffleServerCollectorRegistry = new
CollectorRegistry(true);
- String tags = coverToString();
- ShuffleServerMetrics.register(shuffleServerCollectorRegistry, tags);
- grpcMetrics = new ShuffleServerGrpcMetrics(this.shuffleServerConf, tags);
+ String rawTags = getEncodedTags();
+ ShuffleServerMetrics.register(shuffleServerCollectorRegistry, rawTags);
+ grpcMetrics = new ShuffleServerGrpcMetrics(this.shuffleServerConf,
rawTags);
grpcMetrics.register(new CollectorRegistry(true));
- nettyMetrics = new ShuffleServerNettyMetrics(shuffleServerConf, tags);
+ nettyMetrics = new ShuffleServerNettyMetrics(shuffleServerConf, rawTags);
nettyMetrics.register(new CollectorRegistry(true));
CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
boolean verbose =
@@ -494,17 +490,7 @@ public class ShuffleServer {
return nettyPort;
}
- public String coverToString() {
- List<String> tags = shuffleServerConf.get(ShuffleServerConf.TAGS);
- StringBuilder sb = new StringBuilder();
- sb.append(Constants.SHUFFLE_SERVER_VERSION);
- if (tags == null || tags.size() == 0) {
- return sb.toString();
- }
- for (String tag : tags) {
- sb.append(",");
- sb.append(tag);
- }
- return sb.toString();
+ public String getEncodedTags() {
+ return StringUtils.join(tags, ",");
}
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index f7b3a5715..6d59b03b0 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -34,7 +34,6 @@ import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.metrics.TestUtils;
-import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.util.StorageType;
@@ -52,6 +51,8 @@ public class ShuffleServerMetricsTest {
private static final String STORAGE_HOST = "hdfs1";
private static ShuffleServer shuffleServer;
+ private static String encodedTagsForMetricLabel;
+
@BeforeAll
public static void setUp() throws Exception {
ShuffleServerConf ssc = new ShuffleServerConf();
@@ -69,6 +70,7 @@ public class ShuffleServerMetricsTest {
.getStorageManager()
.registerRemoteStorage("metricsTest", new
RemoteStorageInfo(REMOTE_STORAGE_PATH));
shuffleServer.start();
+ encodedTagsForMetricLabel = shuffleServer.getEncodedTags();
}
@AfterAll
@@ -88,16 +90,16 @@ public class ShuffleServerMetricsTest {
@Test
public void testServerMetrics() throws Exception {
ShuffleServerMetrics.counterRemoteStorageFailedWrite
- .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+ .labels(encodedTagsForMetricLabel, STORAGE_HOST)
.inc(0);
ShuffleServerMetrics.counterRemoteStorageSuccessWrite
- .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+ .labels(encodedTagsForMetricLabel, STORAGE_HOST)
.inc(0);
ShuffleServerMetrics.counterRemoteStorageTotalWrite
- .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+ .labels(encodedTagsForMetricLabel, STORAGE_HOST)
.inc(0);
ShuffleServerMetrics.counterRemoteStorageRetryWrite
- .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+ .labels(encodedTagsForMetricLabel, STORAGE_HOST)
.inc(0);
String content = TestUtils.httpGet(SERVER_METRICS_URL);
ObjectMapper mapper = new ObjectMapper();
@@ -112,7 +114,7 @@ public class ShuffleServerMetricsTest {
ShuffleServerMetrics.STORAGE_RETRY_WRITE_REMOTE);
for (String expectMetricName : expectedMetricNames) {
validateMetrics(
- mapper, metricsNode, expectMetricName,
Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST);
+ mapper, metricsNode, expectMetricName, encodedTagsForMetricLabel,
STORAGE_HOST);
}
}
@@ -140,7 +142,7 @@ public class ShuffleServerMetricsTest {
assertEquals(
1000.0,
ShuffleServerMetrics.counterTotalHadoopWriteDataSize
- .labels(Constants.SHUFFLE_SERVER_VERSION, host1)
+ .labels(encodedTagsForMetricLabel, host1)
.get());
// case2
@@ -148,7 +150,7 @@ public class ShuffleServerMetricsTest {
assertEquals(
1500.0,
ShuffleServerMetrics.counterTotalHadoopWriteDataSize
- .labels(Constants.SHUFFLE_SERVER_VERSION, host1)
+ .labels(encodedTagsForMetricLabel, host1)
.get());
// case3
@@ -157,14 +159,14 @@ public class ShuffleServerMetricsTest {
assertEquals(
2000.0,
ShuffleServerMetrics.counterTotalHadoopWriteDataSize
- .labels(Constants.SHUFFLE_SERVER_VERSION, host2)
+ .labels(encodedTagsForMetricLabel, host2)
.get());
// case4
assertEquals(
3500.0,
ShuffleServerMetrics.counterTotalHadoopWriteDataSize
- .labels(Constants.SHUFFLE_SERVER_VERSION,
ShuffleServerMetrics.STORAGE_HOST_LABEL_ALL)
+ .labels(encodedTagsForMetricLabel,
ShuffleServerMetrics.STORAGE_HOST_LABEL_ALL)
.get());
}
@@ -186,39 +188,39 @@ public class ShuffleServerMetricsTest {
assertEquals(
1.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite
- .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+ .labels(encodedTagsForMetricLabel, STORAGE_HOST)
.get(),
0.5);
assertEquals(
1.0,
ShuffleServerMetrics.counterRemoteStorageRetryWrite
- .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+ .labels(encodedTagsForMetricLabel, STORAGE_HOST)
.get(),
0.5);
ShuffleServerMetrics.incStorageSuccessCounter(STORAGE_HOST);
assertEquals(
2.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite
- .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+ .labels(encodedTagsForMetricLabel, STORAGE_HOST)
.get(),
0.5);
assertEquals(
1.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite
- .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+ .labels(encodedTagsForMetricLabel, STORAGE_HOST)
.get(),
0.5);
ShuffleServerMetrics.incStorageFailedCounter(STORAGE_HOST);
assertEquals(
3.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite
- .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+ .labels(encodedTagsForMetricLabel, STORAGE_HOST)
.get(),
0.5);
assertEquals(
1.0,
ShuffleServerMetrics.counterRemoteStorageFailedWrite
- .labels(Constants.SHUFFLE_SERVER_VERSION, STORAGE_HOST)
+ .labels(encodedTagsForMetricLabel, STORAGE_HOST)
.get(),
0.5);
}