This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new cf8442036 [#1678] fix(server): disk size leak on removing resources by 
AppPurgeEvent (#1679) (#1689)
cf8442036 is described below

commit cf8442036709f7c73da44a4a888c1a0f9db55bd8
Author: Junfan Zhang <[email protected]>
AuthorDate: Sat May 11 10:20:40 2024 +0800

    [#1678] fix(server): disk size leak on removing resources by AppPurgeEvent 
(#1679) (#1689)
    
    ### What changes were proposed in this pull request?
    
    Decrease the disk size that the local storage tracks for itself when
    removing resources with AppPurgeEvent.
    
    ### Why are the changes needed?
    
    Fix: #1678
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests
---
 .../org/apache/uniffle/server/ShuffleTaskInfo.java |  4 +
 .../apache/uniffle/server/ShuffleTaskManager.java  |  5 +-
 .../uniffle/server/ShuffleTaskManagerTest.java     | 85 +++++++++++++++++-----
 3 files changed, 74 insertions(+), 20 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index e657ec4db..f45b1be94 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -194,6 +194,10 @@ public class ShuffleTaskInfo {
     }
   }
 
+  public Set<Integer> getShuffleIds() {
+    return partitionDataSizes.keySet();
+  }
+
   @Override
   public String toString() {
     return "ShuffleTaskInfo{"
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 11d654572..50ea004ad 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.server;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -771,7 +772,9 @@ public class ShuffleTaskManager {
       partitionsToBlockIds.remove(appId);
       shuffleBufferManager.removeBuffer(appId);
       shuffleFlushManager.removeResources(appId);
-      storageManager.removeResources(new AppPurgeEvent(appId, 
shuffleTaskInfo.getUser()));
+      storageManager.removeResources(
+          new AppPurgeEvent(
+              appId, shuffleTaskInfo.getUser(), new 
ArrayList<>(shuffleTaskInfo.getShuffleIds())));
       if (shuffleTaskInfo.hasHugePartition()) {
         ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
         ShuffleServerMetrics.gaugeHugePartitionNum.dec();
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 67410b3c0..75c49cd4d 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -104,6 +104,71 @@ public class ShuffleTaskManagerTest extends HadoopTestBase 
{
     ShuffleServerMetrics.clear();
   }
 
+  private ShuffleServerConf constructServerConfWithLocalfile() {
+    String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+    ShuffleServerConf conf = new ShuffleServerConf(confFile);
+    conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+    conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+    conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+    conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+    conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 
50.0);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 
0.0);
+    conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+    conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 100000L);
+    conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), 
StorageType.LOCALFILE.name());
+    conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    conf.setString(
+        ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(),
+        tempDir1.getAbsolutePath() + "," + tempDir2.getAbsolutePath());
+    return conf;
+  }
+
+  /** Test the shuffleMeta's diskSize when app is removed. */
+  @Test
+  public void appPurgeWithLocalfileTest() throws Exception {
+    ShuffleServerConf conf = constructServerConfWithLocalfile();
+    shuffleServer = new ShuffleServer(conf);
+    ShuffleTaskManager shuffleTaskManager = 
shuffleServer.getShuffleTaskManager();
+
+    String appId = "removeShuffleDataWithLocalfileTest";
+
+    int shuffleNum = 4;
+    for (int i = 0; i < shuffleNum; i++) {
+      shuffleTaskManager.registerShuffle(
+          appId,
+          i,
+          Lists.newArrayList(new PartitionRange(0, 1)),
+          RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+          StringUtils.EMPTY);
+
+      ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 
35);
+      shuffleTaskManager.requireBuffer(35);
+      shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0);
+      shuffleTaskManager.updateCachedBlockIds(appId, i, 
partitionedData0.getBlockList());
+    }
+
+    assertEquals(1, shuffleTaskManager.getAppIds().size());
+    for (int i = 0; i < shuffleNum; i++) {
+      shuffleTaskManager.commitShuffle(appId, i);
+    }
+
+    shuffleTaskManager.removeResources(appId, false);
+    for (String path : conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH)) {
+      String appPath = path + "/" + appId;
+      assertFalse(new File(appPath).exists());
+    }
+
+    // once the app is removed. the disk size should be 0
+    LocalStorageManager localStorageManager =
+        (LocalStorageManager) shuffleServer.getStorageManager();
+    for (LocalStorage localStorage : localStorageManager.getStorages()) {
+      assertEquals(0, localStorage.getMetaData().getDiskSize().get());
+    }
+  }
+
   @Test
   public void hugePartitionMemoryUsageLimitTest() throws Exception {
     String confFile = ClassLoader.getSystemResource("server.conf").getFile();
@@ -479,25 +544,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase 
{
 
   @Test
   public void removeShuffleDataWithLocalfileTest() throws Exception {
-    String confFile = ClassLoader.getSystemResource("server.conf").getFile();
-    ShuffleServerConf conf = new ShuffleServerConf(confFile);
-    conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
-    conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
-    conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
-    conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
-    conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
-    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 
50.0);
-    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 
0.0);
-    conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
-    conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 100000L);
-    conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
-
-    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), "LOCALFILE");
-    conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
-    conf.setString(
-        ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(),
-        tempDir1.getAbsolutePath() + "," + tempDir2.getAbsolutePath());
-
+    ShuffleServerConf conf = constructServerConfWithLocalfile();
     shuffleServer = new ShuffleServer(conf);
     ShuffleTaskManager shuffleTaskManager = 
shuffleServer.getShuffleTaskManager();
 

Reply via email to