This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ee68201 Introduce data cleanup mechanism on stage level (#249)
1ee68201 is described below

commit 1ee68201c1e2e8ade2b728831fc996ba9d8c0316
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Oct 12 14:36:02 2022 +0800

    Introduce data cleanup mechanism on stage level (#249)
    
    ### What changes were proposed in this pull request?
    Introduce data cleanup mechanism on stage level
    
    ### Why are the changes needed?
    This PR is to optimize the disk capacity. For example
    1. Some Spark ML jobs run multiple stages and retain large amounts of 
unused shuffle data in shuffle-servers.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UTs
---
 .../hadoop/mapred/SortWriteBufferManagerTest.java  |   5 +
 .../hadoop/mapreduce/task/reduce/FetcherTest.java  |   5 +
 .../org/apache/spark/shuffle/RssSparkConfig.java   |   8 ++
 .../apache/spark/shuffle/RssShuffleManager.java    |  11 +-
 .../apache/spark/shuffle/RssShuffleManager.java    |  36 +++++-
 .../uniffle/client/api/ShuffleWriteClient.java     |   2 +
 .../client/factory/ShuffleClientFactory.java       |  27 +++-
 .../client/impl/ShuffleWriteClientImpl.java        |  57 ++++++++-
 .../client/impl/ShuffleWriteClientImplTest.java    |   2 +-
 docs/client_guide.md                               |   2 +
 .../uniffle/test/AssignmentWithTagsTest.java       |   2 +-
 .../uniffle/test/CoordinatorAssignmentTest.java    |   4 +-
 .../java/org/apache/uniffle/test/QuorumTest.java   |   2 +-
 .../apache/uniffle/test/ShuffleServerGrpcTest.java |   2 +-
 .../uniffle/test/ShuffleWithRssClientTest.java     |   2 +-
 .../test/ShuffleUnregisterWithHdfsTest.java        |  98 ++++++++++++++
 .../test/ShuffleUnregisterWithLocalfileTest.java   | 100 +++++++++++++++
 .../uniffle/client/api/ShuffleServerClient.java    |   4 +
 .../client/impl/grpc/ShuffleServerGrpcClient.java  |  32 +++++
 .../request/RssUnregisterShuffleRequest.java       |  36 ++++++
 .../response/RssUnregisterShuffleResponse.java     |  26 ++++
 proto/src/main/proto/Rss.proto                     |  11 ++
 .../apache/uniffle/server/ShuffleFlushManager.java |   6 +
 .../uniffle/server/ShuffleServerGrpcService.java   |  23 ++++
 .../apache/uniffle/server/ShuffleTaskManager.java  |  69 +++++++++-
 .../server/buffer/ShuffleBufferManager.java        |  49 ++++---
 .../apache/uniffle/server/event/AppPurgeEvent.java |  31 +++++
 .../apache/uniffle/server/event/PurgeEvent.java    |  55 ++++++++
 .../uniffle/server/event/ShufflePurgeEvent.java    |  27 ++++
 .../uniffle/server/storage/HdfsStorageManager.java |  32 ++++-
 .../server/storage/LocalStorageManager.java        |  34 ++++-
 .../server/storage/MultiStorageManager.java        |  10 +-
 .../uniffle/server/storage/StorageManager.java     |   5 +-
 .../ShuffleFlushManagerOnKerberizedHdfsTest.java   |  11 +-
 .../uniffle/server/ShuffleFlushManagerTest.java    |  21 ++-
 .../uniffle/server/ShuffleTaskManagerTest.java     | 141 +++++++++++++++++++++
 .../spark-2.4.6_dynamic_allocation_support.patch   |  15 ---
 .../spark-3.1.2_dynamic_allocation_support.patch   |  17 ---
 .../spark-3.2.1_dynamic_allocation_support.patch   |  17 ---
 .../handler/impl/HdfsShuffleDeleteHandler.java     |  51 ++++----
 .../handler/impl/LocalFileDeleteHandler.java       |   7 +-
 41 files changed, 953 insertions(+), 142 deletions(-)

diff --git 
a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
 
b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 97c58472..029b1e0e 100644
--- 
a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ 
b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -336,5 +336,10 @@ public class SortWriteBufferManagerTest {
     public void close() {
 
     }
+
+    @Override
+    public void unregisterShuffle(String appId, int shuffleId) {
+
+    }
   }
 }
diff --git 
a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
 
b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index 5f9713b2..ec630e24 100644
--- 
a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++ 
b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -426,6 +426,11 @@ public class FetcherTest {
     public void close() {
 
     }
+
+    @Override
+    public void unregisterShuffle(String appId, int shuffleId) {
+
+    }
   }
 
   static class MockedShuffleReadClient implements ShuffleReadClient {
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 5a46da3e..5f39eb5d 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -112,6 +112,14 @@ public class RssSparkConfig {
           .doc("The max data size sent to shuffle server"))
       .createWithDefault("16m");
 
+  public static final ConfigEntry<Integer> 
RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE = createIntegerBuilder(
+      new ConfigBuilder("spark.rss.client.unregister.thread.pool.size"))
+      .createWithDefault(10);
+
+  public static final ConfigEntry<Integer> 
RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC = createIntegerBuilder(
+      new ConfigBuilder("spark.rss.client.unregister.request.timeout.sec"))
+      .createWithDefault(10);
+
   // When the size of read buffer reaches the half of JVM region (i.e., 32m),
   // it will incur humongous allocation, so we set it to 14m.
   public static final ConfigEntry<String> RSS_CLIENT_READ_BUFFER_SIZE = 
createStringBuilder(
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index cdd58a27..625f8e83 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -160,11 +160,13 @@ public class RssShuffleManager implements ShuffleManager {
     long retryIntervalMax = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
     int heartBeatThreadNum = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
     this.dataCommitPoolSize = 
sparkConf.get(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE);
+    int unregisterThreadPoolSize = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE);
+    int unregisterRequestTimeoutSec = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC);
     this.shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, 
heartBeatThreadNum,
             dataReplica, dataReplicaWrite, dataReplicaRead, 
dataReplicaSkipEnabled, dataTransferPoolSize,
-            dataCommitPoolSize);
+            dataCommitPoolSize, unregisterThreadPoolSize, 
unregisterRequestTimeoutSec);
     registerCoordinator();
     // fetch client conf and apply them if necessary and disable ESS
     if (isDriver && dynamicConfEnabled) {
@@ -370,6 +372,13 @@ public class RssShuffleManager implements ShuffleManager {
 
   @Override
   public boolean unregisterShuffle(int shuffleId) {
+    try {
+      if (SparkEnv.get().executorId().equals("driver")) {
+        shuffleWriteClient.unregisterShuffle(appId, shuffleId);
+      }
+    } catch (Exception e) {
+      LOG.warn("Errors on unregister to remote shuffle-servers", e);
+    }
     return true;
   }
 
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 6f04ccff..96c93ef7 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -160,12 +160,13 @@ public class RssShuffleManager implements ShuffleManager {
     int heartBeatThreadNum = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
     this.dataTransferPoolSize = 
sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
     this.dataCommitPoolSize = 
sparkConf.get(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE);
-
+    int unregisterThreadPoolSize = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE);
+    int unregisterRequestTimeoutSec = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC);
     shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, 
heartBeatThreadNum,
             dataReplica, dataReplicaWrite, dataReplicaRead, 
dataReplicaSkipEnabled, dataTransferPoolSize,
-            dataCommitPoolSize);
+            dataCommitPoolSize, unregisterThreadPoolSize, 
unregisterRequestTimeoutSec);
     registerCoordinator();
     // fetch client conf and apply them if necessary and disable ESS
     if (isDriver && dynamicConfEnabled) {
@@ -220,12 +221,24 @@ public class RssShuffleManager implements ShuffleManager {
     int heartBeatThreadNum = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
     this.dataTransferPoolSize = 
sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
     this.dataCommitPoolSize = 
sparkConf.get(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE);
-
+    int unregisterThreadPoolSize = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE);
+    int unregisterRequestTimeoutSec = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC);
     shuffleWriteClient = ShuffleClientFactory
         .getInstance()
