This is an automated email from the ASF dual-hosted git repository.
xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new c59e2a4b [#799] feat: use storage host label for remote storage write metrics (#800)
c59e2a4b is described below
commit c59e2a4b10284534b47f5a52373cbe4e62ac6b90
Author: advancedxy <[email protected]>
AuthorDate: Fri Apr 7 10:32:33 2023 +0800
[#799] feat: use storage host label for remote storage write metrics (#800)
### What changes were proposed in this pull request?
1. Replace the dynamically registered per-host remote storage counters with a single counter per metric that carries a `storage_host` label.
### Why are the changes needed?
Fix: #799
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests, updated to use the labeled counters (ShuffleFlushManagerTest and ShuffleServerMetricsTest).
---
.../uniffle/server/ShuffleServerMetrics.java | 70 ++++++----------------
.../uniffle/server/storage/HdfsStorageManager.java | 2 -
.../uniffle/server/ShuffleFlushManagerTest.java | 12 ++--
.../uniffle/server/ShuffleServerMetricsTest.java | 37 ++++++------
4 files changed, 44 insertions(+), 77 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 098560d1..a5b09ff1 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -17,8 +17,6 @@
package org.apache.uniffle.server;
-import java.util.Map;
-
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
@@ -26,8 +24,6 @@ import io.prometheus.client.Gauge;
import org.apache.commons.lang3.StringUtils;
import org.apache.uniffle.common.metrics.MetricsManager;
-import org.apache.uniffle.common.util.JavaUtils;
-import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.common.LocalStorage;
public class ShuffleServerMetrics {
@@ -80,10 +76,11 @@ public class ShuffleServerMetrics {
private static final String STORAGE_RETRY_WRITE_LOCAL =
"storage_retry_write_local";
private static final String STORAGE_FAILED_WRITE_LOCAL =
"storage_failed_write_local";
private static final String STORAGE_SUCCESS_WRITE_LOCAL =
"storage_success_write_local";
- public static final String STORAGE_TOTAL_WRITE_REMOTE_PREFIX =
"storage_total_write_remote_";
- public static final String STORAGE_RETRY_WRITE_REMOTE_PREFIX =
"storage_retry_write_remote_";
- public static final String STORAGE_FAILED_WRITE_REMOTE_PREFIX =
"storage_failed_write_remote_";
- public static final String STORAGE_SUCCESS_WRITE_REMOTE_PREFIX =
"storage_success_write_remote_";
+ private static final String STORAGE_HOST_LABEL = "storage_host";
+ public static final String STORAGE_TOTAL_WRITE_REMOTE =
"storage_total_write_remote";
+ public static final String STORAGE_RETRY_WRITE_REMOTE =
"storage_retry_write_remote";
+ public static final String STORAGE_FAILED_WRITE_REMOTE =
"storage_failed_write_remote";
+ public static final String STORAGE_SUCCESS_WRITE_REMOTE =
"storage_success_write_remote";
private static final String TOTAL_APP_NUM = "total_app_num";
private static final String TOTAL_APP_WITH_HUGE_PARTITION_NUM =
"total_app_with_huge_partition_num";
@@ -148,20 +145,16 @@ public class ShuffleServerMetrics {
public static Gauge gaugeEventQueueSize;
public static Gauge gaugeAppNum;
public static Gauge gaugeTotalPartitionNum;
- public static Map<String, Counter> counterRemoteStorageTotalWrite;
- public static Map<String, Counter> counterRemoteStorageRetryWrite;
- public static Map<String, Counter> counterRemoteStorageFailedWrite;
- public static Map<String, Counter> counterRemoteStorageSuccessWrite;
+ public static Counter counterRemoteStorageTotalWrite;
+ public static Counter counterRemoteStorageRetryWrite;
+ public static Counter counterRemoteStorageFailedWrite;
+ public static Counter counterRemoteStorageSuccessWrite;
private static MetricsManager metricsManager;
private static boolean isRegister = false;
public static synchronized void register(CollectorRegistry
collectorRegistry) {
if (!isRegister) {
- counterRemoteStorageTotalWrite = JavaUtils.newConcurrentMap();
- counterRemoteStorageRetryWrite = JavaUtils.newConcurrentMap();
- counterRemoteStorageFailedWrite = JavaUtils.newConcurrentMap();
- counterRemoteStorageSuccessWrite = JavaUtils.newConcurrentMap();
metricsManager = new MetricsManager(collectorRegistry);
isRegister = true;
setUpMetrics();
@@ -183,43 +176,14 @@ public class ShuffleServerMetrics {
return metricsManager.getCollectorRegistry();
}
- public static synchronized void addDynamicCounterForRemoteStorage(String
storageHost) {
- if (!StringUtils.isEmpty(storageHost)) {
- String totalWriteMetricName = STORAGE_TOTAL_WRITE_REMOTE_PREFIX
- + RssUtils.getMetricNameForHostName(storageHost);
- if (!counterRemoteStorageTotalWrite.containsKey(storageHost)) {
- counterRemoteStorageTotalWrite.putIfAbsent(storageHost,
- metricsManager.addCounter(totalWriteMetricName));
- }
- String retryWriteMetricName = STORAGE_RETRY_WRITE_REMOTE_PREFIX
- + RssUtils.getMetricNameForHostName(storageHost);
- if (!counterRemoteStorageRetryWrite.containsKey(storageHost)) {
- counterRemoteStorageRetryWrite.putIfAbsent(storageHost,
- metricsManager.addCounter(retryWriteMetricName));
- }
- String failedWriteMetricName = STORAGE_FAILED_WRITE_REMOTE_PREFIX
- + RssUtils.getMetricNameForHostName(storageHost);
- if (!counterRemoteStorageFailedWrite.containsKey(storageHost)) {
- counterRemoteStorageFailedWrite.putIfAbsent(storageHost,
- metricsManager.addCounter(failedWriteMetricName));
- }
- String successWriteMetricName = STORAGE_SUCCESS_WRITE_REMOTE_PREFIX
- + RssUtils.getMetricNameForHostName(storageHost);
- if (!counterRemoteStorageSuccessWrite.containsKey(storageHost)) {
- counterRemoteStorageSuccessWrite.putIfAbsent(storageHost,
- metricsManager.addCounter(successWriteMetricName));
- }
- }
- }
-
public static void incStorageRetryCounter(String storageHost) {
if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
counterLocalStorageTotalWrite.inc();
counterLocalStorageRetryWrite.inc();
} else {
if (!StringUtils.isEmpty(storageHost)) {
- counterRemoteStorageTotalWrite.get(storageHost).inc();
- counterRemoteStorageRetryWrite.get(storageHost).inc();
+ counterRemoteStorageTotalWrite.labels(storageHost).inc();
+ counterRemoteStorageRetryWrite.labels(storageHost).inc();
}
}
}
@@ -230,8 +194,8 @@ public class ShuffleServerMetrics {
counterLocalStorageSuccessWrite.inc();
} else {
if (!StringUtils.isEmpty(storageHost)) {
- counterRemoteStorageTotalWrite.get(storageHost).inc();
- counterRemoteStorageSuccessWrite.get(storageHost).inc();
+ counterRemoteStorageTotalWrite.labels(storageHost).inc();
+ counterRemoteStorageSuccessWrite.labels(storageHost).inc();
}
}
}
@@ -242,8 +206,8 @@ public class ShuffleServerMetrics {
counterLocalStorageFailedWrite.inc();
} else {
if (!StringUtils.isEmpty(storageHost)) {
- counterRemoteStorageTotalWrite.get(storageHost).inc();
- counterRemoteStorageFailedWrite.get(storageHost).inc();
+ counterRemoteStorageTotalWrite.labels(storageHost).inc();
+ counterRemoteStorageFailedWrite.labels(storageHost).inc();
}
}
}
@@ -278,6 +242,10 @@ public class ShuffleServerMetrics {
counterLocalStorageRetryWrite =
metricsManager.addCounter(STORAGE_RETRY_WRITE_LOCAL);
counterLocalStorageFailedWrite =
metricsManager.addCounter(STORAGE_FAILED_WRITE_LOCAL);
counterLocalStorageSuccessWrite =
metricsManager.addCounter(STORAGE_SUCCESS_WRITE_LOCAL);
+ counterRemoteStorageTotalWrite =
metricsManager.addCounter(STORAGE_TOTAL_WRITE_REMOTE, STORAGE_HOST_LABEL);
+ counterRemoteStorageRetryWrite =
metricsManager.addCounter(STORAGE_RETRY_WRITE_REMOTE, STORAGE_HOST_LABEL);
+ counterRemoteStorageFailedWrite =
metricsManager.addCounter(STORAGE_FAILED_WRITE_REMOTE, STORAGE_HOST_LABEL);
+ counterRemoteStorageSuccessWrite =
metricsManager.addCounter(STORAGE_SUCCESS_WRITE_REMOTE, STORAGE_HOST_LABEL);
counterTotalRequireReadMemoryNum =
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY);
counterTotalRequireReadMemoryRetryNum =
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY_RETRY);
counterTotalRequireReadMemoryFailedNum =
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY_FAILED);
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 7fddb86a..2cebe0ca 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -131,8 +131,6 @@ public class HdfsStorageManager extends
SingleStorageManager {
}
}
HdfsStorage hdfsStorage = new HdfsStorage(remoteStorage,
remoteStorageHadoopConf);
- String storageHost = hdfsStorage.getStorageHost();
- ShuffleServerMetrics.addDynamicCounterForRemoteStorage(storageHost);
return hdfsStorage;
});
appIdToStorages.computeIfAbsent(appId, key ->
pathToStorages.get(remoteStorage));
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index aa8f88b6..54f4a0b5 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -186,10 +186,10 @@ public class ShuffleFlushManagerTest extends HdfsTestBase
{
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
storageManager.registerRemoteStorage(appId, remoteStorage);
String storageHost = cluster.getURI().getHost();
- assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.get(storageHost).get(),
0.5);
- assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageRetryWrite.get(storageHost).get(),
0.5);
- assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageFailedWrite.get(storageHost).get(),
0.5);
- assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.get(storageHost).get(),
0.5);
+ assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(storageHost).get(),
0.5);
+ assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageRetryWrite.labels(storageHost).get(),
0.5);
+ assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageFailedWrite.labels(storageHost).get(),
0.5);
+ assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(storageHost).get(),
0.5);
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
ShuffleDataFlushEvent event1 =
@@ -214,8 +214,8 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
validate(appId, 2, 2, blocks21, 1, remoteStorage.getPath());
assertEquals(blocks21.size(), manager.getCommittedBlockIds(appId,
2).getLongCardinality());
- assertEquals(3.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.get(storageHost).get(),
0.5);
- assertEquals(3.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.get(storageHost).get(),
0.5);
+ assertEquals(3.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(storageHost).get(),
0.5);
+ assertEquals(3.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(storageHost).get(),
0.5);
// test case for process event whose related app was cleared already
assertEquals(0, ShuffleServerMetrics.gaugeWriteHandler.get(), 0.5);
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index 99993eb4..93f48f8a 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -37,6 +37,7 @@ import org.apache.uniffle.common.metrics.TestUtils;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.util.StorageType;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -85,32 +86,32 @@ public class ShuffleServerMetricsTest {
@Test
public void testServerMetrics() throws Exception {
+
ShuffleServerMetrics.counterRemoteStorageFailedWrite.labels(STORAGE_HOST).inc(0);
+
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(STORAGE_HOST).inc(0);
+
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).inc(0);
+
ShuffleServerMetrics.counterRemoteStorageRetryWrite.labels(STORAGE_HOST).inc(0);
String content = TestUtils.httpGet(SERVER_METRICS_URL);
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
JsonNode metricsNode = actualObj.get("metrics");
-
List<String> expectedMetricNames = Lists.newArrayList(
- ShuffleServerMetrics.STORAGE_TOTAL_WRITE_REMOTE_PREFIX + STORAGE_HOST,
- ShuffleServerMetrics.STORAGE_SUCCESS_WRITE_REMOTE_PREFIX +
STORAGE_HOST,
- ShuffleServerMetrics.STORAGE_FAILED_WRITE_REMOTE_PREFIX + STORAGE_HOST,
- ShuffleServerMetrics.STORAGE_RETRY_WRITE_REMOTE_PREFIX + STORAGE_HOST);
+ ShuffleServerMetrics.STORAGE_TOTAL_WRITE_REMOTE,
+ ShuffleServerMetrics.STORAGE_SUCCESS_WRITE_REMOTE,
+ ShuffleServerMetrics.STORAGE_FAILED_WRITE_REMOTE,
+ ShuffleServerMetrics.STORAGE_RETRY_WRITE_REMOTE);
for (String expectMetricName : expectedMetricNames) {
- validateMetrics(metricsNode, expectMetricName);
+ validateMetrics(mapper, metricsNode, expectMetricName, STORAGE_HOST);
}
-
- // for duplicate register, IllegalArgumentException shouldn't be thrown
- String hostName = "duplicateHost";
- ShuffleServerMetrics.addDynamicCounterForRemoteStorage(hostName);
- ShuffleServerMetrics.addDynamicCounterForRemoteStorage(hostName);
}
- private void validateMetrics(JsonNode metricsNode, String
expectedMetricName) {
+ private void validateMetrics(ObjectMapper mapper, JsonNode metricsNode,
String expectedMetricName, String... labels) {
boolean bingo = false;
for (int i = 0; i < metricsNode.size(); i++) {
JsonNode metricsName = metricsNode.get(i).get("name");
if (expectedMetricName.equals(metricsName.textValue())) {
+ List<String> labelValues =
mapper.convertValue(metricsNode.get(i).get("labelValues"), ArrayList.class);
+ assertArrayEquals(labels, labelValues.toArray(new String[0]));
bingo = true;
break;
}
@@ -133,14 +134,14 @@ public class ShuffleServerMetricsTest {
// test for remote storage
ShuffleServerMetrics.incStorageRetryCounter(STORAGE_HOST);
- assertEquals(1.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.get(STORAGE_HOST).get(),
0.5);
- assertEquals(1.0,
ShuffleServerMetrics.counterRemoteStorageRetryWrite.get(STORAGE_HOST).get(),
0.5);
+ assertEquals(1.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).get(),
0.5);
+ assertEquals(1.0,
ShuffleServerMetrics.counterRemoteStorageRetryWrite.labels(STORAGE_HOST).get(),
0.5);
ShuffleServerMetrics.incStorageSuccessCounter(STORAGE_HOST);
- assertEquals(2.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.get(STORAGE_HOST).get(),
0.5);
- assertEquals(1.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.get(STORAGE_HOST).get(),
0.5);
+ assertEquals(2.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).get(),
0.5);
+ assertEquals(1.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(STORAGE_HOST).get(),
0.5);
ShuffleServerMetrics.incStorageFailedCounter(STORAGE_HOST);
- assertEquals(3.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.get(STORAGE_HOST).get(),
0.5);
- assertEquals(1.0,
ShuffleServerMetrics.counterRemoteStorageFailedWrite.get(STORAGE_HOST).get(),
0.5);
+ assertEquals(3.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).get(),
0.5);
+ assertEquals(1.0,
ShuffleServerMetrics.counterRemoteStorageFailedWrite.labels(STORAGE_HOST).get(),
0.5);
}
@Test