This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 94da28a2f [CELEBORN-988] Add config option to control original
unsorted file deletion in `PartitionFilesSorter`
94da28a2f is described below
commit 94da28a2f514e86481c0345c9751a094949c0eb2
Author: Fu Chen <[email protected]>
AuthorDate: Tue Sep 19 11:14:51 2023 +0800
[CELEBORN-988] Add config option to control original unsorted file deletion
in `PartitionFilesSorter`
### What changes were proposed in this pull request?
This PR adds a new configuration option,
`celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled`, allowing
users to control whether the `PartitionFilesSorter` deletes the original
unsorted file immediately after it has been sorted.
### Why are the changes needed?
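Deleting the original unsorted file as soon as its partition has been sorted
can break a ReusedExchange that triggers both non-range and range fetch
requests simultaneously (see CELEBORN-980). Discussion:
https://github.com/apache/incubator-celeborn/pull/1907#issuecomment-1723420513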
### Does this PR introduce _any_ user-facing change?
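With the default setting
(`celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled = true`), the
`PartitionFilesSorter` no longer deletes the original unsorted file right after
sorting; the file is retained until the shuffle is finished. Users can restore
the previous behavior of deleting it immediately after sorting by configuring
`celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled = false`.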
### How was this patch tested?
Pass GA
Closes #1922 from cfmcgrady/make-delete-configurable.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 1e49ff76f377dd441f6eac8196459c7e4854fdf9)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../scala/org/apache/celeborn/common/CelebornConf.scala | 17 +++++++++++++++++
docs/configuration/worker.md | 1 +
.../deploy/worker/storage/PartitionFilesSorter.java | 7 ++++++-
3 files changed, 24 insertions(+), 1 deletion(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 7a7768706..53f017ed1 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -647,6 +647,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else
get(WORKER_COMMIT_THREADS)
def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
+ def partitionSorterLazyRemovalOfOriginalFilesEnabled: Boolean =
+ get(PARTITION_SORTER_LAZY_REMOVAL_OF_ORIGINAL_FILES_ENABLED)
def partitionSorterSortPartitionTimeout: Long =
get(PARTITION_SORTER_SORT_TIMEOUT)
def partitionSorterReservedMemoryPerPartition: Long =
get(WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY)
@@ -2206,6 +2208,21 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
+ val PARTITION_SORTER_LAZY_REMOVAL_OF_ORIGINAL_FILES_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled")
+ .categories("worker")
+ .doc("When set to false, the PartitionSorter immediately removes the original file once " +
+ "its partition has been successfully sorted. It is important to note that this behavior " +
+ "may result in a potential issue with the ReusedExchange operation when it triggers both " +
+ "non-range and range fetch requests simultaneously. See CELEBORN-980 for more details. " +
+ "When set to true, the PartitionSorter will retain the original unsorted file. However, " +
+ "it's essential to be aware that enabling this option may lead to an increase in storage " +
+ "space usage during the range fetch phase, as both the original and sorted files will be " +
+ "retained until the shuffle is finished.")
+ .version("0.3.2")
+ .booleanConf
+ .createWithDefault(true)
+
val PARTITION_SORTER_SORT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.sortPartition.timeout")
.withAlternative("celeborn.worker.partitionSorter.sort.timeout")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 123a766e4..553bfb2d4 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -92,6 +92,7 @@ license: |
| celeborn.worker.shuffle.partitionSplit.enabled | true | enable the partition
split on worker side | 0.3.0 |
| celeborn.worker.shuffle.partitionSplit.max | 2g | Specify the maximum
partition size for splitting, and ensure that individual partition files are
always smaller than this limit. | 0.3.0 |
| celeborn.worker.shuffle.partitionSplit.min | 1m | Min size for a partition
to split | 0.3.0 |
+| celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled | true |
When set to false, the PartitionSorter immediately removes the original file
once its partition has been successfully sorted. It is important to note that
this behavior may result in a potential issue with the ReusedExchange operation
when it triggers both non-range and range fetch requests simultaneously. See
CELEBORN-980 for more details. When set to true, the PartitionSorter will retain
the original unsorted fi [...]
| celeborn.worker.sortPartition.reservedMemoryPerPartition | 1mb | Reserved
memory when sorting a shuffle file off-heap. | 0.3.0 |
| celeborn.worker.sortPartition.threads | <undefined> |
PartitionSorter's thread counts. It's recommended to set at least `64` when
`HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
| celeborn.worker.sortPartition.timeout | 220s | Timeout for a shuffle file to
sort. | 0.3.0 |
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index ed1f3f130..7f121b853 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -76,6 +76,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
private final AtomicInteger sortedFileCount = new AtomicInteger();
private final AtomicLong sortedFilesSize = new AtomicLong();
+ protected final boolean lazyRemovalOfOriginalFilesEnabled;
protected final long sortTimeout;
protected final long shuffleChunkSize;
protected final long reservedMemoryPerPartition;
@@ -91,6 +92,8 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
public PartitionFilesSorter(
MemoryManager memoryManager, CelebornConf conf, AbstractSource source) {
+ this.lazyRemovalOfOriginalFilesEnabled =
+ conf.partitionSorterLazyRemovalOfOriginalFilesEnabled();
this.sortTimeout = conf.partitionSorterSortPartitionTimeout();
this.shuffleChunkSize = conf.shuffleChunkSize();
this.reservedMemoryPerPartition =
conf.partitionSorterReservedMemoryPerPartition();
@@ -589,7 +592,9 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs);
updateSortedShuffleFiles(shuffleKey, fileId, originFileLen);
- deleteOriginFiles();
+ if (!lazyRemovalOfOriginalFilesEnabled) {
+ deleteOriginFiles();
+ }
logger.debug("sort complete for {} {}", shuffleKey, originFilePath);
} catch (Exception e) {
logger.error(