This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.7 by this push:
     new 13391213 [#881][0.7] fix(followup): Ensure LocalStorageMeta disk size 
is correctly updated when events are processed for 0.7.0 (#914)
13391213 is described below

commit 133912133687a7f9be10d195fa0c08c4a41607fb
Author: awdavidson <[email protected]>
AuthorDate: Tue Jun 6 16:39:35 2023 +0100

    [#881][0.7] fix(followup): Ensure LocalStorageMeta disk size is correctly 
updated when events are processed for 0.7.0 (#914)
    
    What changes were proposed in this pull request?
    Ensure all events are marked as underStorage; this will result in the 
LocalStorageMeta being updated when events are processed.
    
    Why are the changes needed?
    Currently LocalStorageMeta is only updated with metrics from the first event 
for a given shuffleId and partitionId. The first event updates metrics because 
there is no entry in partitionsOfStorage and the event gets marked as 
underStorage. However, for subsequent events in the same shuffleId and 
partitionId, selectStorage returns the storage and does not mark the event as 
underStorage, so when updateWriteMetrics is called, event.getUnderStorage() 
returns null and storage.updateWriteMetrics(met [...]
    
    As metrics are not updated correctly, LocalStorage.canWrite will not return 
the correct result.
    
    Does this PR introduce any user-facing change?
    No.
    
    How was this patch tested?
    Added a unit test which covers multiple events for the same shuffleId and 
partitionId.
---
 .../server/storage/LocalStorageManager.java        |  5 ++-
 .../uniffle/server/ShuffleFlushManagerTest.java    | 36 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 151038fd..be68b510 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -179,6 +179,9 @@ public class LocalStorageManager extends 
SingleStorageManager {
               storage.getBasePath(), event);
         }
       } else {
+        if (event.getUnderStorage() == null) {
+          event.setUnderStorage(storage);
+        }
         return storage;
       }
     }
@@ -204,7 +207,7 @@ public class LocalStorageManager extends 
SingleStorageManager {
         (key, localStorage) -> {
           // If this is the first time to select storage or existing storage 
is corrupted,
           // we should refresh the cache.
-          if (localStorage == null || localStorage.isCorrupted()) {
+          if (localStorage == null || localStorage.isCorrupted() || 
event.getUnderStorage() == null) {
             event.setUnderStorage(selectedStorage);
             return selectedStorage;
           }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index ea0e236e..a316a0df 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -55,6 +55,7 @@ import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.event.AppPurgeEvent;
 import org.apache.uniffle.server.storage.HdfsStorageManager;
+import org.apache.uniffle.server.storage.LocalStorageManager;
 import org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy;
 import org.apache.uniffle.server.storage.MultiStorageManager;
 import org.apache.uniffle.server.storage.StorageManager;
@@ -68,6 +69,7 @@ import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -193,6 +195,34 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     waitForQueueClear(manager);
     waitForMetrics(ShuffleServerMetrics.gaugeWriteHandler, 0, 0.5);
   }
+  
+  @Test
+  public void localMetricsTest(@TempDir File tempDir) throws Exception {
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(tempDir.getAbsolutePath()));
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE.name());
+
+    String appId = "localMetricsTest_appId";
+    StorageManager storageManager =
+            
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+    ShuffleFlushManager manager =
+            new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", 
mockShuffleServer, storageManager);
+    ShuffleDataFlushEvent event1 =
+            createShuffleDataFlushEvent(appId, 1, 1, 1, null);
+    manager.addToFlushQueue(event1);
+    // wait for write data
+    waitForFlush(manager, appId, 1, 5);
+
+    validateLocalMetadata(storageManager, 160L);
+
+    ShuffleDataFlushEvent event12 =
+            createShuffleDataFlushEvent(appId, 1, 1, 1, null);
+    manager.addToFlushQueue(event12);
+
+    // wait for write data
+    waitForFlush(manager, appId, 1, 10);
+
+    validateLocalMetadata(storageManager, 320L);
+  }
 
   @Test
   public void complexWriteTest() throws Exception {
@@ -554,4 +584,10 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
       fail();
     }
   }
+  
+  private void validateLocalMetadata(StorageManager storageManager, Long size) 
{
+    assertInstanceOf(LocalStorageManager.class, storageManager);
+    LocalStorage localStorage = ((LocalStorageManager) 
storageManager).getStorages().get(0);
+    assertEquals(size, localStorage.getMetaData().getDiskSize().longValue());
+  }
 }

Reply via email to