This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 48696301 [#1108] feat(server): Add labels with disk path for `total_localfile_write_data` metrics. (#1160)
48696301 is described below
commit 48696301289ee291c971b803e32a47704c364cb3
Author: Fantasy-Jay <[email protected]>
AuthorDate: Mon Aug 21 16:51:03 2023 +0800
[#1108] feat(server): Add labels with disk path for `total_localfile_write_data` metrics. (#1160)
### What changes were proposed in this pull request?
Add labels with disk path for local storage `total_localfile_write_data` metrics.
### Why are the changes needed?
Fix: https://github.com/apache/incubator-uniffle/issues/1108
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add more unit test case.
---
.../uniffle/server/ShuffleServerMetrics.java | 6 +-
.../server/storage/LocalStorageManager.java | 9 ++-
.../uniffle/server/ShuffleFlushManagerTest.java | 82 ++++++++++++++++++++--
3 files changed, 90 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index ea7b445c..dbbe36a6 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -75,6 +75,8 @@ public class ShuffleServerMetrics {
private static final String TOTAL_DROPPED_EVENT_NUM =
"total_dropped_event_num";
private static final String TOTAL_HADOOP_WRITE_DATA =
"total_hadoop_write_data";
private static final String TOTAL_LOCALFILE_WRITE_DATA =
"total_localfile_write_data";
+ private static final String LOCAL_DISK_PATH_LABEL = "local_disk_path";
+ public static final String LOCAL_DISK_PATH_LABEL_ALL = "ALL";
private static final String TOTAL_REQUIRE_BUFFER_FAILED =
"total_require_buffer_failed";
private static final String TOTAL_REQUIRE_BUFFER_FAILED_FOR_HUGE_PARTITION =
"total_require_buffer_failed_for_huge_partition";
@@ -126,7 +128,6 @@ public class ShuffleServerMetrics {
public static Counter.Child counterTotalReadTime;
public static Counter.Child counterTotalFailedWrittenEventNum;
public static Counter.Child counterTotalDroppedEventNum;
- public static Counter.Child counterTotalLocalFileWriteDataSize;
public static Counter.Child counterTotalRequireBufferFailed;
public static Counter.Child counterTotalRequireBufferFailedForHugePartition;
public static Counter.Child
counterTotalRequireBufferFailedForRegularPartition;
@@ -164,6 +165,7 @@ public class ShuffleServerMetrics {
public static Counter counterRemoteStorageFailedWrite;
public static Counter counterRemoteStorageSuccessWrite;
public static Counter counterTotalHadoopWriteDataSize;
+ public static Counter counterTotalLocalFileWriteDataSize;
private static String tags;
public static Counter counterLocalFileEventFlush;
@@ -268,7 +270,7 @@ public class ShuffleServerMetrics {
metricsManager.addCounter(
TOTAL_HADOOP_WRITE_DATA, Constants.METRICS_TAG_LABEL_NAME,
STORAGE_HOST_LABEL);
counterTotalLocalFileWriteDataSize =
- metricsManager.addLabeledCounter(TOTAL_LOCALFILE_WRITE_DATA);
+ metricsManager.addCounter(TOTAL_LOCALFILE_WRITE_DATA, LOCAL_DISK_PATH_LABEL);
counterTotalRequireBufferFailed =
metricsManager.addLabeledCounter(TOTAL_REQUIRE_BUFFER_FAILED);
counterTotalRequireBufferFailedForRegularPartition =
metricsManager.addLabeledCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_REGULAR_PARTITION);
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 635cc4c6..0752abcc 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -237,7 +237,14 @@ public class LocalStorageManager extends
SingleStorageManager {
@Override
public void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime) {
super.updateWriteMetrics(event, writeTime);
- ShuffleServerMetrics.counterTotalLocalFileWriteDataSize.inc(event.getSize());
+ ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
+ .labels(ShuffleServerMetrics.LOCAL_DISK_PATH_LABEL_ALL)
+ .inc(event.getSize());
+ if (event.getUnderStorage() != null) {
+ ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
+ .labels(event.getUnderStorage().getStoragePath())
+ .inc(event.getSize());
+ }
}
@Override
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 2df02033..b91e669e 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -283,7 +283,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase
{
// wait for write data
waitForFlush(manager, appId, 1, 5);
- validateLocalMetadata(storageManager, 160L);
+ validateLocalMetadata(storageManager, 0, 160L);
ShuffleDataFlushEvent event12 = createShuffleDataFlushEvent(appId, 1, 1,
1, null);
manager.addToFlushQueue(event12);
@@ -291,7 +291,60 @@ public class ShuffleFlushManagerTest extends
HadoopTestBase {
// wait for write data
waitForFlush(manager, appId, 1, 10);
- validateLocalMetadata(storageManager, 320L);
+ validateLocalMetadata(storageManager, 0, 320L);
+ }
+
+ @Test
+ public void totalLocalFileWriteDataMetricTest() throws Exception {
+ List<String> storagePaths = Arrays.asList("/tmp/rss-data1", "/tmp/rss-data2", "/tmp/rss-data3");
+
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
storagePaths);
+ shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
+ shuffleServerConf.setString(
+ ShuffleServerConf.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name());
+
+ String appId = "localMetricsTest_appId";
+ StorageManager storageManager =
+
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+ ShuffleFlushManager manager =
+ new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
+
+ ShuffleDataFlushEvent flushEvent = createShuffleDataFlushEvent(appId, 1,
1, 1, 10, 100, null);
+ manager.addToFlushQueue(flushEvent);
+ // wait for write data
+ waitForFlush(manager, appId, 1, 10);
+ int storageIndex = storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath());
+ validateLocalMetadata(storageManager, storageIndex, 1000L);
+
+ flushEvent = createShuffleDataFlushEvent(appId, 2, 1, 1, 10, 101, null);
+ manager.addToFlushQueue(flushEvent);
+ // wait for write data
+ waitForFlush(manager, appId, 2, 10);
+ int storageIndex1 =
storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath());
+ validateLocalMetadata(storageManager, storageIndex1, 1010L);
+
+ flushEvent = createShuffleDataFlushEvent(appId, 3, 1, 1, 10, 102, null);
+ manager.addToFlushQueue(flushEvent);
+ // wait for write data
+ waitForFlush(manager, appId, 3, 10);
+ int storageIndex2 =
storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath());
+ validateLocalMetadata(storageManager, storageIndex2, 1020L);
+
+ assertEquals(
+ 1000L,
+ ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
+ .labels(storagePaths.get(storageIndex))
+ .get());
+ assertEquals(
+ 1010L,
+ ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
+ .labels(storagePaths.get(storageIndex1))
+ .get());
+ assertEquals(
+ 1020L,
+ ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
+ .labels(storagePaths.get(storageIndex2))
+ .get());
}
@Test
@@ -515,6 +568,26 @@ public class ShuffleFlushManagerTest extends
HadoopTestBase {
null);
}
+ public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
+ String appId,
+ int shuffleId,
+ int startPartition,
+ int endPartition,
+ int blockNum,
+ int blockSize,
+ Supplier<Boolean> isValid) {
+ return new ShuffleDataFlushEvent(
+ ATOMIC_LONG.getAndIncrement(),
+ appId,
+ shuffleId,
+ startPartition,
+ endPartition,
+ (long) blockNum * blockSize,
+ createBlock(blockNum, blockSize),
+ isValid,
+ null);
+ }
+
public static List<ShufflePartitionedBlock> createBlock(int num, int length)
{
List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
for (int i = 0; i < num; i++) {
@@ -677,9 +750,10 @@ public class ShuffleFlushManagerTest extends
HadoopTestBase {
assertEquals(2, ShuffleServerMetrics.counterHadoopEventFlush.get());
}
- private void validateLocalMetadata(StorageManager storageManager, Long size)
{
+ private void validateLocalMetadata(StorageManager storageManager, int
storageIndex, Long size) {
assertInstanceOf(LocalStorageManager.class, storageManager);
- LocalStorage localStorage = ((LocalStorageManager)
storageManager).getStorages().get(0);
+ LocalStorage localStorage =
+ ((LocalStorageManager) storageManager).getStorages().get(storageIndex);
assertEquals(size, localStorage.getMetaData().getDiskSize().longValue());
}
}