This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 48696301 [#1108] feat(server): Add labels with disk path for 
`total_localfile_write_data` metrics. (#1160)
48696301 is described below

commit 48696301289ee291c971b803e32a47704c364cb3
Author: Fantasy-Jay <[email protected]>
AuthorDate: Mon Aug 21 16:51:03 2023 +0800

    [#1108] feat(server): Add labels with disk path for 
`total_localfile_write_data` metrics. (#1160)
    
    ### What changes were proposed in this pull request?
    
    Add a `local_disk_path` label to the local storage 
`total_localfile_write_data` metric, so that written bytes are counted per 
disk path as well as under the aggregate `ALL` label.
    
    ### Why are the changes needed?
    
    Fix: https://github.com/apache/incubator-uniffle/issues/1108
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a unit test case (`totalLocalFileWriteDataMetricTest` in `ShuffleFlushManagerTest`).
---
 .../uniffle/server/ShuffleServerMetrics.java       |  6 +-
 .../server/storage/LocalStorageManager.java        |  9 ++-
 .../uniffle/server/ShuffleFlushManagerTest.java    | 82 ++++++++++++++++++++--
 3 files changed, 90 insertions(+), 7 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index ea7b445c..dbbe36a6 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -75,6 +75,8 @@ public class ShuffleServerMetrics {
   private static final String TOTAL_DROPPED_EVENT_NUM = 
"total_dropped_event_num";
   private static final String TOTAL_HADOOP_WRITE_DATA = 
"total_hadoop_write_data";
   private static final String TOTAL_LOCALFILE_WRITE_DATA = 
"total_localfile_write_data";
+  private static final String LOCAL_DISK_PATH_LABEL = "local_disk_path";
+  public static final String LOCAL_DISK_PATH_LABEL_ALL = "ALL";
   private static final String TOTAL_REQUIRE_BUFFER_FAILED = 
"total_require_buffer_failed";
   private static final String TOTAL_REQUIRE_BUFFER_FAILED_FOR_HUGE_PARTITION =
       "total_require_buffer_failed_for_huge_partition";
@@ -126,7 +128,6 @@ public class ShuffleServerMetrics {
   public static Counter.Child counterTotalReadTime;
   public static Counter.Child counterTotalFailedWrittenEventNum;
   public static Counter.Child counterTotalDroppedEventNum;
-  public static Counter.Child counterTotalLocalFileWriteDataSize;
   public static Counter.Child counterTotalRequireBufferFailed;
   public static Counter.Child counterTotalRequireBufferFailedForHugePartition;
   public static Counter.Child 
counterTotalRequireBufferFailedForRegularPartition;
@@ -164,6 +165,7 @@ public class ShuffleServerMetrics {
   public static Counter counterRemoteStorageFailedWrite;
   public static Counter counterRemoteStorageSuccessWrite;
   public static Counter counterTotalHadoopWriteDataSize;
+  public static Counter counterTotalLocalFileWriteDataSize;
 
   private static String tags;
   public static Counter counterLocalFileEventFlush;
@@ -268,7 +270,7 @@ public class ShuffleServerMetrics {
         metricsManager.addCounter(
             TOTAL_HADOOP_WRITE_DATA, Constants.METRICS_TAG_LABEL_NAME, 
STORAGE_HOST_LABEL);
     counterTotalLocalFileWriteDataSize =
-        metricsManager.addLabeledCounter(TOTAL_LOCALFILE_WRITE_DATA);
+        metricsManager.addCounter(TOTAL_LOCALFILE_WRITE_DATA, 
LOCAL_DISK_PATH_LABEL);
     counterTotalRequireBufferFailed = 
metricsManager.addLabeledCounter(TOTAL_REQUIRE_BUFFER_FAILED);
     counterTotalRequireBufferFailedForRegularPartition =
         
metricsManager.addLabeledCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_REGULAR_PARTITION);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 635cc4c6..0752abcc 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -237,7 +237,14 @@ public class LocalStorageManager extends 
SingleStorageManager {
   @Override
   public void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime) {
     super.updateWriteMetrics(event, writeTime);
-    
ShuffleServerMetrics.counterTotalLocalFileWriteDataSize.inc(event.getSize());
+    ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
+        .labels(ShuffleServerMetrics.LOCAL_DISK_PATH_LABEL_ALL)
+        .inc(event.getSize());
+    if (event.getUnderStorage() != null) {
+      ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
+          .labels(event.getUnderStorage().getStoragePath())
+          .inc(event.getSize());
+    }
   }
 
   @Override
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 2df02033..b91e669e 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -283,7 +283,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase 
{
     // wait for write data
     waitForFlush(manager, appId, 1, 5);
 
-    validateLocalMetadata(storageManager, 160L);
+    validateLocalMetadata(storageManager, 0, 160L);
 
     ShuffleDataFlushEvent event12 = createShuffleDataFlushEvent(appId, 1, 1, 
1, null);
     manager.addToFlushQueue(event12);
@@ -291,7 +291,60 @@ public class ShuffleFlushManagerTest extends 
HadoopTestBase {
     // wait for write data
     waitForFlush(manager, appId, 1, 10);
 
-    validateLocalMetadata(storageManager, 320L);
+    validateLocalMetadata(storageManager, 0, 320L);
+  }
+
+  @Test
+  public void totalLocalFileWriteDataMetricTest() throws Exception {
+    List<String> storagePaths = Arrays.asList("/tmp/rss-data1", 
"/tmp/rss-data2", "/tmp/rss-data3");
+
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
storagePaths);
+    shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
+    shuffleServerConf.setString(
+        ShuffleServerConf.RSS_STORAGE_TYPE.key(), 
StorageType.LOCALFILE.name());
+
+    String appId = "localMetricsTest_appId";
+    StorageManager storageManager =
+        
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+    ShuffleFlushManager manager =
+        new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
+
+    ShuffleDataFlushEvent flushEvent = createShuffleDataFlushEvent(appId, 1, 
1, 1, 10, 100, null);
+    manager.addToFlushQueue(flushEvent);
+    // wait for write data
+    waitForFlush(manager, appId, 1, 10);
+    int storageIndex = 
storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath());
+    validateLocalMetadata(storageManager, storageIndex, 1000L);
+
+    flushEvent = createShuffleDataFlushEvent(appId, 2, 1, 1, 10, 101, null);
+    manager.addToFlushQueue(flushEvent);
+    // wait for write data
+    waitForFlush(manager, appId, 2, 10);
+    int storageIndex1 = 
storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath());
+    validateLocalMetadata(storageManager, storageIndex1, 1010L);
+
+    flushEvent = createShuffleDataFlushEvent(appId, 3, 1, 1, 10, 102, null);
+    manager.addToFlushQueue(flushEvent);
+    // wait for write data
+    waitForFlush(manager, appId, 3, 10);
+    int storageIndex2 = 
storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath());
+    validateLocalMetadata(storageManager, storageIndex2, 1020L);
+
+    assertEquals(
+        1000L,
+        ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
+            .labels(storagePaths.get(storageIndex))
+            .get());
+    assertEquals(
+        1010L,
+        ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
+            .labels(storagePaths.get(storageIndex1))
+            .get());
+    assertEquals(
+        1020L,
+        ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
+            .labels(storagePaths.get(storageIndex2))
+            .get());
   }
 
   @Test
@@ -515,6 +568,26 @@ public class ShuffleFlushManagerTest extends 
HadoopTestBase {
         null);
   }
 
+  public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
+      String appId,
+      int shuffleId,
+      int startPartition,
+      int endPartition,
+      int blockNum,
+      int blockSize,
+      Supplier<Boolean> isValid) {
+    return new ShuffleDataFlushEvent(
+        ATOMIC_LONG.getAndIncrement(),
+        appId,
+        shuffleId,
+        startPartition,
+        endPartition,
+        (long) blockNum * blockSize,
+        createBlock(blockNum, blockSize),
+        isValid,
+        null);
+  }
+
   public static List<ShufflePartitionedBlock> createBlock(int num, int length) 
{
     List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
     for (int i = 0; i < num; i++) {
@@ -677,9 +750,10 @@ public class ShuffleFlushManagerTest extends 
HadoopTestBase {
     assertEquals(2, ShuffleServerMetrics.counterHadoopEventFlush.get());
   }
 
-  private void validateLocalMetadata(StorageManager storageManager, Long size) 
{
+  private void validateLocalMetadata(StorageManager storageManager, int 
storageIndex, Long size) {
     assertInstanceOf(LocalStorageManager.class, storageManager);
-    LocalStorage localStorage = ((LocalStorageManager) 
storageManager).getStorages().get(0);
+    LocalStorage localStorage =
+        ((LocalStorageManager) storageManager).getStorages().get(storageIndex);
     assertEquals(size, localStorage.getMetaData().getDiskSize().longValue());
   }
 }

Reply via email to