This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ae7283161 [#1678] fix(server): disk size leak on removing resources by AppPurgeEvent (#1679)
ae7283161 is described below
commit ae72831615b05b56055dcfda8f129189792130ba
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri May 10 10:54:15 2024 +0800
[#1678] fix(server): disk size leak on removing resources by AppPurgeEvent (#1679)
### What changes were proposed in this pull request?
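Decrease the disk size tracked by each local storage itself when an app's
resources are removed via an AppPurgeEvent. To support this, the
AppPurgeEvent built in ShuffleTaskManager#removeResources now carries the
app's shuffle IDs in addition to the app ID and user.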
### Why are the changes needed?
Fix: #1678
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
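Unit tests: added `appPurgeWithLocalfileTest` to `ShuffleTaskManagerTest`,
which registers and commits several shuffles on LOCALFILE storage, removes the
app, and asserts that the app directories are deleted and that every local
storage reports a disk size of 0.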
---
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 4 +
.../apache/uniffle/server/ShuffleTaskManager.java | 5 +-
.../uniffle/server/ShuffleTaskManagerTest.java | 85 +++++++++++++++++-----
3 files changed, 74 insertions(+), 20 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index e657ec4db..f45b1be94 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -194,6 +194,10 @@ public class ShuffleTaskInfo {
}
}
+ public Set<Integer> getShuffleIds() {
+ return partitionDataSizes.keySet();
+ }
+
@Override
public String toString() {
return "ShuffleTaskInfo{"
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 11d654572..50ea004ad 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.server;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -771,7 +772,9 @@ public class ShuffleTaskManager {
partitionsToBlockIds.remove(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
- storageManager.removeResources(new AppPurgeEvent(appId,
shuffleTaskInfo.getUser()));
+ storageManager.removeResources(
+ new AppPurgeEvent(
+ appId, shuffleTaskInfo.getUser(), new
ArrayList<>(shuffleTaskInfo.getShuffleIds())));
if (shuffleTaskInfo.hasHugePartition()) {
ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
ShuffleServerMetrics.gaugeHugePartitionNum.dec();
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 67410b3c0..75c49cd4d 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -104,6 +104,71 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
ShuffleServerMetrics.clear();
}
+ private ShuffleServerConf constructServerConfWithLocalfile() {
+ String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+ ShuffleServerConf conf = new ShuffleServerConf(confFile);
+ conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+ conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+ conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+ conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+ conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
+ conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
50.0);
+ conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
0.0);
+ conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+ conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 100000L);
+ conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+
+ conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name());
+ conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+ conf.setString(
+ ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(),
+ tempDir1.getAbsolutePath() + "," + tempDir2.getAbsolutePath());
+ return conf;
+ }
+
+ /** Test the shuffleMeta's diskSize when app is removed. */
+ @Test
+ public void appPurgeWithLocalfileTest() throws Exception {
+ ShuffleServerConf conf = constructServerConfWithLocalfile();
+ shuffleServer = new ShuffleServer(conf);
+ ShuffleTaskManager shuffleTaskManager =
shuffleServer.getShuffleTaskManager();
+
+ String appId = "removeShuffleDataWithLocalfileTest";
+
+ int shuffleNum = 4;
+ for (int i = 0; i < shuffleNum; i++) {
+ shuffleTaskManager.registerShuffle(
+ appId,
+ i,
+ Lists.newArrayList(new PartitionRange(0, 1)),
+ RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+ StringUtils.EMPTY);
+
+ ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1,
35);
+ shuffleTaskManager.requireBuffer(35);
+ shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0);
+ shuffleTaskManager.updateCachedBlockIds(appId, i,
partitionedData0.getBlockList());
+ }
+
+ assertEquals(1, shuffleTaskManager.getAppIds().size());
+ for (int i = 0; i < shuffleNum; i++) {
+ shuffleTaskManager.commitShuffle(appId, i);
+ }
+
+ shuffleTaskManager.removeResources(appId, false);
+ for (String path : conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH)) {
+ String appPath = path + "/" + appId;
+ assertFalse(new File(appPath).exists());
+ }
+
+ // once the app is removed. the disk size should be 0
+ LocalStorageManager localStorageManager =
+ (LocalStorageManager) shuffleServer.getStorageManager();
+ for (LocalStorage localStorage : localStorageManager.getStorages()) {
+ assertEquals(0, localStorage.getMetaData().getDiskSize().get());
+ }
+ }
+
@Test
public void hugePartitionMemoryUsageLimitTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
@@ -479,25 +544,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
@Test
public void removeShuffleDataWithLocalfileTest() throws Exception {
- String confFile = ClassLoader.getSystemResource("server.conf").getFile();
- ShuffleServerConf conf = new ShuffleServerConf(confFile);
- conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
- conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
- conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
- conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
- conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
- conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
50.0);
- conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
0.0);
- conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
- conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 100000L);
- conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
-
- conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), "LOCALFILE");
- conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
- conf.setString(
- ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(),
- tempDir1.getAbsolutePath() + "," + tempDir2.getAbsolutePath());
-
+ ShuffleServerConf conf = constructServerConfWithLocalfile();
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager =
shuffleServer.getShuffleTaskManager();