-        .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, 
heartBeatThreadNum,
-            dataReplica, dataReplicaWrite, dataReplicaRead, 
dataReplicaSkipEnabled, dataTransferPoolSize,
-            dataCommitPoolSize);
+        .createShuffleWriteClient(
+            clientType,
+            retryMax,
+            retryIntervalMax,
+            heartBeatThreadNum,
+            dataReplica,
+            dataReplicaWrite,
+            dataReplicaRead,
+            dataReplicaSkipEnabled,
+            dataTransferPoolSize,
+            dataCommitPoolSize,
+            unregisterThreadPoolSize,
+            unregisterRequestTimeoutSec
+        );
     this.taskToSuccessBlockIds = taskToSuccessBlockIds;
     this.taskToFailedBlockIds = taskToFailedBlockIds;
     if (loop != null) {
@@ -551,6 +564,13 @@ public class RssShuffleManager implements ShuffleManager {
 
   @Override
   public boolean unregisterShuffle(int shuffleId) {
+    try {
+      if (SparkEnv.get().executorId().equals("driver")) {
+        shuffleWriteClient.unregisterShuffle(id.get(), shuffleId);
+      }
+    } catch (Exception e) {
+      LOG.warn("Errors on unregister to remote shuffle-servers", e);
+    }
     return true;
   }
 
@@ -712,4 +732,8 @@ public class RssShuffleManager implements ShuffleManager {
   public void setRemoteStorage(RemoteStorageInfo remoteStorage) {
     this.remoteStorage = remoteStorage;
   }
+
+  public String getId() {
+    return id.get();
+  }
 }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java 
b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index f507a96a..39df0029 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -69,4 +69,6 @@ public interface ShuffleWriteClient {
       Map<ShuffleServerInfo, Set<Integer>> serverToPartitions, String appId, 
int shuffleId);
 
   void close();
+
+  void unregisterShuffle(String appId, int shuffleId);
 }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index d11a07f1..652f9923 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -34,12 +34,35 @@ public class ShuffleClientFactory {
     return INSTANCE;
   }
 
+  /**
+   * Only for the MR engine, which won't be used to unregister from remote 
shuffle-servers
+   */
   public ShuffleWriteClient createShuffleWriteClient(
       String clientType, int retryMax, long retryIntervalMax, int 
heartBeatThreadNum,
       int replica, int replicaWrite, int replicaRead, boolean 
replicaSkipEnabled, int dataTransferPoolSize,
       int dataCommitPoolSize) {
-    return new ShuffleWriteClientImpl(clientType, retryMax, retryIntervalMax, 
heartBeatThreadNum,
-      replica, replicaWrite, replicaRead, replicaSkipEnabled, 
dataTransferPoolSize, dataCommitPoolSize);
+    return createShuffleWriteClient(clientType, retryMax, retryIntervalMax, 
heartBeatThreadNum, replica,
+        replicaWrite, replicaRead, replicaSkipEnabled, dataTransferPoolSize, 
dataCommitPoolSize, 10, 10);
+  }
+
+  public ShuffleWriteClient createShuffleWriteClient(
+      String clientType, int retryMax, long retryIntervalMax, int 
heartBeatThreadNum,
+      int replica, int replicaWrite, int replicaRead, boolean 
replicaSkipEnabled, int dataTransferPoolSize,
+      int dataCommitPoolSize, int unregisterThreadPoolSize, int 
unregisterRequestTimeoutSec) {
+    return new ShuffleWriteClientImpl(
+        clientType,
+        retryMax,
+        retryIntervalMax,
+        heartBeatThreadNum,
+        replica,
+        replicaWrite,
+        replicaRead,
+        replicaSkipEnabled,
+        dataTransferPoolSize,
+        dataCommitPoolSize,
+        unregisterThreadPoolSize,
+        unregisterRequestTimeoutSec
+    );
   }
 
   public ShuffleReadClient 
createShuffleReadClient(CreateShuffleReadClientRequest request) {
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 7be84d4d..e8372d8e 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -54,6 +54,7 @@ import 
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
 import org.apache.uniffle.client.request.RssSendCommitRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
 import org.apache.uniffle.client.response.ClientResponse;
 import org.apache.uniffle.client.response.ResponseStatusCode;
 import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
@@ -66,6 +67,7 @@ import 
org.apache.uniffle.client.response.RssRegisterShuffleResponse;
 import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
 import org.apache.uniffle.client.response.RssSendCommitResponse;
 import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
 import org.apache.uniffle.client.response.SendShuffleDataResult;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
@@ -78,6 +80,7 @@ import org.apache.uniffle.common.util.ThreadUtils;
 public class ShuffleWriteClientImpl implements ShuffleWriteClient {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleWriteClientImpl.class);
+
   private String clientType;
   private int retryMax;
   private long retryIntervalMax;
@@ -89,9 +92,10 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
   private int replicaWrite;
   private int replicaRead;
   private boolean replicaSkipEnabled;
-  private int dataTranferPoolSize;
   private int dataCommitPoolSize = -1;
   private final ForkJoinPool dataTransferPool;
+  private final int unregisterThreadPoolSize;
+  private final int unregisterRequestTimeSec;
 
   public ShuffleWriteClientImpl(
       String clientType,
@@ -103,7 +107,9 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
       int replicaRead,
       boolean replicaSkipEnabled,
       int dataTranferPoolSize,
-      int dataCommitPoolSize) {
+      int dataCommitPoolSize,
+      int unregisterThreadPoolSize,
+      int unregisterRequestTimeSec) {
     this.clientType = clientType;
     this.retryMax = retryMax;
     this.retryIntervalMax = retryIntervalMax;
@@ -114,9 +120,10 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
     this.replicaWrite = replicaWrite;
     this.replicaRead = replicaRead;
     this.replicaSkipEnabled = replicaSkipEnabled;
-    this.dataTranferPoolSize = dataTranferPoolSize;
     this.dataTransferPool = new ForkJoinPool(dataTranferPoolSize);
     this.dataCommitPoolSize = dataCommitPoolSize;
+    this.unregisterThreadPoolSize = unregisterThreadPoolSize;
+    this.unregisterRequestTimeSec = unregisterRequestTimeSec;
   }
 
   private boolean sendShuffleDataAsync(
@@ -591,6 +598,50 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
     dataTransferPool.shutdownNow();
   }
 
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new 
RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+          callableList.add(() -> {
+            try {
+              ShuffleServerClient client =
+                  
ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, 
shuffleServerInfo);
+              RssUnregisterShuffleResponse response = 
client.unregisterShuffle(request);
+              if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + 
shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + 
shuffleServerInfo, e);
+            }
+            return null;
+          });
+        }
+    );
+
+    ExecutorService executorService = null;
+    try {
+      executorService =
+          Executors.newFixedThreadPool(
+              Math.min(unregisterThreadPoolSize, shuffleServerInfoSet.size()),
+              ThreadUtils.getThreadFactory("unregister-shuffle-%d")
+          );
+      List<Future<Void>> futures = executorService.invokeAll(callableList, 
unregisterRequestTimeSec, TimeUnit.SECONDS);
+      for (Future<Void> future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+    } catch (InterruptedException ie) {
+      LOG.warn("Unregister shuffle is interrupted", ie);
+    } finally {
+      if (executorService != null) {
+        executorService.shutdownNow();
+      }
+    }
+  }
+
   private void throwExceptionIfNecessary(ClientResponse response, String 
errorMsg) {
     if (response != null && response.getStatusCode() != 
ResponseStatusCode.SUCCESS) {
       LOG.error(errorMsg);
diff --git 
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
 
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
index 79661e76..71fdc637 100644
--- 
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
+++ 
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
@@ -41,7 +41,7 @@ public class ShuffleWriteClientImplTest {
   @Test
   public void testSendData() {
     ShuffleWriteClientImpl shuffleWriteClient =
-        new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true, 1, 1);
+        new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true, 1, 1, 
10, 10);
     ShuffleServerClient mockShuffleServerClient = 
mock(ShuffleServerClient.class);
     ShuffleWriteClientImpl spyClient = Mockito.spy(shuffleWriteClient);
     
doReturn(mockShuffleServerClient).when(spyClient).getShuffleServerClient(any());
diff --git a/docs/client_guide.md b/docs/client_guide.md
index b4c12193..9b5a208a 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -157,6 +157,8 @@ The important configuration is listed as following.
 |---|---|---|
 |spark.rss.writer.buffer.spill.size|128m|Buffer size for total partition data|
 |spark.rss.client.send.size.limit|16m|The max data size sent to shuffle server|
+|spark.rss.client.unregister.thread.pool.size|10|The max size of thread pool 
of unregistering|
+|spark.rss.client.unregister.request.timeout.sec|10|The max timeout sec when 
doing unregister to remote shuffle-servers|
 
 
 ### MapReduce Specialized Setting
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
index c19a911f..2688704e 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
@@ -148,7 +148,7 @@ public class AssignmentWithTagsTest extends 
CoordinatorTestBase {
   @Test
   public void testTags() throws Exception {
     ShuffleWriteClientImpl shuffleWriteClient = new 
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
-        1, 1, 1, true, 1, 1);
+        1, 1, 1, true, 1, 1, 10, 10);
     shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
 
     // Case1 : only set the single default shuffle version tag
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
index c45c2e19..455bd69f 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
@@ -87,7 +87,7 @@ public class CoordinatorAssignmentTest extends 
CoordinatorTestBase {
   @Test
   public void testSilentPeriod() throws Exception {
     ShuffleWriteClientImpl shuffleWriteClient = new 
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
-        1, 1, 1, true, 1, 1);
+        1, 1, 1, true, 1, 1, 10, 10);
     shuffleWriteClient.registerCoordinators(QUORUM);
 
     // Case1: Disable silent period
@@ -112,7 +112,7 @@ public class CoordinatorAssignmentTest extends 
CoordinatorTestBase {
   @Test
   public void testAssignmentServerNodesNumber() throws Exception {
     ShuffleWriteClientImpl shuffleWriteClient = new 
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
-        1, 1, 1, true, 1, 1);
+        1, 1, 1, true, 1, 1, 10, 10);
     shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
 
     /**
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java 
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 913f7726..2ea3d5c0 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -266,7 +266,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
       int replica, int replicaWrite, int replicaRead, boolean replicaSkip) {
 
     shuffleWriteClientImpl = new 
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
-      replica, replicaWrite, replicaRead, replicaSkip, 1, 1);
+      replica, replicaWrite, replicaRead, replicaSkip, 1, 1, 10, 10);
 
     List<ShuffleServerInfo> allServers = 
Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1,
         shuffleServerInfo2, shuffleServerInfo3, shuffleServerInfo4);
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 280de66a..66eb4513 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -105,7 +105,7 @@ public class ShuffleServerGrpcTest extends 
IntegrationTestBase {
   public void clearResourceTest() throws Exception {
     final ShuffleWriteClient shuffleWriteClient =
         ShuffleClientFactory.getInstance().createShuffleWriteClient(
-            "GRPC", 2, 10000L, 4, 1, 1, 1, true, 1, 1);
+            "GRPC", 2, 10000L, 4, 1, 1, 1, true, 1, 1, 10, 10);
     shuffleWriteClient.registerCoordinators("127.0.0.1:19999");
     shuffleWriteClient.registerShuffle(
         new ShuffleServerInfo("127.0.0.1-20001", "127.0.0.1", 20001),
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 86e78cff..8df19926 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -93,7 +93,7 @@ public class ShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
   @BeforeEach
   public void createClient() {
     shuffleWriteClientImpl = new 
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
-      1, 1, 1, true, 1, 1);
+      1, 1, 1, true, 1, 1, 10, 10);
   }
 
   @AfterEach
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
new file mode 100644
index 00000000..2eab10d4
--- /dev/null
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import scala.Tuple2;
+
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ShuffleUnregisterWithHdfsTest extends SparkIntegrationTestBase {
+
+  @BeforeAll
+  public static void setupServers() throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    Map<String, String> dynamicConf = Maps.newHashMap();
+    dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), 
HDFS_URI + "rss/test");
+    dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), 
StorageType.HDFS.name());
+    addDynamicConf(coordinatorConf, dynamicConf);
+    createCoordinatorServer(coordinatorConf);
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    shuffleServerConf.setString("rss.storage.type", StorageType.HDFS.name());
+    createShuffleServer(shuffleServerConf);
+    startServers();
+  }
+
+  @Override
+  public void updateSparkConfCustomer(SparkConf sparkConf) {
+  }
+
+  private int runCounter = 0;
+
+  @Test
+  public void unregisterShuffleTest() throws Exception {
+    run();
+  }
+
+  @Override
+  public Map runTest(SparkSession spark, String fileName) throws Exception {
+    // Wait briefly to make sure the shuffle server is registered
+    Thread.sleep(3000);
+    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+    JavaPairRDD<String, String> javaPairRDD1 = 
jsc.parallelizePairs(Lists.newArrayList(
+        new Tuple2<>("a", "1"), new Tuple2<>("b", "2"),
+        new Tuple2<>("c", "3"), new Tuple2<>("d", "4")), 2);
+    JavaPairRDD<String, Iterable<String>> javaPairRDD = 
javaPairRDD1.groupByKey().sortByKey();
+    Map map = javaPairRDD.collectAsMap();
+
+    // The second run will use RSS, and we should check the effectiveness 
of the unregisterShuffle method.
+    if (runCounter == 1) {
+      String basePath = HDFS_URI + "rss/test";
+      String appPath = fs.listStatus(new 
Path(basePath))[0].getPath().toUri().getPath();
+
+      String shufflePath = appPath + "/0";
+      assertTrue(fs.exists(new Path(shufflePath)));
+
+      spark.sparkContext().env().blockManager().master().removeShuffle(0, 
true);
+
+      // Wait some time to cleanup the shuffle resource for shuffle-server
+      Thread.sleep(1000);
+      assertFalse(fs.exists(new Path(shufflePath)));
+      assertTrue(fs.exists(new Path(appPath)));
+    } else {
+      runCounter++;
+    }
+    return map;
+  }
+}
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
new file mode 100644
index 00000000..a3dd18f5
--- /dev/null
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import scala.Tuple2;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ShuffleUnregisterWithLocalfileTest extends 
SparkIntegrationTestBase {
+
+  @BeforeAll
+  public static void setupServers() throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    Map<String, String> dynamicConf = Maps.newHashMap();
+    dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), 
HDFS_URI + "rss/test");
+    dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), 
StorageType.LOCALFILE.name());
+    addDynamicConf(coordinatorConf, dynamicConf);
+    createCoordinatorServer(coordinatorConf);
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    shuffleServerConf.setString("rss.storage.type", 
StorageType.LOCALFILE.name());
+    createShuffleServer(shuffleServerConf);
+    startServers();
+  }
+
+  @Override
+  public void updateSparkConfCustomer(SparkConf sparkConf) {
+  }
+
+  private int runCounter = 0;
+
+  @Test
+  public void unregisterShuffleTest() throws Exception {
+    run();
+  }
+
+  @Override
+  public Map runTest(SparkSession spark, String fileName) throws Exception {
+    // take a rest to make sure shuffle server is registered
+    Thread.sleep(3000);
+    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+    JavaPairRDD<String, String> javaPairRDD1 = 
jsc.parallelizePairs(Lists.newArrayList(
+        new Tuple2<>("a", "1"), new Tuple2<>("b", "2"),
+        new Tuple2<>("c", "3"), new Tuple2<>("d", "4")), 2);
+    JavaPairRDD<String, Iterable<String>> javaPairRDD = 
javaPairRDD1.groupByKey().sortByKey();
+    Map map = javaPairRDD.collectAsMap();
+
+    // The second run will use the rss. and we should check the effectiveness 
of unregisterShuffle method.
+    if (runCounter == 1) {
+      String path = shuffleServers.get(0).getShuffleServerConf()
+          .get(RssBaseConf.RSS_STORAGE_BASE_PATH).get(0);
+      String appPath = new File(path).listFiles()[0].getAbsolutePath();
+
+      String shufflePath = appPath + "/0";
+      assertTrue(new File(shufflePath).exists());
+
+      spark.sparkContext().env().blockManager().master().removeShuffle(0, 
true);
+
+      // Wait some time to cleanup the shuffle resource for shuffle-server
+      Thread.sleep(1000);
+      assertFalse(new File(shufflePath).exists());
+      assertTrue(new File(appPath).exists());
+    } else {
+      runCounter++;
+    }
+    return map;
+  }
+}
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
index f9873c8b..ab165072 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
@@ -28,6 +28,7 @@ import 
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
 import org.apache.uniffle.client.request.RssSendCommitRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
 import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
 import org.apache.uniffle.client.response.RssFinishShuffleResponse;
 import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse;
@@ -38,9 +39,12 @@ import 
org.apache.uniffle.client.response.RssRegisterShuffleResponse;
 import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
 import org.apache.uniffle.client.response.RssSendCommitResponse;
 import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
 
 public interface ShuffleServerClient {
 
+  RssUnregisterShuffleResponse unregisterShuffle(RssUnregisterShuffleRequest 
request);
+
   RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest 
request);
 
   RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest 
request);
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 07658246..7f4fa076 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -40,6 +40,7 @@ import 
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
 import org.apache.uniffle.client.request.RssSendCommitRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
 import org.apache.uniffle.client.response.ResponseStatusCode;
 import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
 import org.apache.uniffle.client.response.RssFinishShuffleResponse;
@@ -51,6 +52,7 @@ import 
org.apache.uniffle.client.response.RssRegisterShuffleResponse;
 import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
 import org.apache.uniffle.client.response.RssSendCommitResponse;
 import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
@@ -58,6 +60,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.exception.NotRetryException;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.RetryUtils;
+import org.apache.uniffle.proto.RssProtos;
 import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
 import org.apache.uniffle.proto.RssProtos.AppHeartBeatResponse;
 import org.apache.uniffle.proto.RssProtos.FinishShuffleRequest;
@@ -206,6 +209,35 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
     return result;
   }
 
  /**
   * Issues the raw gRPC unregister call for the given app/shuffle pair.
   */
  private RssProtos.ShuffleUnregisterResponse doUnregisterShuffle(String appId, int shuffleId) {
    RssProtos.ShuffleUnregisterRequest request = RssProtos.ShuffleUnregisterRequest.newBuilder()
        .setAppId(appId)
        .setShuffleId(shuffleId)
        .build();
    return blockingStub.unregisterShuffle(request);
  }
+
+  @Override
+  public RssUnregisterShuffleResponse 
unregisterShuffle(RssUnregisterShuffleRequest request) {
+    RssProtos.ShuffleUnregisterResponse rpcResponse = 
doUnregisterShuffle(request.getAppId(), request.getShuffleId());
+
+    RssUnregisterShuffleResponse response;
+    StatusCode statusCode = rpcResponse.getStatus();
+
+    switch (statusCode) {
+      case SUCCESS:
+        response = new 
RssUnregisterShuffleResponse(ResponseStatusCode.SUCCESS);
+        break;
+      default:
+        String msg = String.format("Errors on unregister shuffle to %s:%s for 
appId[%s].shuffleId[%], error: %s",
+            host, port, request.getAppId(), request.getShuffleId(), 
rpcResponse.getRetMsg());
+        LOG.error(msg);
+        throw new RssException(msg);
+    }
+
+    return response;
+  }
+
   @Override
   public RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest 
request) {
     ShuffleRegisterResponse rpcResponse = doRegisterShuffle(
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleRequest.java
new file mode 100644
index 00000000..317e27ab
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleRequest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
/**
 * Request payload for unregistering a single shuffle of an application on a
 * shuffle server, allowing its resources to be reclaimed early.
 */
public class RssUnregisterShuffleRequest {
  // Final fields: the request is fully specified at construction time and
  // immutable afterwards.
  private final String appId;
  private final int shuffleId;

  public RssUnregisterShuffleRequest(String appId, int shuffleId) {
    this.appId = appId;
    this.shuffleId = shuffleId;
  }

  public String getAppId() {
    return appId;
  }

  public int getShuffleId() {
    return shuffleId;
  }
}
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleResponse.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleResponse.java
new file mode 100644
index 00000000..b872522e
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleResponse.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
/**
 * Response for the unregister-shuffle RPC; carries only the status code
 * inherited from {@link ClientResponse}.
 */
public class RssUnregisterShuffleResponse extends ClientResponse {

  public RssUnregisterShuffleResponse(ResponseStatusCode statusCode) {
    super(statusCode);
  }

}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 26cd8b51..c2111bb0 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -25,6 +25,7 @@ package rss.common;
 
 service ShuffleServer {
   rpc registerShuffle (ShuffleRegisterRequest) returns 
(ShuffleRegisterResponse);
+  rpc unregisterShuffle(ShuffleUnregisterRequest) returns 
(ShuffleUnregisterResponse);
   rpc sendShuffleData (SendShuffleDataRequest) returns 
(SendShuffleDataResponse);
   rpc getLocalShuffleIndex (GetLocalShuffleIndexRequest) returns 
(GetLocalShuffleIndexResponse);
   rpc getLocalShuffleData (GetLocalShuffleDataRequest) returns 
(GetLocalShuffleDataResponse);
@@ -167,6 +168,16 @@ message ShuffleRegisterRequest {
   string user = 5;
 }
 
+message ShuffleUnregisterRequest {
+  string appId = 1;
+  int32 shuffleId = 2;
+}
+
+message ShuffleUnregisterResponse {
+  StatusCode status = 1;
+  string retMsg = 2;
+}
+
 message ShuffleRegisterResponse {
   StatusCode status = 1;
   string retMsg = 2;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index daae92df..6c6ece9e 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -337,6 +338,11 @@ public class ShuffleFlushManager {
     }
   }
 
  /**
   * Drops the in-memory flush state (write handlers and committed block ids)
   * held for one shuffle of the given application, if any is present.
   */
  public void removeResourcesOfShuffleId(String appId, int shuffleId) {
    Optional.ofNullable(handlers.get(appId)).ifPresent(x -> x.remove(shuffleId));
    Optional.ofNullable(committedBlockIds.get(appId)).ifPresent(x -> x.remove(shuffleId));
  }
+
   private static class PendingShuffleFlushEvent {
     private final ShuffleDataFlushEvent event;
     private final long createTimeStamp = System.currentTimeMillis();
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index e08fd1c8..b7a8ab20 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -105,6 +105,29 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     }
   }
 
+  @Override
+  public void unregisterShuffle(RssProtos.ShuffleUnregisterRequest request,
+      StreamObserver<RssProtos.ShuffleUnregisterResponse> 
responseStreamObserver) {
+    String appId = request.getAppId();
+    int shuffleId = request.getShuffleId();
+
+    StatusCode result = StatusCode.SUCCESS;
+    String responseMessage = "OK";
+    try {
+      shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId, 
shuffleId);
+    } catch (Exception e) {
+      result = StatusCode.INTERNAL_ERROR;
+    }
+
+    RssProtos.ShuffleUnregisterResponse reply = 
RssProtos.ShuffleUnregisterResponse
+        .newBuilder()
+        .setStatus(valueOf(result))
+        .setRetMsg(responseMessage)
+        .build();
+    responseStreamObserver.onNext(reply);
+    responseStreamObserver.onCompleted();
+  }
+
   @Override
   public void registerShuffle(ShuffleRegisterRequest req,
       StreamObserver<ShuffleRegisterResponse> responseObserver) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index a26006ae..2ec3d6c9 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -18,9 +18,12 @@
 package org.apache.uniffle.server;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
@@ -34,6 +37,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
 import org.roaringbitmap.longlong.LongIterator;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
@@ -51,6 +55,9 @@ import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.event.AppPurgeEvent;
+import org.apache.uniffle.server.event.PurgeEvent;
+import org.apache.uniffle.server.event.ShufflePurgeEvent;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.common.StorageReadMetrics;
@@ -78,7 +85,7 @@ public class ShuffleTaskManager {
   private Map<String, ShuffleTaskInfo> shuffleTaskInfos = 
Maps.newConcurrentMap();
   private Map<Long, PreAllocatedBufferInfo> requireBufferIds = 
Maps.newConcurrentMap();
   private Runnable clearResourceThread;
-  private BlockingQueue<String> expiredAppIdQueue = 
Queues.newLinkedBlockingQueue();
+  private BlockingQueue<PurgeEvent> expiredAppIdQueue = 
Queues.newLinkedBlockingQueue();
   // appId -> shuffleId -> serverReadHandler
 
   public ShuffleTaskManager(
@@ -109,8 +116,13 @@ public class ShuffleTaskManager {
     clearResourceThread = () -> {
       while (true) {
         try {
-          String appId = expiredAppIdQueue.take();
-          removeResources(appId);
+          PurgeEvent event = expiredAppIdQueue.take();
+          if (event instanceof AppPurgeEvent) {
+            removeResources(event.getAppId());
+          }
+          if (event instanceof ShufflePurgeEvent) {
+            removeResourcesByShuffleIds(event.getAppId(), ((ShufflePurgeEvent) 
event).getShuffleIds());
+          }
         } catch (Exception e) {
           LOG.error("Exception happened when clear resource for expired 
application", e);
         }
@@ -369,7 +381,7 @@ public class ShuffleTaskManager {
         if (System.currentTimeMillis() - 
shuffleTaskInfos.get(appId).getCurrentTimes() > appExpiredWithoutHB) {
           LOG.info("Detect expired appId[" + appId + "] according "
               + "to rss.server.app.expired.withoutHeartbeat");
-          expiredAppIdQueue.add(appId);
+          expiredAppIdQueue.add(new AppPurgeEvent(appId, 
getUserByAppId(appId)));
         }
       }
       ShuffleServerMetrics.gaugeAppNum.set(shuffleTaskInfos.size());
@@ -378,6 +390,42 @@ public class ShuffleTaskManager {
     }
   }
 
+  /**
+   * Clear up the partial resources of shuffleIds of App.
+   * @param appId
+   * @param shuffleIds
+   */
+  public void removeResourcesByShuffleIds(String appId, List<Integer> 
shuffleIds) {
+    if (CollectionUtils.isEmpty(shuffleIds)) {
+      return;
+    }
+
+    LOG.info("Start remove resource for appId[{}], shuffleIds[{}]", appId, 
shuffleIds);
+    final long start = System.currentTimeMillis();
+    final ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
+    if (taskInfo != null) {
+      for (Integer shuffleId : shuffleIds) {
+        Optional.ofNullable(taskInfo).ifPresent(x -> 
x.getCachedBlockIds().remove(shuffleId));
+        Optional.ofNullable(taskInfo).ifPresent(x -> 
x.getCommitCounts().remove(shuffleId));
+        Optional.ofNullable(taskInfo).ifPresent(x -> 
x.getCommitLocks().remove(shuffleId));
+      }
+    }
+    Optional.ofNullable(partitionsToBlockIds.get(appId)).ifPresent(x -> {
+      for (Integer shuffleId : shuffleIds) {
+        x.remove(shuffleId);
+      }
+    });
+    shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds.toArray(new 
Integer[0]));
+    for (Integer shuffleId : shuffleIds) {
+      shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleId);
+    }
+    storageManager.removeResources(
+        new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds)
+    );
+    LOG.info("Finish remove resource for appId[{}], shuffleIds[{}], cost[{}]",
+        appId, shuffleIds, System.currentTimeMillis() - start);
+  }
+
   @VisibleForTesting
   public void removeResources(String appId) {
     LOG.info("Start remove resource for appId[" + appId + "]");
@@ -387,7 +435,9 @@ public class ShuffleTaskManager {
     shuffleBufferManager.removeBuffer(appId);
     shuffleFlushManager.removeResources(appId);
     if (!shuffleToCachedBlockIds.isEmpty()) {
-      storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet(), 
getUserByAppId(appId));
+      storageManager.removeResources(
+          new AppPurgeEvent(appId, getUserByAppId(appId), new 
ArrayList<>(shuffleToCachedBlockIds.keySet()))
+      );
     }
     shuffleTaskInfos.remove(appId);
     LOG.info("Finish remove resource for appId[" + appId + "] cost " + 
(System.currentTimeMillis() - start) + " ms");
@@ -443,4 +493,13 @@ public class ShuffleTaskManager {
   public Map<String, Map<Integer, Roaring64NavigableMap[]>> 
getPartitionsToBlockIds() {
     return partitionsToBlockIds;
   }
+
  /**
   * Queues an asynchronous purge of one shuffle's resources; the actual
   * deletion is performed by the clear-resource thread consuming
   * expiredAppIdQueue.
   */
  public void removeShuffleDataAsync(String appId, int shuffleId) {
    expiredAppIdQueue.add(new ShufflePurgeEvent(appId, getUserByAppId(appId), Arrays.asList(shuffleId)));
  }
+
  // Synchronous variant of removeShuffleDataAsync, used by tests to purge a
  // single shuffle's resources without going through the event queue.
  @VisibleForTesting
  void removeShuffleDataSync(String appId, int shuffleId) {
    removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index c0ce2f4f..99ae9c6b 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -209,21 +210,10 @@ public class ShuffleBufferManager {
     if (shuffleIdToBuffers == null) {
       return;
     }
-    // calculate released size
-    long size = 0;
-    for (RangeMap<Integer, ShuffleBuffer> rangeMap : 
shuffleIdToBuffers.values()) {
-      if (rangeMap != null) {
-        Collection<ShuffleBuffer> buffers = rangeMap.asMapOfRanges().values();
-        if (buffers != null) {
-          for (ShuffleBuffer buffer : buffers) {
-            ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
-            size += buffer.getSize();
-          }
-        }
-      }
-    }
-    // release memory
-    releaseMemory(size, false, false);
+    removeBufferByShuffleId(
+        appId,
+        
shuffleIdToBuffers.keySet().stream().collect(Collectors.toList()).toArray(new 
Integer[0])
+    );
     shuffleSizeMap.remove(appId);
     bufferPool.remove(appId);
   }
@@ -459,4 +449,33 @@ public class ShuffleBufferManager {
     Set<Integer> shuffleIdSet = pickedShuffle.get(appId);
     shuffleIdSet.add(shuffleId);
   }
+
+  public void removeBufferByShuffleId(String appId, Integer... shuffleIds) {
+    Map<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers = 
bufferPool.get(appId);
+    if (shuffleIdToBuffers == null) {
+      return;
+    }
+
+    Map<Integer, AtomicLong> shuffleIdToSizeMap = shuffleSizeMap.get(appId);
+    for (int shuffleId : shuffleIds) {
+      long size = 0;
+
+      RangeMap<Integer, ShuffleBuffer> bufferRangeMap = 
shuffleIdToBuffers.get(shuffleId);
+      if (bufferRangeMap == null) {
+        continue;
+      }
+      Collection<ShuffleBuffer> buffers = 
bufferRangeMap.asMapOfRanges().values();
+      if (buffers != null) {
+        for (ShuffleBuffer buffer : buffers) {
+          ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
+          size += buffer.getSize();
+        }
+      }
+      releaseMemory(size, false, false);
+      if (shuffleIdToSizeMap != null) {
+        shuffleIdToSizeMap.remove(shuffleId);
+      }
+      shuffleIdToBuffers.remove(shuffleId);
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java 
b/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
new file mode 100644
index 00000000..804d3849
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
+public class AppPurgeEvent extends PurgeEvent {
+
+  public AppPurgeEvent(String appId, String user, List<Integer> shuffleIds) {
+    super(appId, user, shuffleIds);
+  }
+
+  public AppPurgeEvent(String appId, String user) {
+    super(appId, user, null);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java 
b/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java
new file mode 100644
index 00000000..a63f04c7
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
+// todo: introduce the unified abstract dispatcher to handle events,
+// mentioned in 
https://github.com/apache/incubator-uniffle/pull/249#discussion_r983001435
/**
 * Base event describing server-side resources to purge. Events are handed to
 * the shuffle server's clear-resource thread via a blocking queue; subclasses
 * define the scope (whole application vs. a subset of its shuffles).
 */
public abstract class PurgeEvent {
  // Final fields: an event is immutable once constructed, which keeps it safe
  // to pass across threads through the queue.
  private final String appId;
  private final String user;
  // May be null when no explicit shuffle list applies (app-level purge).
  private final List<Integer> shuffleIds;

  public PurgeEvent(String appId, String user, List<Integer> shuffleIds) {
    this.appId = appId;
    this.user = user;
    this.shuffleIds = shuffleIds;
  }

  public String getAppId() {
    return appId;
  }

  public String getUser() {
    return user;
  }

  public List<Integer> getShuffleIds() {
    return shuffleIds;
  }

  @Override
  public String toString() {
    return this.getClass().getSimpleName() + "{"
        + "appId='" + appId + '\''
        + ", user='" + user + '\''
        + ", shuffleIds=" + shuffleIds
        + '}';
  }
}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java 
b/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java
new file mode 100644
index 00000000..cbc39aab
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
/**
 * Purge event scoped to specific shuffle ids of an application, leaving the
 * rest of the application's resources intact.
 */
public class ShufflePurgeEvent extends PurgeEvent {

  public ShufflePurgeEvent(String appId, String user, List<Integer> shuffleIds) {
    super(appId, user, shuffleIds);
  }
}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 7c0c0dd3..3087952e 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -17,9 +17,9 @@
 
 package org.apache.uniffle.server.storage;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -38,11 +38,14 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.event.AppPurgeEvent;
+import org.apache.uniffle.server.event.PurgeEvent;
 import org.apache.uniffle.storage.common.HdfsStorage;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
 import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
 import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 import org.apache.uniffle.storage.util.StorageType;
 
 public class HdfsStorageManager extends SingleStorageManager {
@@ -75,15 +78,32 @@ public class HdfsStorageManager extends 
SingleStorageManager {
   }
 
   @Override
-  public void removeResources(String appId, Set<Integer> shuffleSet, String 
user) {
+  public void removeResources(PurgeEvent event) {
+    String appId = event.getAppId();
+    String user = event.getUser();
     HdfsStorage storage = getStorageByAppId(appId);
     if (storage != null) {
-      storage.removeHandlers(appId);
+      if (event instanceof AppPurgeEvent) {
+        storage.removeHandlers(appId);
+      }
       appIdToStorages.remove(appId);
-      ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
+      ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory
+          .getInstance()
           .createShuffleDeleteHandler(
-              new CreateShuffleDeleteHandlerRequest(StorageType.HDFS.name(), 
storage.getConf()));
-      deleteHandler.delete(new String[] {storage.getStoragePath()}, appId, 
user);
+              new CreateShuffleDeleteHandlerRequest(StorageType.HDFS.name(), 
storage.getConf())
+          );
+
+      String basicPath = 
ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
+      List<String> deletePaths = new ArrayList<>();
+
+      if (event instanceof AppPurgeEvent) {
+        deletePaths.add(basicPath);
+      } else {
+        for (Integer shuffleId : event.getShuffleIds()) {
+          
deletePaths.add(ShuffleStorageUtils.getFullShuffleDataFolder(basicPath, 
String.valueOf(shuffleId)));
+        }
+      }
+      deleteHandler.delete(deletePaths.toArray(new String[0]), appId, user);
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 1bbc6298..5c47e632 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -17,9 +17,12 @@
 
 package org.apache.uniffle.server.storage;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -44,6 +47,9 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.event.AppPurgeEvent;
+import org.apache.uniffle.server.event.PurgeEvent;
+import org.apache.uniffle.server.event.ShufflePurgeEvent;
 import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
@@ -171,10 +177,16 @@ public class LocalStorageManager extends 
SingleStorageManager {
   }
 
   @Override
-  public void removeResources(String appId, Set<Integer> shuffleSet, String 
user) {
+  public void removeResources(PurgeEvent event) {
+    String appId = event.getAppId();
+    String user = event.getUser();
+    List<Integer> shuffleSet = 
Optional.ofNullable(event.getShuffleIds()).orElse(Collections.emptyList());
+
     for (LocalStorage storage : localStorages) {
-      for (Integer shuffleId : shuffleSet) {
+      if (event instanceof AppPurgeEvent) {
         storage.removeHandlers(appId);
+      }
+      for (Integer shuffleId : shuffleSet) {
         storage.removeResources(RssUtils.generateShuffleKey(appId, shuffleId));
       }
     }
@@ -182,7 +194,23 @@ public class LocalStorageManager extends 
SingleStorageManager {
     ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
         .createShuffleDeleteHandler(
             new 
CreateShuffleDeleteHandlerRequest(StorageType.LOCALFILE.name(), new 
Configuration()));
-    deleteHandler.delete(storageBasePaths.toArray(new 
String[storageBasePaths.size()]), appId, user);
+
+    List<String> deletePaths = storageBasePaths.stream().flatMap(path -> {
+      String basicPath = ShuffleStorageUtils.getFullShuffleDataFolder(path, 
appId);
+      if (event instanceof ShufflePurgeEvent) {
+        List<String> paths = new ArrayList<>();
+        for (int shuffleId : shuffleSet) {
+          paths.add(
+              ShuffleStorageUtils.getFullShuffleDataFolder(basicPath, 
String.valueOf(shuffleId))
+          );
+        }
+        return paths.stream();
+      } else {
+        return Arrays.asList(basicPath).stream();
+      }
+    }).collect(Collectors.toList());
+
+    deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), 
appId, user);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index d4fff0a9..2699a3f9 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -18,7 +18,6 @@
 package org.apache.uniffle.server.storage;
 
 import java.io.IOException;
-import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,6 +27,7 @@ import org.apache.uniffle.server.Checker;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
 import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.event.PurgeEvent;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
 import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
@@ -112,10 +112,10 @@ public class MultiStorageManager implements 
StorageManager {
   }
 
   @Override
-  public void removeResources(String appId, Set<Integer> shuffleSet, String 
user) {
-    LOG.info("Start to remove resource of appId: {}, shuffles: {}", appId, 
shuffleSet.toString());
-    warmStorageManager.removeResources(appId, shuffleSet, user);
-    coldStorageManager.removeResources(appId, shuffleSet, user);
+  public void removeResources(PurgeEvent event) {
+    LOG.info("Start to remove resource of {}", event);
+    warmStorageManager.removeResources(event);
+    coldStorageManager.removeResources(event);
   }
 
   public StorageManager getColdStorageManager() {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java 
b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index d8fe5c9e..0960ca37 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -17,12 +17,11 @@
 
 package org.apache.uniffle.server.storage;
 
-import java.util.Set;
-
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.server.Checker;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
+import org.apache.uniffle.server.event.PurgeEvent;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
 
@@ -39,7 +38,7 @@ public interface StorageManager {
 
   // todo: add an interface for updateReadMetrics
 
-  void removeResources(String appId, Set<Integer> shuffleSet, String user);
+  void removeResources(PurgeEvent event);
 
   void start();
 
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
index 4800c4d2..69d55fb0 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
@@ -18,11 +18,11 @@
 package org.apache.uniffle.server;
 
 import java.io.FileNotFoundException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Level;
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.common.KerberizedHdfsBase;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.event.AppPurgeEvent;
 import org.apache.uniffle.server.storage.HdfsStorageManager;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.server.storage.StorageManagerFactory;
@@ -134,7 +135,9 @@ public class ShuffleFlushManagerOnKerberizedHdfsTest 
extends KerberizedHdfsBase
     manager.removeResources(appId1);
 
     
assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
-    storageManager.removeResources(appId1, Sets.newHashSet(1), "alex");
+    storageManager.removeResources(
+        new AppPurgeEvent(appId1, "alex", Arrays.asList(1))
+    );
     
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
     try {
       kerberizedHdfs.getFileSystem().listStatus(new 
Path(remoteStorage.getPath() + "/" + appId1 + "/"));
@@ -151,7 +154,9 @@ public class ShuffleFlushManagerOnKerberizedHdfsTest 
extends KerberizedHdfsBase
     assertEquals(1, size);
     manager.removeResources(appId2);
     
assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
-    storageManager.removeResources(appId2, Sets.newHashSet(1), "alex");
+    storageManager.removeResources(
+        new AppPurgeEvent(appId2, "alex", Arrays.asList(1))
+    );
     
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
     assertEquals(0, manager.getCommittedBlockIds(appId2, 
1).getLongCardinality());
     size = storage.getHandlerSize();
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 2ae581e7..5f1bb47d 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -52,6 +52,7 @@ import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.event.AppPurgeEvent;
 import org.apache.uniffle.server.storage.HdfsStorageManager;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.server.storage.StorageManagerFactory;
@@ -229,7 +230,9 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     manager.removeResources(appId1);
 
     
assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
-    storageManager.removeResources(appId1, Sets.newHashSet(1), 
StringUtils.EMPTY);
+    storageManager.removeResources(
+        new AppPurgeEvent(appId1, StringUtils.EMPTY, Arrays.asList(1))
+    );
     
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
     try {
       fs.listStatus(new Path(remoteStorage.getPath() + "/" + appId1 + "/"));
@@ -244,7 +247,9 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     assertEquals(1, size);
     manager.removeResources(appId2);
     
assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
-    storageManager.removeResources(appId2, Sets.newHashSet(1), 
StringUtils.EMPTY);
+    storageManager.removeResources(
+        new AppPurgeEvent(appId2, StringUtils.EMPTY, Arrays.asList(1))
+    );
     
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
     assertEquals(0, manager.getCommittedBlockIds(appId2, 
1).getLongCardinality());
     size = storage.getHandlerSize();
@@ -253,7 +258,9 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     // but the cache from appIdToStorages has been removed, so we need to delete this path in HDFS
     Path path = new Path(remoteStorage.getPath() + "/" + appId2 + "/");
     assertTrue(fs.mkdirs(path));
-    storageManager.removeResources(appId2, Sets.newHashSet(1), 
StringUtils.EMPTY);
+    storageManager.removeResources(
+        new AppPurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1))
+    );
     assertFalse(fs.exists(path));
     HdfsStorage storageByAppId = ((HdfsStorageManager) 
storageManager).getStorageByAppId(appId2);
     assertNull(storageByAppId);
@@ -286,7 +293,9 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     assertEquals(2, storage.getHandlerSize());
     File file = new File(tempDir, appId1);
     assertTrue(file.exists());
-    storageManager.removeResources(appId1, Sets.newHashSet(1), 
StringUtils.EMPTY);
+    storageManager.removeResources(
+        new AppPurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1))
+    );
     manager.removeResources(appId1);
     assertFalse(file.exists());
     ShuffleDataFlushEvent event3 =
@@ -297,7 +306,9 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     assertEquals(5, manager.getCommittedBlockIds(appId2, 
1).getLongCardinality());
     assertEquals(1, storage.getHandlerSize());
     manager.removeResources(appId2);
-    storageManager.removeResources(appId2, Sets.newHashSet(1), 
StringUtils.EMPTY);
+    storageManager.removeResources(
+        new AppPurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1))
+    );
     assertEquals(0, manager.getCommittedBlockIds(appId2, 
1).getLongCardinality());
     assertEquals(0, storage.getHandlerSize());
   }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 2c8f54a3..5a869b43 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.uniffle.server;
 
+import java.io.File;
+import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +32,7 @@ import com.google.common.collect.RangeMap;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -49,10 +52,13 @@ import 
org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.storage.HdfsTestBase;
 import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -251,6 +257,141 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     }
   }
 
+  /**
+   * Clean up stage-level shuffle data for a single app
+   * @throws Exception
+   */
+  @Test
+  public void removeShuffleDataWithHdfsTest() throws Exception {
+    String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+    ShuffleServerConf conf = new ShuffleServerConf(confFile);
+    String storageBasePath = HDFS_URI + "rss/clearTest";
+    conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+    conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+    conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+    conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+    conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 
50.0);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 
0.0);
+    conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+    conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
+    conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+
+    ShuffleServer shuffleServer = new ShuffleServer(conf);
+
+    ShuffleBufferManager shuffleBufferManager = 
shuffleServer.getShuffleBufferManager();
+    ShuffleFlushManager shuffleFlushManager = 
shuffleServer.getShuffleFlushManager();
+    StorageManager storageManager = shuffleServer.getStorageManager();
+    ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(
+        conf, shuffleFlushManager, shuffleBufferManager, storageManager);
+
+    String appId = "removeShuffleDataTest1";
+    for (int i = 0; i < 4; i++) {
+      shuffleTaskManager.registerShuffle(
+          appId,
+          i,
+          Lists.newArrayList(new PartitionRange(0, 1)),
+          new RemoteStorageInfo(storageBasePath, Maps.newHashMap()),
+          StringUtils.EMPTY
+      );
+    }
+    shuffleTaskManager.refreshAppId(appId);
+
+    assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+    ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
+
+    shuffleTaskManager.requireBuffer(35);
+    shuffleTaskManager.requireBuffer(35);
+    shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
+    shuffleTaskManager.updateCachedBlockIds(appId, 0, 
partitionedData0.getBlockList());
+    shuffleTaskManager.cacheShuffleData(appId, 1, false, partitionedData0);
+    shuffleTaskManager.updateCachedBlockIds(appId, 1, 
partitionedData0.getBlockList());
+    shuffleTaskManager.refreshAppId(appId);
+    shuffleTaskManager.checkResourceStatus();
+
+    assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+    RangeMap<Integer, ShuffleBuffer> rangeMap = 
shuffleBufferManager.getBufferPool().get(appId).get(0);
+    assertFalse(rangeMap.asMapOfRanges().isEmpty());
+    shuffleTaskManager.commitShuffle(appId, 0);
+
+    // Before removing shuffle resources
+    String appBasePath = 
ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath, appId);
+    String shufflePath0 = 
ShuffleStorageUtils.getFullShuffleDataFolder(appBasePath, "0");
+    assertTrue(fs.exists(new Path(shufflePath0)));
+
+    // After removing the shuffle id of 0 resources
+    shuffleTaskManager.removeShuffleDataSync(appId, 0);
+    assertFalse(fs.exists(new Path(shufflePath0)));
+    assertTrue(fs.exists(new Path(appBasePath)));
+    assertNull(shuffleBufferManager.getBufferPool().get(appId).get(0));
+    assertNotNull(shuffleBufferManager.getBufferPool().get(appId).get(1));
+  }
+
+  @Test
+  public void removeShuffleDataWithLocalfileTest() throws Exception {
+    String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+    ShuffleServerConf conf = new ShuffleServerConf(confFile);
+    conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+    conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+    conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+    conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+    conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 
50.0);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 
0.0);
+    conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+    conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 100000L);
+    conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+
+    conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, "LOCALFILE");
+    java.nio.file.Path path1 = 
Files.createTempDirectory("removeShuffleDataWithLocalfileTest");
+    java.nio.file.Path path2 = 
Files.createTempDirectory("removeShuffleDataWithLocalfileTest");
+    conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(),
+        path1.toAbsolutePath().toString() + "," + 
path2.toAbsolutePath().toString());
+
+    ShuffleServer shuffleServer = new ShuffleServer(conf);
+
+    ShuffleBufferManager shuffleBufferManager = 
shuffleServer.getShuffleBufferManager();
+    ShuffleFlushManager shuffleFlushManager = 
shuffleServer.getShuffleFlushManager();
+    StorageManager storageManager = shuffleServer.getStorageManager();
+    ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(
+        conf, shuffleFlushManager, shuffleBufferManager, storageManager);
+
+    String appId = "removeShuffleDataWithLocalfileTest";
+
+    int shuffleNum = 4;
+    for (int i = 0; i < shuffleNum; i++) {
+      shuffleTaskManager.registerShuffle(
+          appId,
+          i,
+          Lists.newArrayList(new PartitionRange(0, 1)),
+          RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+          StringUtils.EMPTY
+      );
+
+      ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 
35);
+      shuffleTaskManager.requireBuffer(35);
+      shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0);
+      shuffleTaskManager.updateCachedBlockIds(appId, i, 
partitionedData0.getBlockList());
+    }
+
+    assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+    for (int i = 0; i < shuffleNum; i++) {
+      shuffleTaskManager.commitShuffle(appId, i);
+      shuffleTaskManager.removeShuffleDataSync(appId, i);
+    }
+
+    for (String path : conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH)) {
+      String appPath = path + "/" + appId;
+      File[] files = new File(appPath).listFiles();
+      if (files != null) {
+        assertEquals(0, files.length);
+      }
+    }
+  }
+
   @Test
   public void clearTest() throws Exception {
     ShuffleServerConf conf = new ShuffleServerConf();
diff --git a/spark-patches/spark-2.4.6_dynamic_allocation_support.patch 
b/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
index 88f895c5..22581d46 100644
--- a/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
+++ b/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
@@ -15,21 +15,6 @@
 # limitations under the License.
 #
 
-diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala 
b/core/src/main/scala/org/apache/spark/Dependency.scala
-index 9ea6d2fa2f..d6101cdcef 100644
---- a/core/src/main/scala/org/apache/spark/Dependency.scala
-+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
-@@ -93,7 +93,9 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: 
ClassTag](
-   val shuffleHandle: ShuffleHandle = 
_rdd.context.env.shuffleManager.registerShuffle(
-     shuffleId, _rdd.partitions.length, this)
- 
--  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
-+  if (!_rdd.context.getConf.isRssEnable()) {
-+    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
-+  }
- }
- 
- 
 diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
 index 38bed797a0..a95d4c7d77 100644
 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
diff --git a/spark-patches/spark-3.1.2_dynamic_allocation_support.patch 
b/spark-patches/spark-3.1.2_dynamic_allocation_support.patch
index c3431dca..d35c3fcd 100644
--- a/spark-patches/spark-3.1.2_dynamic_allocation_support.patch
+++ b/spark-patches/spark-3.1.2_dynamic_allocation_support.patch
@@ -15,23 +15,6 @@
 # limitations under the License.
 #
 
-diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala 
b/core/src/main/scala/org/apache/spark/Dependency.scala
-index d21b9d9833..fe4507f81f 100644
---- a/core/src/main/scala/org/apache/spark/Dependency.scala
-+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
-@@ -110,8 +110,10 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: 
ClassTag](
- 
-   def getMergerLocs: Seq[BlockManagerId] = mergerLocs
- 
--  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
--  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
-+  if (!_rdd.context.getConf.isRssEnable()) {
-+    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
-+    _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
-+  }
- }
- 
- 
 diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
 index bdb768ed5a..ffde0643c5 100644
 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
