This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 576a925a2 [#1501] fix(server): storage selection cache accidentally 
deleted when clearing stage level data. (#1505)
576a925a2 is described below

commit 576a925a21576467ab8713ca1d4b569d0a0af5c8
Author: dingshun3016 <[email protected]>
AuthorDate: Sun Feb 4 16:54:18 2024 +0800

    [#1501] fix(server): storage selection cache accidentally deleted when 
clearing stage level data. (#1505)
    
    ### What changes were proposed in this pull request?
    
    Avoid accidentally deleting storage selection cache entries when purging by key prefix.
    
    ### Why are the changes needed?
    
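    Fix: (#1501)

    The purge prefix previously ended right after the app ID or shuffle ID, so a
    prefix built for app `clearLocalTest_appId1` could also match
    `clearLocalTest_appId12`, and shuffle `1` could also match shuffle `11`.
    The prefix key is now built with an empty trailing key component.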
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
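    Unit tests: `ShuffleFlushManagerTest#clearLocalTest` now uses an app ID and a
    shuffle ID that share a prefix with the purged ones.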
---
 .../uniffle/server/storage/LocalStorageManager.java     |  4 ++--
 .../apache/uniffle/server/ShuffleFlushManagerTest.java  | 17 ++++++++++++++++-
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 229b309a1..772aae1d0 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -309,12 +309,12 @@ public class LocalStorageManager extends 
SingleStorageManager {
     Function<String, Boolean> deleteConditionFunc = null;
     String prefixKey = null;
     if (event instanceof AppPurgeEvent) {
-      prefixKey = UnionKey.buildKey(event.getAppId());
+      prefixKey = UnionKey.buildKey(event.getAppId(), "");
       deleteConditionFunc =
           partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, 
event.getAppId());
     } else if (event instanceof ShufflePurgeEvent) {
       int shuffleId = event.getShuffleIds().get(0);
-      prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId);
+      prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId, "");
       deleteConditionFunc =
           partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, 
event.getAppId(), shuffleId);
     }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index f2b0b3048..c17087073 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -72,6 +72,7 @@ import org.apache.uniffle.storage.util.StorageType;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -454,7 +455,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase 
{
   @Test
   public void clearLocalTest(@TempDir File tempDir) throws Exception {
     final String appId1 = "clearLocalTest_appId1";
-    final String appId2 = "clearLocalTest_appId2";
+    final String appId2 = "clearLocalTest_appId12";
     ShuffleServerConf serverConf = new ShuffleServerConf();
     serverConf.set(
         ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(tempDir.getAbsolutePath()));
@@ -470,14 +471,18 @@ public class ShuffleFlushManagerTest extends 
HadoopTestBase {
     manager.addToFlushQueue(event2);
     ShuffleDataFlushEvent event3 = createShuffleDataFlushEvent(appId2, 2, 0, 
1, null);
     manager.addToFlushQueue(event3);
+    ShuffleDataFlushEvent event5 = createShuffleDataFlushEvent(appId2, 11, 0, 
1, null);
+    manager.addToFlushQueue(event5);
     assertEquals(storageManager.selectStorage(event1), 
storageManager.selectStorage(event2));
     final AbstractStorage storage = (AbstractStorage) 
storageManager.selectStorage(event1);
     waitForFlush(manager, appId1, 1, 5);
     waitForFlush(manager, appId2, 1, 5);
     waitForFlush(manager, appId2, 2, 5);
+    waitForFlush(manager, appId2, 11, 5);
     assertEquals(5, manager.getCommittedBlockIds(appId1, 
1).getLongCardinality());
     assertEquals(5, manager.getCommittedBlockIds(appId2, 
1).getLongCardinality());
     assertEquals(5, manager.getCommittedBlockIds(appId2, 
2).getLongCardinality());
+    assertEquals(5, manager.getCommittedBlockIds(appId2, 
11).getLongCardinality());
     assertEquals(2, storage.getHandlerSize());
     File file = new File(tempDir, appId1);
     assertTrue(file.exists());
@@ -490,6 +495,10 @@ public class ShuffleFlushManagerTest extends 
HadoopTestBase {
         new AppPurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1)));
     manager.removeResources(appId1);
     assertFalse(file.exists());
+
+    ShuffleDataReadEvent shuffleReadEvent = new ShuffleDataReadEvent(appId2, 
1, 0, 0);
+    assertNotNull(storageManager.selectStorage(shuffleReadEvent));
+
     assertEquals(0, manager.getCommittedBlockIds(appId1, 
1).getLongCardinality());
     assertEquals(5, manager.getCommittedBlockIds(appId2, 
1).getLongCardinality());
     assertEquals(5, manager.getCommittedBlockIds(appId2, 
2).getLongCardinality());
@@ -497,6 +506,12 @@ public class ShuffleFlushManagerTest extends 
HadoopTestBase {
     manager.removeResources(appId2);
     storageManager.removeResources(
         new ShufflePurgeEvent(appId2, StringUtils.EMPTY, 
Lists.newArrayList(1)));
+
+    ShuffleDataReadEvent shuffle1ReadEvent = new ShuffleDataReadEvent(appId2, 
1, 0, 0);
+    ShuffleDataReadEvent shuffle11ReadEvent = new ShuffleDataReadEvent(appId2, 
11, 0, 0);
+    assertNull(storageManager.selectStorage(shuffle1ReadEvent));
+    assertNotNull(storageManager.selectStorage(shuffle11ReadEvent));
+
     storageManager.removeResources(
         new ShufflePurgeEvent(appId2, StringUtils.EMPTY, 
Lists.newArrayList(2)));
     storageManager.removeResources(

Reply via email to