This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1ee68201 Introduce data cleanup mechanism on stage level (#249)
1ee68201 is described below
commit 1ee68201c1e2e8ade2b728831fc996ba9d8c0316
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Oct 12 14:36:02 2022 +0800
Introduce data cleanup mechanism on stage level (#249)
### What changes were proposed in this pull request?
Introduce data cleanup mechanism on stage level
### Why are the changes needed?
This PR optimizes disk capacity usage. For example:
1. Some Spark ML jobs run multiple stages and retain large amounts of
unused shuffle data on the shuffle servers.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UTs
---
.../hadoop/mapred/SortWriteBufferManagerTest.java | 5 +
.../hadoop/mapreduce/task/reduce/FetcherTest.java | 5 +
.../org/apache/spark/shuffle/RssSparkConfig.java | 8 ++
.../apache/spark/shuffle/RssShuffleManager.java | 11 +-
.../apache/spark/shuffle/RssShuffleManager.java | 36 +++++-
.../uniffle/client/api/ShuffleWriteClient.java | 2 +
.../client/factory/ShuffleClientFactory.java | 27 +++-
.../client/impl/ShuffleWriteClientImpl.java | 57 ++++++++-
.../client/impl/ShuffleWriteClientImplTest.java | 2 +-
docs/client_guide.md | 2 +
.../uniffle/test/AssignmentWithTagsTest.java | 2 +-
.../uniffle/test/CoordinatorAssignmentTest.java | 4 +-
.../java/org/apache/uniffle/test/QuorumTest.java | 2 +-
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 2 +-
.../uniffle/test/ShuffleWithRssClientTest.java | 2 +-
.../test/ShuffleUnregisterWithHdfsTest.java | 98 ++++++++++++++
.../test/ShuffleUnregisterWithLocalfileTest.java | 100 +++++++++++++++
.../uniffle/client/api/ShuffleServerClient.java | 4 +
.../client/impl/grpc/ShuffleServerGrpcClient.java | 32 +++++
.../request/RssUnregisterShuffleRequest.java | 36 ++++++
.../response/RssUnregisterShuffleResponse.java | 26 ++++
proto/src/main/proto/Rss.proto | 11 ++
.../apache/uniffle/server/ShuffleFlushManager.java | 6 +
.../uniffle/server/ShuffleServerGrpcService.java | 23 ++++
.../apache/uniffle/server/ShuffleTaskManager.java | 69 +++++++++-
.../server/buffer/ShuffleBufferManager.java | 49 ++++---
.../apache/uniffle/server/event/AppPurgeEvent.java | 31 +++++
.../apache/uniffle/server/event/PurgeEvent.java | 55 ++++++++
.../uniffle/server/event/ShufflePurgeEvent.java | 27 ++++
.../uniffle/server/storage/HdfsStorageManager.java | 32 ++++-
.../server/storage/LocalStorageManager.java | 34 ++++-
.../server/storage/MultiStorageManager.java | 10 +-
.../uniffle/server/storage/StorageManager.java | 5 +-
.../ShuffleFlushManagerOnKerberizedHdfsTest.java | 11 +-
.../uniffle/server/ShuffleFlushManagerTest.java | 21 ++-
.../uniffle/server/ShuffleTaskManagerTest.java | 141 +++++++++++++++++++++
.../spark-2.4.6_dynamic_allocation_support.patch | 15 ---
.../spark-3.1.2_dynamic_allocation_support.patch | 17 ---
.../spark-3.2.1_dynamic_allocation_support.patch | 17 ---
.../handler/impl/HdfsShuffleDeleteHandler.java | 51 ++++----
.../handler/impl/LocalFileDeleteHandler.java | 7 +-
41 files changed, 953 insertions(+), 142 deletions(-)
diff --git
a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 97c58472..029b1e0e 100644
---
a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++
b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -336,5 +336,10 @@ public class SortWriteBufferManagerTest {
public void close() {
}
+
+ @Override
+ public void unregisterShuffle(String appId, int shuffleId) {
+
+ }
}
}
diff --git
a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index 5f9713b2..ec630e24 100644
---
a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++
b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -426,6 +426,11 @@ public class FetcherTest {
public void close() {
}
+
+ @Override
+ public void unregisterShuffle(String appId, int shuffleId) {
+
+ }
}
static class MockedShuffleReadClient implements ShuffleReadClient {
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 5a46da3e..5f39eb5d 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -112,6 +112,14 @@ public class RssSparkConfig {
.doc("The max data size sent to shuffle server"))
.createWithDefault("16m");
+ public static final ConfigEntry<Integer>
RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE = createIntegerBuilder(
+ new ConfigBuilder("spark.rss.client.unregister.thread.pool.size"))
+ .createWithDefault(10);
+
+ public static final ConfigEntry<Integer>
RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC = createIntegerBuilder(
+ new ConfigBuilder("spark.rss.client.unregister.request.timeout.sec"))
+ .createWithDefault(10);
+
// When the size of read buffer reaches the half of JVM region (i.e., 32m),
// it will incur humongous allocation, so we set it to 14m.
public static final ConfigEntry<String> RSS_CLIENT_READ_BUFFER_SIZE =
createStringBuilder(
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index cdd58a27..625f8e83 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -160,11 +160,13 @@ public class RssShuffleManager implements ShuffleManager {
long retryIntervalMax =
sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
int heartBeatThreadNum =
sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
this.dataCommitPoolSize =
sparkConf.get(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE);
+ int unregisterThreadPoolSize =
sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE);
+ int unregisterRequestTimeoutSec =
sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC);
this.shuffleWriteClient = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum,
dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize,
- dataCommitPoolSize);
+ dataCommitPoolSize, unregisterThreadPoolSize,
unregisterRequestTimeoutSec);
registerCoordinator();
// fetch client conf and apply them if necessary and disable ESS
if (isDriver && dynamicConfEnabled) {
@@ -370,6 +372,13 @@ public class RssShuffleManager implements ShuffleManager {
@Override
public boolean unregisterShuffle(int shuffleId) {
+ try {
+ if (SparkEnv.get().executorId().equals("driver")) {
+ shuffleWriteClient.unregisterShuffle(appId, shuffleId);
+ }
+ } catch (Exception e) {
+ LOG.warn("Errors on unregister to remote shuffle-servers", e);
+ }
return true;
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 6f04ccff..96c93ef7 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -160,12 +160,13 @@ public class RssShuffleManager implements ShuffleManager {
int heartBeatThreadNum =
sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
this.dataTransferPoolSize =
sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
this.dataCommitPoolSize =
sparkConf.get(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE);
-
+ int unregisterThreadPoolSize =
sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE);
+ int unregisterRequestTimeoutSec =
sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC);
shuffleWriteClient = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum,
dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize,
- dataCommitPoolSize);
+ dataCommitPoolSize, unregisterThreadPoolSize,
unregisterRequestTimeoutSec);
registerCoordinator();
// fetch client conf and apply them if necessary and disable ESS
if (isDriver && dynamicConfEnabled) {
@@ -220,12 +221,24 @@ public class RssShuffleManager implements ShuffleManager {
int heartBeatThreadNum =
sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
this.dataTransferPoolSize =
sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
this.dataCommitPoolSize =
sparkConf.get(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE);
-
+ int unregisterThreadPoolSize =
sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE);
+ int unregisterRequestTimeoutSec =
sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC);
shuffleWriteClient = ShuffleClientFactory
.getInstance()
- .createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum,
- dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize,
- dataCommitPoolSize);
+ .createShuffleWriteClient(
+ clientType,
+ retryMax,
+ retryIntervalMax,
+ heartBeatThreadNum,
+ dataReplica,
+ dataReplicaWrite,
+ dataReplicaRead,
+ dataReplicaSkipEnabled,
+ dataTransferPoolSize,
+ dataCommitPoolSize,
+ unregisterThreadPoolSize,
+ unregisterRequestTimeoutSec
+ );
this.taskToSuccessBlockIds = taskToSuccessBlockIds;
this.taskToFailedBlockIds = taskToFailedBlockIds;
if (loop != null) {
@@ -551,6 +564,13 @@ public class RssShuffleManager implements ShuffleManager {
@Override
public boolean unregisterShuffle(int shuffleId) {
+ try {
+ if (SparkEnv.get().executorId().equals("driver")) {
+ shuffleWriteClient.unregisterShuffle(id.get(), shuffleId);
+ }
+ } catch (Exception e) {
+ LOG.warn("Errors on unregister to remote shuffle-servers", e);
+ }
return true;
}
@@ -712,4 +732,8 @@ public class RssShuffleManager implements ShuffleManager {
public void setRemoteStorage(RemoteStorageInfo remoteStorage) {
this.remoteStorage = remoteStorage;
}
+
+ public String getId() {
+ return id.get();
+ }
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index f507a96a..39df0029 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -69,4 +69,6 @@ public interface ShuffleWriteClient {
Map<ShuffleServerInfo, Set<Integer>> serverToPartitions, String appId,
int shuffleId);
void close();
+
+ void unregisterShuffle(String appId, int shuffleId);
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index d11a07f1..652f9923 100644
---
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -34,12 +34,35 @@ public class ShuffleClientFactory {
return INSTANCE;
}
+ /**
+ * Only for MR engine, which wont used to unregister to remote
shuffle-servers
+ */
public ShuffleWriteClient createShuffleWriteClient(
String clientType, int retryMax, long retryIntervalMax, int
heartBeatThreadNum,
int replica, int replicaWrite, int replicaRead, boolean
replicaSkipEnabled, int dataTransferPoolSize,
int dataCommitPoolSize) {
- return new ShuffleWriteClientImpl(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum,
- replica, replicaWrite, replicaRead, replicaSkipEnabled,
dataTransferPoolSize, dataCommitPoolSize);
+ return createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum, replica,
+ replicaWrite, replicaRead, replicaSkipEnabled, dataTransferPoolSize,
dataCommitPoolSize, 10, 10);
+ }
+
+ public ShuffleWriteClient createShuffleWriteClient(
+ String clientType, int retryMax, long retryIntervalMax, int
heartBeatThreadNum,
+ int replica, int replicaWrite, int replicaRead, boolean
replicaSkipEnabled, int dataTransferPoolSize,
+ int dataCommitPoolSize, int unregisterThreadPoolSize, int
unregisterRequestTimeoutSec) {
+ return new ShuffleWriteClientImpl(
+ clientType,
+ retryMax,
+ retryIntervalMax,
+ heartBeatThreadNum,
+ replica,
+ replicaWrite,
+ replicaRead,
+ replicaSkipEnabled,
+ dataTransferPoolSize,
+ dataCommitPoolSize,
+ unregisterThreadPoolSize,
+ unregisterRequestTimeoutSec
+ );
}
public ShuffleReadClient
createShuffleReadClient(CreateShuffleReadClientRequest request) {
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 7be84d4d..e8372d8e 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -54,6 +54,7 @@ import
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.ClientResponse;
import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
@@ -66,6 +67,7 @@ import
org.apache.uniffle.client.response.RssRegisterShuffleResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendCommitResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
@@ -78,6 +80,7 @@ import org.apache.uniffle.common.util.ThreadUtils;
public class ShuffleWriteClientImpl implements ShuffleWriteClient {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleWriteClientImpl.class);
+
private String clientType;
private int retryMax;
private long retryIntervalMax;
@@ -89,9 +92,10 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
private int replicaWrite;
private int replicaRead;
private boolean replicaSkipEnabled;
- private int dataTranferPoolSize;
private int dataCommitPoolSize = -1;
private final ForkJoinPool dataTransferPool;
+ private final int unregisterThreadPoolSize;
+ private final int unregisterRequestTimeSec;
public ShuffleWriteClientImpl(
String clientType,
@@ -103,7 +107,9 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
int replicaRead,
boolean replicaSkipEnabled,
int dataTranferPoolSize,
- int dataCommitPoolSize) {
+ int dataCommitPoolSize,
+ int unregisterThreadPoolSize,
+ int unregisterRequestTimeSec) {
this.clientType = clientType;
this.retryMax = retryMax;
this.retryIntervalMax = retryIntervalMax;
@@ -114,9 +120,10 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
this.replicaWrite = replicaWrite;
this.replicaRead = replicaRead;
this.replicaSkipEnabled = replicaSkipEnabled;
- this.dataTranferPoolSize = dataTranferPoolSize;
this.dataTransferPool = new ForkJoinPool(dataTranferPoolSize);
this.dataCommitPoolSize = dataCommitPoolSize;
+ this.unregisterThreadPoolSize = unregisterThreadPoolSize;
+ this.unregisterRequestTimeSec = unregisterRequestTimeSec;
}
private boolean sendShuffleDataAsync(
@@ -591,6 +598,50 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
dataTransferPool.shutdownNow();
}
+ @Override
+ public void unregisterShuffle(String appId, int shuffleId) {
+ RssUnregisterShuffleRequest request = new
RssUnregisterShuffleRequest(appId, shuffleId);
+ List<Callable<Void>> callableList = Lists.newArrayList();
+
+ shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+ callableList.add(() -> {
+ try {
+ ShuffleServerClient client =
+
ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType,
shuffleServerInfo);
+ RssUnregisterShuffleResponse response =
client.unregisterShuffle(request);
+ if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+ LOG.warn("Failed to unregister shuffle to " +
shuffleServerInfo);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error happened when unregistering to " +
shuffleServerInfo, e);
+ }
+ return null;
+ });
+ }
+ );
+
+ ExecutorService executorService = null;
+ try {
+ executorService =
+ Executors.newFixedThreadPool(
+ Math.min(unregisterThreadPoolSize, shuffleServerInfoSet.size()),
+ ThreadUtils.getThreadFactory("unregister-shuffle-%d")
+ );
+ List<Future<Void>> futures = executorService.invokeAll(callableList,
unregisterRequestTimeSec, TimeUnit.SECONDS);
+ for (Future<Void> future : futures) {
+ if (!future.isDone()) {
+ future.cancel(true);
+ }
+ }
+ } catch (InterruptedException ie) {
+ LOG.warn("Unregister shuffle is interrupted", ie);
+ } finally {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ }
+ }
+ }
+
private void throwExceptionIfNecessary(ClientResponse response, String
errorMsg) {
if (response != null && response.getStatusCode() !=
ResponseStatusCode.SUCCESS) {
LOG.error(errorMsg);
diff --git
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
index 79661e76..71fdc637 100644
---
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
+++
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
@@ -41,7 +41,7 @@ public class ShuffleWriteClientImplTest {
@Test
public void testSendData() {
ShuffleWriteClientImpl shuffleWriteClient =
- new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true, 1, 1);
+ new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true, 1, 1,
10, 10);
ShuffleServerClient mockShuffleServerClient =
mock(ShuffleServerClient.class);
ShuffleWriteClientImpl spyClient = Mockito.spy(shuffleWriteClient);
doReturn(mockShuffleServerClient).when(spyClient).getShuffleServerClient(any());
diff --git a/docs/client_guide.md b/docs/client_guide.md
index b4c12193..9b5a208a 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -157,6 +157,8 @@ The important configuration is listed as following.
|---|---|---|
|spark.rss.writer.buffer.spill.size|128m|Buffer size for total partition data|
|spark.rss.client.send.size.limit|16m|The max data size sent to shuffle server|
+|spark.rss.client.unregister.thread.pool.size|10|The max size of thread pool
of unregistering|
+|spark.rss.client.unregister.request.timeout.sec|10|The max timeout sec when
doing unregister to remote shuffle-servers|
### MapReduce Specialized Setting
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
index c19a911f..2688704e 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
@@ -148,7 +148,7 @@ public class AssignmentWithTagsTest extends
CoordinatorTestBase {
@Test
public void testTags() throws Exception {
ShuffleWriteClientImpl shuffleWriteClient = new
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
- 1, 1, 1, true, 1, 1);
+ 1, 1, 1, true, 1, 1, 10, 10);
shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
// Case1 : only set the single default shuffle version tag
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
index c45c2e19..455bd69f 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
@@ -87,7 +87,7 @@ public class CoordinatorAssignmentTest extends
CoordinatorTestBase {
@Test
public void testSilentPeriod() throws Exception {
ShuffleWriteClientImpl shuffleWriteClient = new
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
- 1, 1, 1, true, 1, 1);
+ 1, 1, 1, true, 1, 1, 10, 10);
shuffleWriteClient.registerCoordinators(QUORUM);
// Case1: Disable silent period
@@ -112,7 +112,7 @@ public class CoordinatorAssignmentTest extends
CoordinatorTestBase {
@Test
public void testAssignmentServerNodesNumber() throws Exception {
ShuffleWriteClientImpl shuffleWriteClient = new
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
- 1, 1, 1, true, 1, 1);
+ 1, 1, 1, true, 1, 1, 10, 10);
shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
/**
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 913f7726..2ea3d5c0 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -266,7 +266,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
int replica, int replicaWrite, int replicaRead, boolean replicaSkip) {
shuffleWriteClientImpl = new
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
- replica, replicaWrite, replicaRead, replicaSkip, 1, 1);
+ replica, replicaWrite, replicaRead, replicaSkip, 1, 1, 10, 10);
List<ShuffleServerInfo> allServers =
Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1,
shuffleServerInfo2, shuffleServerInfo3, shuffleServerInfo4);
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 280de66a..66eb4513 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -105,7 +105,7 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
public void clearResourceTest() throws Exception {
final ShuffleWriteClient shuffleWriteClient =
ShuffleClientFactory.getInstance().createShuffleWriteClient(
- "GRPC", 2, 10000L, 4, 1, 1, 1, true, 1, 1);
+ "GRPC", 2, 10000L, 4, 1, 1, 1, true, 1, 1, 10, 10);
shuffleWriteClient.registerCoordinators("127.0.0.1:19999");
shuffleWriteClient.registerShuffle(
new ShuffleServerInfo("127.0.0.1-20001", "127.0.0.1", 20001),
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 86e78cff..8df19926 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -93,7 +93,7 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
@BeforeEach
public void createClient() {
shuffleWriteClientImpl = new
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
- 1, 1, 1, true, 1, 1);
+ 1, 1, 1, true, 1, 1, 10, 10);
}
@AfterEach
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
new file mode 100644
index 00000000..2eab10d4
--- /dev/null
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import scala.Tuple2;
+
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ShuffleUnregisterWithHdfsTest extends SparkIntegrationTestBase {
+
+ @BeforeAll
+ public static void setupServers() throws Exception {
+ CoordinatorConf coordinatorConf = getCoordinatorConf();
+ Map<String, String> dynamicConf = Maps.newHashMap();
+ dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
+ dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.HDFS.name());
+ addDynamicConf(coordinatorConf, dynamicConf);
+ createCoordinatorServer(coordinatorConf);
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ shuffleServerConf.setString("rss.storage.type", StorageType.HDFS.name());
+ createShuffleServer(shuffleServerConf);
+ startServers();
+ }
+
+ @Override
+ public void updateSparkConfCustomer(SparkConf sparkConf) {
+ }
+
+ private int runCounter = 0;
+
+ @Test
+ public void unregisterShuffleTest() throws Exception {
+ run();
+ }
+
+ @Override
+ public Map runTest(SparkSession spark, String fileName) throws Exception {
+ // take a rest to make sure shuffle server is registered
+ Thread.sleep(3000);
+ JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+ JavaPairRDD<String, String> javaPairRDD1 =
jsc.parallelizePairs(Lists.newArrayList(
+ new Tuple2<>("a", "1"), new Tuple2<>("b", "2"),
+ new Tuple2<>("c", "3"), new Tuple2<>("d", "4")), 2);
+ JavaPairRDD<String, Iterable<String>> javaPairRDD =
javaPairRDD1.groupByKey().sortByKey();
+ Map map = javaPairRDD.collectAsMap();
+
+ // The second run will use the rss. and we should check the effectiveness
of unregisterShuffle method.
+ if (runCounter == 1) {
+ String basePath = HDFS_URI + "rss/test";
+ String appPath = fs.listStatus(new
Path(basePath))[0].getPath().toUri().getPath();
+
+ String shufflePath = appPath + "/0";
+ assertTrue(fs.exists(new Path(shufflePath)));
+
+ spark.sparkContext().env().blockManager().master().removeShuffle(0,
true);
+
+ // Wait some time to cleanup the shuffle resource for shuffle-server
+ Thread.sleep(1000);
+ assertFalse(fs.exists(new Path(shufflePath)));
+ assertTrue(fs.exists(new Path(appPath)));
+ } else {
+ runCounter++;
+ }
+ return map;
+ }
+}
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
new file mode 100644
index 00000000..a3dd18f5
--- /dev/null
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import scala.Tuple2;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ShuffleUnregisterWithLocalfileTest extends
SparkIntegrationTestBase {
+
+ @BeforeAll
+ public static void setupServers() throws Exception {
+ CoordinatorConf coordinatorConf = getCoordinatorConf();
+ Map<String, String> dynamicConf = Maps.newHashMap();
+ dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
+ dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name());
+ addDynamicConf(coordinatorConf, dynamicConf);
+ createCoordinatorServer(coordinatorConf);
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ shuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
+ createShuffleServer(shuffleServerConf);
+ startServers();
+ }
+
+ @Override
+ public void updateSparkConfCustomer(SparkConf sparkConf) {
+ }
+
+ private int runCounter = 0;
+
+ @Test
+ public void unregisterShuffleTest() throws Exception {
+ run();
+ }
+
+ @Override
+ public Map runTest(SparkSession spark, String fileName) throws Exception {
+ // take a rest to make sure shuffle server is registered
+ Thread.sleep(3000);
+ JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+ JavaPairRDD<String, String> javaPairRDD1 =
jsc.parallelizePairs(Lists.newArrayList(
+ new Tuple2<>("a", "1"), new Tuple2<>("b", "2"),
+ new Tuple2<>("c", "3"), new Tuple2<>("d", "4")), 2);
+ JavaPairRDD<String, Iterable<String>> javaPairRDD =
javaPairRDD1.groupByKey().sortByKey();
+ Map map = javaPairRDD.collectAsMap();
+
+ // The second run will use the rss. and we should check the effectiveness
of unregisterShuffle method.
+ if (runCounter == 1) {
+ String path = shuffleServers.get(0).getShuffleServerConf()
+ .get(RssBaseConf.RSS_STORAGE_BASE_PATH).get(0);
+ String appPath = new File(path).listFiles()[0].getAbsolutePath();
+
+ String shufflePath = appPath + "/0";
+ assertTrue(new File(shufflePath).exists());
+
+ spark.sparkContext().env().blockManager().master().removeShuffle(0,
true);
+
+ // Wait some time to cleanup the shuffle resource for shuffle-server
+ Thread.sleep(1000);
+ assertFalse(new File(shufflePath).exists());
+ assertTrue(new File(appPath).exists());
+ } else {
+ runCounter++;
+ }
+ return map;
+ }
+}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
index f9873c8b..ab165072 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
@@ -28,6 +28,7 @@ import
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
import org.apache.uniffle.client.response.RssFinishShuffleResponse;
import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse;
@@ -38,9 +39,12 @@ import
org.apache.uniffle.client.response.RssRegisterShuffleResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendCommitResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
public interface ShuffleServerClient {
+ RssUnregisterShuffleResponse unregisterShuffle(RssUnregisterShuffleRequest
request);
+
RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest
request);
RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest
request);
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 07658246..7f4fa076 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -40,6 +40,7 @@ import
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
import org.apache.uniffle.client.response.RssFinishShuffleResponse;
@@ -51,6 +52,7 @@ import
org.apache.uniffle.client.response.RssRegisterShuffleResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendCommitResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
@@ -58,6 +60,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.exception.NotRetryException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.RetryUtils;
+import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
import org.apache.uniffle.proto.RssProtos.AppHeartBeatResponse;
import org.apache.uniffle.proto.RssProtos.FinishShuffleRequest;
@@ -206,6 +209,35 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
return result;
}
+ private RssProtos.ShuffleUnregisterResponse doUnregisterShuffle(String
appId, int shuffleId) {
+ RssProtos.ShuffleUnregisterRequest request =
RssProtos.ShuffleUnregisterRequest.newBuilder()
+ .setAppId(appId)
+ .setShuffleId(shuffleId)
+ .build();
+ return blockingStub.unregisterShuffle(request);
+ }
+
+ @Override
+ public RssUnregisterShuffleResponse
unregisterShuffle(RssUnregisterShuffleRequest request) {
+ RssProtos.ShuffleUnregisterResponse rpcResponse =
doUnregisterShuffle(request.getAppId(), request.getShuffleId());
+
+ RssUnregisterShuffleResponse response;
+ StatusCode statusCode = rpcResponse.getStatus();
+
+ switch (statusCode) {
+ case SUCCESS:
+ response = new
RssUnregisterShuffleResponse(ResponseStatusCode.SUCCESS);
+ break;
+ default:
+      String msg = String.format("Errors on unregister shuffle to %s:%s for
appId[%s], shuffleId[%d], error: %s",
+ host, port, request.getAppId(), request.getShuffleId(),
rpcResponse.getRetMsg());
+ LOG.error(msg);
+ throw new RssException(msg);
+ }
+
+ return response;
+ }
+
@Override
public RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest
request) {
ShuffleRegisterResponse rpcResponse = doRegisterShuffle(
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleRequest.java
new file mode 100644
index 00000000..317e27ab
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleRequest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+public class RssUnregisterShuffleRequest {
+ private String appId;
+ private int shuffleId;
+
+ public RssUnregisterShuffleRequest(String appId, int shuffleId) {
+ this.appId = appId;
+ this.shuffleId = shuffleId;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public int getShuffleId() {
+ return shuffleId;
+ }
+}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleResponse.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleResponse.java
new file mode 100644
index 00000000..b872522e
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleResponse.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+public class RssUnregisterShuffleResponse extends ClientResponse {
+
+ public RssUnregisterShuffleResponse(ResponseStatusCode statusCode) {
+ super(statusCode);
+ }
+
+}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 26cd8b51..c2111bb0 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -25,6 +25,7 @@ package rss.common;
service ShuffleServer {
rpc registerShuffle (ShuffleRegisterRequest) returns
(ShuffleRegisterResponse);
+ rpc unregisterShuffle(ShuffleUnregisterRequest) returns
(ShuffleUnregisterResponse);
rpc sendShuffleData (SendShuffleDataRequest) returns
(SendShuffleDataResponse);
rpc getLocalShuffleIndex (GetLocalShuffleIndexRequest) returns
(GetLocalShuffleIndexResponse);
rpc getLocalShuffleData (GetLocalShuffleDataRequest) returns
(GetLocalShuffleDataResponse);
@@ -167,6 +168,16 @@ message ShuffleRegisterRequest {
string user = 5;
}
+message ShuffleUnregisterRequest {
+ string appId = 1;
+ int32 shuffleId = 2;
+}
+
+message ShuffleUnregisterResponse {
+ StatusCode status = 1;
+ string retMsg = 2;
+}
+
message ShuffleRegisterResponse {
StatusCode status = 1;
string retMsg = 2;
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index daae92df..6c6ece9e 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -337,6 +338,11 @@ public class ShuffleFlushManager {
}
}
+ public void removeResourcesOfShuffleId(String appId, int shuffleId) {
+ Optional.ofNullable(handlers.get(appId)).ifPresent(x ->
x.remove(shuffleId));
+ Optional.ofNullable(committedBlockIds.get(appId)).ifPresent(x ->
x.remove(shuffleId));
+ }
+
private static class PendingShuffleFlushEvent {
private final ShuffleDataFlushEvent event;
private final long createTimeStamp = System.currentTimeMillis();
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index e08fd1c8..b7a8ab20 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -105,6 +105,29 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
}
}
+ @Override
+ public void unregisterShuffle(RssProtos.ShuffleUnregisterRequest request,
+ StreamObserver<RssProtos.ShuffleUnregisterResponse>
responseStreamObserver) {
+ String appId = request.getAppId();
+ int shuffleId = request.getShuffleId();
+
+ StatusCode result = StatusCode.SUCCESS;
+ String responseMessage = "OK";
+ try {
+ shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId,
shuffleId);
+    } catch (Exception e) {
+      result = StatusCode.INTERNAL_ERROR; responseMessage = e.getMessage();
+    }
+
+ RssProtos.ShuffleUnregisterResponse reply =
RssProtos.ShuffleUnregisterResponse
+ .newBuilder()
+ .setStatus(valueOf(result))
+ .setRetMsg(responseMessage)
+ .build();
+ responseStreamObserver.onNext(reply);
+ responseStreamObserver.onCompleted();
+ }
+
@Override
public void registerShuffle(ShuffleRegisterRequest req,
StreamObserver<ShuffleRegisterResponse> responseObserver) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index a26006ae..2ec3d6c9 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -18,9 +18,12 @@
package org.apache.uniffle.server;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
@@ -34,6 +37,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
import org.roaringbitmap.longlong.LongIterator;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
@@ -51,6 +55,9 @@ import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.event.AppPurgeEvent;
+import org.apache.uniffle.server.event.PurgeEvent;
+import org.apache.uniffle.server.event.ShufflePurgeEvent;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.common.StorageReadMetrics;
@@ -78,7 +85,7 @@ public class ShuffleTaskManager {
private Map<String, ShuffleTaskInfo> shuffleTaskInfos =
Maps.newConcurrentMap();
private Map<Long, PreAllocatedBufferInfo> requireBufferIds =
Maps.newConcurrentMap();
private Runnable clearResourceThread;
- private BlockingQueue<String> expiredAppIdQueue =
Queues.newLinkedBlockingQueue();
+ private BlockingQueue<PurgeEvent> expiredAppIdQueue =
Queues.newLinkedBlockingQueue();
// appId -> shuffleId -> serverReadHandler
public ShuffleTaskManager(
@@ -109,8 +116,13 @@ public class ShuffleTaskManager {
clearResourceThread = () -> {
while (true) {
try {
- String appId = expiredAppIdQueue.take();
- removeResources(appId);
+ PurgeEvent event = expiredAppIdQueue.take();
+ if (event instanceof AppPurgeEvent) {
+ removeResources(event.getAppId());
+ }
+ if (event instanceof ShufflePurgeEvent) {
+ removeResourcesByShuffleIds(event.getAppId(), ((ShufflePurgeEvent)
event).getShuffleIds());
+ }
} catch (Exception e) {
LOG.error("Exception happened when clear resource for expired
application", e);
}
@@ -369,7 +381,7 @@ public class ShuffleTaskManager {
if (System.currentTimeMillis() -
shuffleTaskInfos.get(appId).getCurrentTimes() > appExpiredWithoutHB) {
LOG.info("Detect expired appId[" + appId + "] according "
+ "to rss.server.app.expired.withoutHeartbeat");
- expiredAppIdQueue.add(appId);
+ expiredAppIdQueue.add(new AppPurgeEvent(appId,
getUserByAppId(appId)));
}
}
ShuffleServerMetrics.gaugeAppNum.set(shuffleTaskInfos.size());
@@ -378,6 +390,42 @@ public class ShuffleTaskManager {
}
}
+ /**
+   * Clear up the partial resources of the given shuffleIds of an app.
+   * @param appId the application whose shuffle resources should be purged
+   * @param shuffleIds the shuffle ids whose resources should be removed
+   */
+ public void removeResourcesByShuffleIds(String appId, List<Integer>
shuffleIds) {
+ if (CollectionUtils.isEmpty(shuffleIds)) {
+ return;
+ }
+
+ LOG.info("Start remove resource for appId[{}], shuffleIds[{}]", appId,
shuffleIds);
+ final long start = System.currentTimeMillis();
+ final ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
+ if (taskInfo != null) {
+ for (Integer shuffleId : shuffleIds) {
+ Optional.ofNullable(taskInfo).ifPresent(x ->
x.getCachedBlockIds().remove(shuffleId));
+ Optional.ofNullable(taskInfo).ifPresent(x ->
x.getCommitCounts().remove(shuffleId));
+ Optional.ofNullable(taskInfo).ifPresent(x ->
x.getCommitLocks().remove(shuffleId));
+ }
+ }
+ Optional.ofNullable(partitionsToBlockIds.get(appId)).ifPresent(x -> {
+ for (Integer shuffleId : shuffleIds) {
+ x.remove(shuffleId);
+ }
+ });
+ shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds.toArray(new
Integer[0]));
+ for (Integer shuffleId : shuffleIds) {
+ shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleId);
+ }
+ storageManager.removeResources(
+ new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds)
+ );
+ LOG.info("Finish remove resource for appId[{}], shuffleIds[{}], cost[{}]",
+ appId, shuffleIds, System.currentTimeMillis() - start);
+ }
+
@VisibleForTesting
public void removeResources(String appId) {
LOG.info("Start remove resource for appId[" + appId + "]");
@@ -387,7 +435,9 @@ public class ShuffleTaskManager {
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
if (!shuffleToCachedBlockIds.isEmpty()) {
- storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet(),
getUserByAppId(appId));
+ storageManager.removeResources(
+ new AppPurgeEvent(appId, getUserByAppId(appId), new
ArrayList<>(shuffleToCachedBlockIds.keySet()))
+ );
}
shuffleTaskInfos.remove(appId);
LOG.info("Finish remove resource for appId[" + appId + "] cost " +
(System.currentTimeMillis() - start) + " ms");
@@ -443,4 +493,13 @@ public class ShuffleTaskManager {
public Map<String, Map<Integer, Roaring64NavigableMap[]>>
getPartitionsToBlockIds() {
return partitionsToBlockIds;
}
+
+ public void removeShuffleDataAsync(String appId, int shuffleId) {
+ expiredAppIdQueue.add(new ShufflePurgeEvent(appId, getUserByAppId(appId),
Arrays.asList(shuffleId)));
+ }
+
+ @VisibleForTesting
+ void removeShuffleDataSync(String appId, int shuffleId) {
+ removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index c0ce2f4f..99ae9c6b 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -209,21 +210,10 @@ public class ShuffleBufferManager {
if (shuffleIdToBuffers == null) {
return;
}
- // calculate released size
- long size = 0;
- for (RangeMap<Integer, ShuffleBuffer> rangeMap :
shuffleIdToBuffers.values()) {
- if (rangeMap != null) {
- Collection<ShuffleBuffer> buffers = rangeMap.asMapOfRanges().values();
- if (buffers != null) {
- for (ShuffleBuffer buffer : buffers) {
- ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
- size += buffer.getSize();
- }
- }
- }
- }
- // release memory
- releaseMemory(size, false, false);
+ removeBufferByShuffleId(
+ appId,
+
shuffleIdToBuffers.keySet().stream().collect(Collectors.toList()).toArray(new
Integer[0])
+ );
shuffleSizeMap.remove(appId);
bufferPool.remove(appId);
}
@@ -459,4 +449,33 @@ public class ShuffleBufferManager {
Set<Integer> shuffleIdSet = pickedShuffle.get(appId);
shuffleIdSet.add(shuffleId);
}
+
+ public void removeBufferByShuffleId(String appId, Integer... shuffleIds) {
+ Map<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers =
bufferPool.get(appId);
+ if (shuffleIdToBuffers == null) {
+ return;
+ }
+
+ Map<Integer, AtomicLong> shuffleIdToSizeMap = shuffleSizeMap.get(appId);
+ for (int shuffleId : shuffleIds) {
+ long size = 0;
+
+ RangeMap<Integer, ShuffleBuffer> bufferRangeMap =
shuffleIdToBuffers.get(shuffleId);
+ if (bufferRangeMap == null) {
+ continue;
+ }
+ Collection<ShuffleBuffer> buffers =
bufferRangeMap.asMapOfRanges().values();
+ if (buffers != null) {
+ for (ShuffleBuffer buffer : buffers) {
+ ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
+ size += buffer.getSize();
+ }
+ }
+ releaseMemory(size, false, false);
+ if (shuffleIdToSizeMap != null) {
+ shuffleIdToSizeMap.remove(shuffleId);
+ }
+ shuffleIdToBuffers.remove(shuffleId);
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
b/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
new file mode 100644
index 00000000..804d3849
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
+public class AppPurgeEvent extends PurgeEvent {
+
+ public AppPurgeEvent(String appId, String user, List<Integer> shuffleIds) {
+ super(appId, user, shuffleIds);
+ }
+
+ public AppPurgeEvent(String appId, String user) {
+ super(appId, user, null);
+ }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java
b/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java
new file mode 100644
index 00000000..a63f04c7
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
+// todo: introduce the unified abstract dispatcher to handle events,
+// mentioned in
https://github.com/apache/incubator-uniffle/pull/249#discussion_r983001435
+public abstract class PurgeEvent {
+ private String appId;
+ private String user;
+ private List<Integer> shuffleIds;
+
+ public PurgeEvent(String appId, String user, List<Integer> shuffleIds) {
+ this.appId = appId;
+ this.user = user;
+ this.shuffleIds = shuffleIds;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public List<Integer> getShuffleIds() {
+ return shuffleIds;
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName() + "{"
+ + "appId='" + appId + '\''
+ + ", user='" + user + '\''
+ + ", shuffleIds=" + shuffleIds
+ + '}';
+ }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java
b/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java
new file mode 100644
index 00000000..cbc39aab
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+import java.util.List;
+
+public class ShufflePurgeEvent extends PurgeEvent {
+
+ public ShufflePurgeEvent(String appId, String user, List<Integer>
shuffleIds) {
+ super(appId, user, shuffleIds);
+ }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 7c0c0dd3..3087952e 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -17,9 +17,9 @@
package org.apache.uniffle.server.storage;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@@ -38,11 +38,14 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleDataReadEvent;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.event.AppPurgeEvent;
+import org.apache.uniffle.server.event.PurgeEvent;
import org.apache.uniffle.storage.common.HdfsStorage;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import org.apache.uniffle.storage.util.StorageType;
public class HdfsStorageManager extends SingleStorageManager {
@@ -75,15 +78,32 @@ public class HdfsStorageManager extends
SingleStorageManager {
}
@Override
- public void removeResources(String appId, Set<Integer> shuffleSet, String
user) {
+ public void removeResources(PurgeEvent event) {
+ String appId = event.getAppId();
+ String user = event.getUser();
HdfsStorage storage = getStorageByAppId(appId);
if (storage != null) {
- storage.removeHandlers(appId);
+ if (event instanceof AppPurgeEvent) {
+ storage.removeHandlers(appId);
+ }
appIdToStorages.remove(appId);
- ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
+ ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory
+ .getInstance()
.createShuffleDeleteHandler(
- new CreateShuffleDeleteHandlerRequest(StorageType.HDFS.name(),
storage.getConf()));
- deleteHandler.delete(new String[] {storage.getStoragePath()}, appId,
user);
+ new CreateShuffleDeleteHandlerRequest(StorageType.HDFS.name(),
storage.getConf())
+ );
+
+ String basicPath =
ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
+ List<String> deletePaths = new ArrayList<>();
+
+ if (event instanceof AppPurgeEvent) {
+ deletePaths.add(basicPath);
+ } else {
+ for (Integer shuffleId : event.getShuffleIds()) {
+
deletePaths.add(ShuffleStorageUtils.getFullShuffleDataFolder(basicPath,
String.valueOf(shuffleId)));
+ }
+ }
+ deleteHandler.delete(deletePaths.toArray(new String[0]), appId, user);
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 1bbc6298..5c47e632 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -17,9 +17,12 @@
package org.apache.uniffle.server.storage;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -44,6 +47,9 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleDataReadEvent;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.event.AppPurgeEvent;
+import org.apache.uniffle.server.event.PurgeEvent;
+import org.apache.uniffle.server.event.ShufflePurgeEvent;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
@@ -171,10 +177,16 @@ public class LocalStorageManager extends
SingleStorageManager {
}
@Override
- public void removeResources(String appId, Set<Integer> shuffleSet, String
user) {
+ public void removeResources(PurgeEvent event) {
+ String appId = event.getAppId();
+ String user = event.getUser();
+ List<Integer> shuffleSet =
Optional.ofNullable(event.getShuffleIds()).orElse(Collections.emptyList());
+
for (LocalStorage storage : localStorages) {
- for (Integer shuffleId : shuffleSet) {
+ if (event instanceof AppPurgeEvent) {
storage.removeHandlers(appId);
+ }
+ for (Integer shuffleId : shuffleSet) {
storage.removeResources(RssUtils.generateShuffleKey(appId, shuffleId));
}
}
@@ -182,7 +194,23 @@ public class LocalStorageManager extends
SingleStorageManager {
ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new
CreateShuffleDeleteHandlerRequest(StorageType.LOCALFILE.name(), new
Configuration()));
- deleteHandler.delete(storageBasePaths.toArray(new
String[storageBasePaths.size()]), appId, user);
+
+ List<String> deletePaths = storageBasePaths.stream().flatMap(path -> {
+ String basicPath = ShuffleStorageUtils.getFullShuffleDataFolder(path,
appId);
+ if (event instanceof ShufflePurgeEvent) {
+ List<String> paths = new ArrayList<>();
+ for (int shuffleId : shuffleSet) {
+ paths.add(
+ ShuffleStorageUtils.getFullShuffleDataFolder(basicPath,
String.valueOf(shuffleId))
+ );
+ }
+ return paths.stream();
+ } else {
+ return Arrays.asList(basicPath).stream();
+ }
+ }).collect(Collectors.toList());
+
+ deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]),
appId, user);
}
@Override
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index d4fff0a9..2699a3f9 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -18,7 +18,6 @@
package org.apache.uniffle.server.storage;
import java.io.IOException;
-import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +27,7 @@ import org.apache.uniffle.server.Checker;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleDataReadEvent;
import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.event.PurgeEvent;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
@@ -112,10 +112,10 @@ public class MultiStorageManager implements
StorageManager {
}
@Override
- public void removeResources(String appId, Set<Integer> shuffleSet, String
user) {
- LOG.info("Start to remove resource of appId: {}, shuffles: {}", appId,
shuffleSet.toString());
- warmStorageManager.removeResources(appId, shuffleSet, user);
- coldStorageManager.removeResources(appId, shuffleSet, user);
+ public void removeResources(PurgeEvent event) {
+ LOG.info("Start to remove resource of {}", event);
+ warmStorageManager.removeResources(event);
+ coldStorageManager.removeResources(event);
}
public StorageManager getColdStorageManager() {
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index d8fe5c9e..0960ca37 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -17,12 +17,11 @@
package org.apache.uniffle.server.storage;
-import java.util.Set;
-
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.server.Checker;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleDataReadEvent;
+import org.apache.uniffle.server.event.PurgeEvent;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -39,7 +38,7 @@ public interface StorageManager {
// todo: add an interface for updateReadMetrics
- void removeResources(String appId, Set<Integer> shuffleSet, String user);
+ void removeResources(PurgeEvent event);
void start();
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
index 4800c4d2..69d55fb0 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
@@ -18,11 +18,11 @@
package org.apache.uniffle.server;
import java.io.FileNotFoundException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import com.google.common.collect.Sets;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Level;
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.KerberizedHdfsBase;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.event.AppPurgeEvent;
import org.apache.uniffle.server.storage.HdfsStorageManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.server.storage.StorageManagerFactory;
@@ -134,7 +135,9 @@ public class ShuffleFlushManagerOnKerberizedHdfsTest
extends KerberizedHdfsBase
manager.removeResources(appId1);
assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
- storageManager.removeResources(appId1, Sets.newHashSet(1), "alex");
+ storageManager.removeResources(
+ new AppPurgeEvent(appId1, "alex", Arrays.asList(1))
+ );
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
try {
kerberizedHdfs.getFileSystem().listStatus(new
Path(remoteStorage.getPath() + "/" + appId1 + "/"));
@@ -151,7 +154,9 @@ public class ShuffleFlushManagerOnKerberizedHdfsTest
extends KerberizedHdfsBase
assertEquals(1, size);
manager.removeResources(appId2);
assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
- storageManager.removeResources(appId2, Sets.newHashSet(1), "alex");
+ storageManager.removeResources(
+ new AppPurgeEvent(appId2, "alex", Arrays.asList(1))
+ );
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
assertEquals(0, manager.getCommittedBlockIds(appId2,
1).getLongCardinality());
size = storage.getHandlerSize();
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 2ae581e7..5f1bb47d 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -52,6 +52,7 @@ import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.event.AppPurgeEvent;
import org.apache.uniffle.server.storage.HdfsStorageManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.server.storage.StorageManagerFactory;
@@ -229,7 +230,9 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
manager.removeResources(appId1);
assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
- storageManager.removeResources(appId1, Sets.newHashSet(1),
StringUtils.EMPTY);
+ storageManager.removeResources(
+ new AppPurgeEvent(appId1, StringUtils.EMPTY, Arrays.asList(1))
+ );
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
try {
fs.listStatus(new Path(remoteStorage.getPath() + "/" + appId1 + "/"));
@@ -244,7 +247,9 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
assertEquals(1, size);
manager.removeResources(appId2);
assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
- storageManager.removeResources(appId2, Sets.newHashSet(1),
StringUtils.EMPTY);
+ storageManager.removeResources(
+ new AppPurgeEvent(appId2, StringUtils.EMPTY, Arrays.asList(1))
+ );
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
assertEquals(0, manager.getCommittedBlockIds(appId2,
1).getLongCardinality());
size = storage.getHandlerSize();
@@ -253,7 +258,9 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
// but the cache from appIdToStorages has been removed, so we need to delete
this path in hdfs
Path path = new Path(remoteStorage.getPath() + "/" + appId2 + "/");
assertTrue(fs.mkdirs(path));
- storageManager.removeResources(appId2, Sets.newHashSet(1),
StringUtils.EMPTY);
+ storageManager.removeResources(
+ new AppPurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1))
+ );
assertFalse(fs.exists(path));
HdfsStorage storageByAppId = ((HdfsStorageManager)
storageManager).getStorageByAppId(appId2);
assertNull(storageByAppId);
@@ -286,7 +293,9 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
assertEquals(2, storage.getHandlerSize());
File file = new File(tempDir, appId1);
assertTrue(file.exists());
- storageManager.removeResources(appId1, Sets.newHashSet(1),
StringUtils.EMPTY);
+ storageManager.removeResources(
+ new AppPurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1))
+ );
manager.removeResources(appId1);
assertFalse(file.exists());
ShuffleDataFlushEvent event3 =
@@ -297,7 +306,9 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
assertEquals(5, manager.getCommittedBlockIds(appId2,
1).getLongCardinality());
assertEquals(1, storage.getHandlerSize());
manager.removeResources(appId2);
- storageManager.removeResources(appId2, Sets.newHashSet(1),
StringUtils.EMPTY);
+ storageManager.removeResources(
+ new AppPurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1))
+ );
assertEquals(0, manager.getCommittedBlockIds(appId2,
1).getLongCardinality());
assertEquals(0, storage.getHandlerSize());
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 2c8f54a3..5a869b43 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -17,6 +17,8 @@
package org.apache.uniffle.server;
+import java.io.File;
+import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -30,6 +32,7 @@ import com.google.common.collect.RangeMap;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -49,10 +52,13 @@ import
org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.storage.HdfsTestBase;
import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -251,6 +257,141 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
}
}
+ /**
+ * Clean up the shuffle data of stage level for one app
+ * @throws Exception
+ */
+ @Test
+ public void removeShuffleDataWithHdfsTest() throws Exception {
+ String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+ ShuffleServerConf conf = new ShuffleServerConf(confFile);
+ String storageBasePath = HDFS_URI + "rss/clearTest";
+ conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+ conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+ conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+ conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+ conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
+ conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
50.0);
+ conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
0.0);
+ conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+ conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
+ conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+
+ ShuffleServer shuffleServer = new ShuffleServer(conf);
+
+ ShuffleBufferManager shuffleBufferManager =
shuffleServer.getShuffleBufferManager();
+ ShuffleFlushManager shuffleFlushManager =
shuffleServer.getShuffleFlushManager();
+ StorageManager storageManager = shuffleServer.getStorageManager();
+ ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(
+ conf, shuffleFlushManager, shuffleBufferManager, storageManager);
+
+ String appId = "removeShuffleDataTest1";
+ for (int i = 0; i < 4; i++) {
+ shuffleTaskManager.registerShuffle(
+ appId,
+ i,
+ Lists.newArrayList(new PartitionRange(0, 1)),
+ new RemoteStorageInfo(storageBasePath, Maps.newHashMap()),
+ StringUtils.EMPTY
+ );
+ }
+ shuffleTaskManager.refreshAppId(appId);
+
+ assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+ ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
+
+ shuffleTaskManager.requireBuffer(35);
+ shuffleTaskManager.requireBuffer(35);
+ shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
+ shuffleTaskManager.updateCachedBlockIds(appId, 0,
partitionedData0.getBlockList());
+ shuffleTaskManager.cacheShuffleData(appId, 1, false, partitionedData0);
+ shuffleTaskManager.updateCachedBlockIds(appId, 1,
partitionedData0.getBlockList());
+ shuffleTaskManager.refreshAppId(appId);
+ shuffleTaskManager.checkResourceStatus();
+
+ assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+ RangeMap<Integer, ShuffleBuffer> rangeMap =
shuffleBufferManager.getBufferPool().get(appId).get(0);
+ assertFalse(rangeMap.asMapOfRanges().isEmpty());
+ shuffleTaskManager.commitShuffle(appId, 0);
+
+ // Before removing shuffle resources
+ String appBasePath =
ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath, appId);
+ String shufflePath0 =
ShuffleStorageUtils.getFullShuffleDataFolder(appBasePath, "0");
+ assertTrue(fs.exists(new Path(shufflePath0)));
+
+ // After removing the shuffle id of 0 resources
+ shuffleTaskManager.removeShuffleDataSync(appId, 0);
+ assertFalse(fs.exists(new Path(shufflePath0)));
+ assertTrue(fs.exists(new Path(appBasePath)));
+ assertNull(shuffleBufferManager.getBufferPool().get(appId).get(0));
+ assertNotNull(shuffleBufferManager.getBufferPool().get(appId).get(1));
+ }
+
+ @Test
+ public void removeShuffleDataWithLocalfileTest() throws Exception {
+ String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+ ShuffleServerConf conf = new ShuffleServerConf(confFile);
+ conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+ conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+ conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+ conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+ conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
+ conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
50.0);
+ conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
0.0);
+ conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+ conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 100000L);
+ conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+
+ conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, "LOCALFILE");
+ java.nio.file.Path path1 =
Files.createTempDirectory("removeShuffleDataWithLocalfileTest");
+ java.nio.file.Path path2 =
Files.createTempDirectory("removeShuffleDataWithLocalfileTest");
+ conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(),
+ path1.toAbsolutePath().toString() + "," +
path2.toAbsolutePath().toString());
+
+ ShuffleServer shuffleServer = new ShuffleServer(conf);
+
+ ShuffleBufferManager shuffleBufferManager =
shuffleServer.getShuffleBufferManager();
+ ShuffleFlushManager shuffleFlushManager =
shuffleServer.getShuffleFlushManager();
+ StorageManager storageManager = shuffleServer.getStorageManager();
+ ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(
+ conf, shuffleFlushManager, shuffleBufferManager, storageManager);
+
+ String appId = "removeShuffleDataWithLocalfileTest";
+
+ int shuffleNum = 4;
+ for (int i = 0; i < shuffleNum; i++) {
+ shuffleTaskManager.registerShuffle(
+ appId,
+ i,
+ Lists.newArrayList(new PartitionRange(0, 1)),
+ RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+ StringUtils.EMPTY
+ );
+
+ ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1,
35);
+ shuffleTaskManager.requireBuffer(35);
+ shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0);
+ shuffleTaskManager.updateCachedBlockIds(appId, i,
partitionedData0.getBlockList());
+ }
+
+ assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+ for (int i = 0; i < shuffleNum; i++) {
+ shuffleTaskManager.commitShuffle(appId, i);
+ shuffleTaskManager.removeShuffleDataSync(appId, i);
+ }
+
+ for (String path : conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH)) {
+ String appPath = path + "/" + appId;
+ File[] files = new File(appPath).listFiles();
+ if (files != null) {
+ assertEquals(0, files.length);
+ }
+ }
+ }
+
@Test
public void clearTest() throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
diff --git a/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
b/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
index 88f895c5..22581d46 100644
--- a/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
+++ b/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
@@ -15,21 +15,6 @@
# limitations under the License.
#
-diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala
b/core/src/main/scala/org/apache/spark/Dependency.scala
-index 9ea6d2fa2f..d6101cdcef 100644
---- a/core/src/main/scala/org/apache/spark/Dependency.scala
-+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
-@@ -93,7 +93,9 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C:
ClassTag](
- val shuffleHandle: ShuffleHandle =
_rdd.context.env.shuffleManager.registerShuffle(
- shuffleId, _rdd.partitions.length, this)
-
-- _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
-+ if (!_rdd.context.getConf.isRssEnable()) {
-+ _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
-+ }
- }
-
-
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 38bed797a0..a95d4c7d77 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
diff --git a/spark-patches/spark-3.1.2_dynamic_allocation_support.patch
b/spark-patches/spark-3.1.2_dynamic_allocation_support.patch
index c3431dca..d35c3fcd 100644
--- a/spark-patches/spark-3.1.2_dynamic_allocation_support.patch
+++ b/spark-patches/spark-3.1.2_dynamic_allocation_support.patch
@@ -15,23 +15,6 @@
# limitations under the License.
#
-diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala
b/core/src/main/scala/org/apache/spark/Dependency.scala
-index d21b9d9833..fe4507f81f 100644
---- a/core/src/main/scala/org/apache/spark/Dependency.scala
-+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
-@@ -110,8 +110,10 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C:
ClassTag](
-
- def getMergerLocs: Seq[BlockManagerId] = mergerLocs
-
-- _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
-- _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
-+ if (!_rdd.context.getConf.isRssEnable()) {
-+ _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
-+ _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
-+ }
- }
-
-
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index bdb768ed5a..ffde0643c5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
diff --git a/spark-patches/spark-3.2.1_dynamic_allocation_support.patch
b/spark-patches/spark-3.2.1_dynamic_allocation_support.patch
index e170ae28..81046dec 100644
--- a/spark-patches/spark-3.2.1_dynamic_allocation_support.patch
+++ b/spark-patches/spark-3.2.1_dynamic_allocation_support.patch
@@ -15,23 +15,6 @@
# limitations under the License.
#
-diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala
b/core/src/main/scala/org/apache/spark/Dependency.scala
-index 1b4e7ba5106..95818ff72ca 100644
---- a/core/src/main/scala/org/apache/spark/Dependency.scala
-+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
-@@ -174,8 +174,10 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C:
ClassTag](
- !rdd.isBarrier()
- }
-
-- _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
-- _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
-+ if (!_rdd.context.getConf.isRssEnable()) {
-+ _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
-+ _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
-+ }
- }
-
-
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index c4b619300b5..821a01985d9 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
index d1201a87..43994d69 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
@@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
-import org.apache.uniffle.storage.util.ShuffleStorageUtils;
public class HdfsShuffleDeleteHandler implements ShuffleDeleteHandler {
@@ -39,33 +38,35 @@ public class HdfsShuffleDeleteHandler implements
ShuffleDeleteHandler {
@Override
public void delete(String[] storageBasePaths, String appId, String user) {
- Path path = new
Path(ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePaths[0], appId));
- boolean isSuccess = false;
- int times = 0;
- int retryMax = 5;
- long start = System.currentTimeMillis();
- LOG.info("Try delete shuffle data in HDFS for appId[{}] of user[{}] with
{}",appId, user, path);
- while (!isSuccess && times < retryMax) {
- try {
- FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user,
path, hadoopConf);
- fileSystem.delete(path, true);
- isSuccess = true;
- } catch (Exception e) {
- times++;
- LOG.warn("Can't delete shuffle data for appId[" + appId + "] with " +
times + " times", e);
+ for (String deletePath : storageBasePaths) {
+ final Path path = new Path(deletePath);
+ boolean isSuccess = false;
+ int times = 0;
+ int retryMax = 5;
+ long start = System.currentTimeMillis();
+ LOG.info("Try delete shuffle data in HDFS for appId[{}] of user[{}] with
{}",appId, user, path);
+ while (!isSuccess && times < retryMax) {
try {
- Thread.sleep(1000);
- } catch (Exception ex) {
- LOG.warn("Exception happened when Thread.sleep", ex);
+ FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user,
path, hadoopConf);
+ fileSystem.delete(path, true);
+ isSuccess = true;
+ } catch (Exception e) {
+ times++;
+ LOG.warn("Can't delete shuffle data for appId[" + appId + "] with "
+ times + " times", e);
+ try {
+ Thread.sleep(1000);
+ } catch (Exception ex) {
+ LOG.warn("Exception happened when Thread.sleep", ex);
+ }
}
}
- }
- if (isSuccess) {
- LOG.info("Delete shuffle data in HDFS for appId[" + appId + "] with " +
path + " successfully in "
- + (System.currentTimeMillis() - start) + " ms");
- } else {
- LOG.info("Failed to delete shuffle data in HDFS for appId[" + appId + "]
with " + path + " in "
- + (System.currentTimeMillis() - start) + " ms");
+ if (isSuccess) {
+ LOG.info("Delete shuffle data in HDFS for appId[" + appId + "] with "
+ path + " successfully in "
+ + (System.currentTimeMillis() - start) + " ms");
+ } else {
+ LOG.info("Failed to delete shuffle data in HDFS for appId[" + appId +
"] with " + path + " in "
+ + (System.currentTimeMillis() - start) + " ms");
+ }
}
}
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
index 97bf3864..e2fb88b2 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
@@ -24,16 +24,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
-import org.apache.uniffle.storage.util.ShuffleStorageUtils;
public class LocalFileDeleteHandler implements ShuffleDeleteHandler {
private static final Logger LOG =
LoggerFactory.getLogger(LocalFileDeleteHandler.class);
@Override
- public void delete(String[] storageBasePaths, String appId, String user) {
- for (String basePath : storageBasePaths) {
- String shufflePath =
ShuffleStorageUtils.getFullShuffleDataFolder(basePath, appId);
+ public void delete(String[] shuffleDataStoredPath, String appId, String
user) {
+ for (String basePath : shuffleDataStoredPath) {
+ final String shufflePath = basePath;
long start = System.currentTimeMillis();
try {
File baseFolder = new File(shufflePath);