diff --git a/spark-patches/spark-3.2.1_dynamic_allocation_support.patch 
b/spark-patches/spark-3.2.1_dynamic_allocation_support.patch
index e170ae28..81046dec 100644
--- a/spark-patches/spark-3.2.1_dynamic_allocation_support.patch
+++ b/spark-patches/spark-3.2.1_dynamic_allocation_support.patch
@@ -15,23 +15,6 @@
 # limitations under the License.
 #
 
-diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala 
b/core/src/main/scala/org/apache/spark/Dependency.scala
-index 1b4e7ba5106..95818ff72ca 100644
---- a/core/src/main/scala/org/apache/spark/Dependency.scala
-+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
-@@ -174,8 +174,10 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: 
ClassTag](
-       !rdd.isBarrier()
-   }
- 
--  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
--  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
-+  if (!_rdd.context.getConf.isRssEnable()) {
-+    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
-+    _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
-+  }
- }
- 
- 
 diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
 index c4b619300b5..821a01985d9 100644
 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
index d1201a87..43994d69 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
@@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
-import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
 public class HdfsShuffleDeleteHandler implements ShuffleDeleteHandler {
 
@@ -39,33 +38,35 @@ public class HdfsShuffleDeleteHandler implements 
ShuffleDeleteHandler {
 
   @Override
   public void delete(String[] storageBasePaths, String appId, String user) {
-    Path path = new 
Path(ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePaths[0], appId));
-    boolean isSuccess = false;
-    int times = 0;
-    int retryMax = 5;
-    long start = System.currentTimeMillis();
-    LOG.info("Try delete shuffle data in HDFS for appId[{}] of user[{}] with 
{}",appId, user, path);
-    while (!isSuccess && times < retryMax) {
-      try {
-        FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, 
path, hadoopConf);
-        fileSystem.delete(path, true);
-        isSuccess = true;
-      } catch (Exception e) {
-        times++;
-        LOG.warn("Can't delete shuffle data for appId[" + appId + "] with " + 
times + " times", e);
+    for (String deletePath : storageBasePaths) {
+      final Path path = new Path(deletePath);
+      boolean isSuccess = false;
+      int times = 0;
+      int retryMax = 5;
+      long start = System.currentTimeMillis();
+      LOG.info("Try delete shuffle data in HDFS for appId[{}] of user[{}] with 
{}",appId, user, path);
+      while (!isSuccess && times < retryMax) {
         try {
-          Thread.sleep(1000);
-        } catch (Exception ex) {
-          LOG.warn("Exception happened when Thread.sleep", ex);
+          FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, 
path, hadoopConf);
+          fileSystem.delete(path, true);
+          isSuccess = true;
+        } catch (Exception e) {
+          times++;
+          LOG.warn("Can't delete shuffle data for appId[" + appId + "] with " 
+ times + " times", e);
+          try {
+            Thread.sleep(1000);
+          } catch (Exception ex) {
+            LOG.warn("Exception happened when Thread.sleep", ex);
+          }
         }
       }
-    }
-    if (isSuccess) {
-      LOG.info("Delete shuffle data in HDFS for appId[" + appId + "] with " + 
path + " successfully in "
-          + (System.currentTimeMillis() - start) + " ms");
-    } else {
-      LOG.info("Failed to delete shuffle data in HDFS for appId[" + appId + "] 
with " + path + " in "
-          + (System.currentTimeMillis() - start) + " ms");
+      if (isSuccess) {
+        LOG.info("Delete shuffle data in HDFS for appId[" + appId + "] with " 
+ path + " successfully in "
+            + (System.currentTimeMillis() - start) + " ms");
+      } else {
+        LOG.info("Failed to delete shuffle data in HDFS for appId[" + appId + 
"] with " + path + " in "
+            + (System.currentTimeMillis() - start) + " ms");
+      }
     }
   }
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
index 97bf3864..e2fb88b2 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
@@ -24,16 +24,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
-import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
 public class LocalFileDeleteHandler implements ShuffleDeleteHandler {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalFileDeleteHandler.class);
 
   @Override
-  public void delete(String[] storageBasePaths, String appId, String user) {
-    for (String basePath : storageBasePaths) {
-      String shufflePath = 
ShuffleStorageUtils.getFullShuffleDataFolder(basePath, appId);
+  public void delete(String[] shuffleDataStoredPath, String appId, String 
user) {
+    for (String basePath : shuffleDataStoredPath) {
+      final String shufflePath = basePath;
       long start = System.currentTimeMillis();
       try {
         File baseFolder = new File(shufflePath);


Reply via email to