This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 94da28a2f [CELEBORN-988] Add config option to control original 
unsorted file deletion in `PartitionFilesSorter`
94da28a2f is described below

commit 94da28a2f514e86481c0345c9751a094949c0eb2
Author: Fu Chen <[email protected]>
AuthorDate: Tue Sep 19 11:14:51 2023 +0800

    [CELEBORN-988] Add config option to control original unsorted file deletion 
in `PartitionFilesSorter`
    
    ### What changes were proposed in this pull request?
    
    This PR adds a new configuration option, 
`celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled`, allowing 
users to control whether the `PartitionFilesSorter` deletes the original 
unsorted file.
    
    ### Why are the changes needed?
    
    
https://github.com/apache/incubator-celeborn/pull/1907#issuecomment-1723420513
    
    ### Does this PR introduce _any_ user-facing change?
    
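    With the default setting 
`celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled = true`, the 
`PartitionFilesSorter` no longer deletes the original unsorted file right after 
sorting. Users can restore the previous immediate deletion by configuring 
`celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled = false`.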
    
    ### How was this patch tested?
    
    Pass GA
    
    Closes #1922 from cfmcgrady/make-delete-configurable.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 1e49ff76f377dd441f6eac8196459c7e4854fdf9)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../scala/org/apache/celeborn/common/CelebornConf.scala | 17 +++++++++++++++++
 docs/configuration/worker.md                            |  1 +
 .../deploy/worker/storage/PartitionFilesSorter.java     |  7 ++++++-
 3 files changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 7a7768706..53f017ed1 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -647,6 +647,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
     if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else 
get(WORKER_COMMIT_THREADS)
   def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
   def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
+  def partitionSorterLazyRemovalOfOriginalFilesEnabled: Boolean =
+    get(PARTITION_SORTER_LAZY_REMOVAL_OF_ORIGINAL_FILES_ENABLED)
   def partitionSorterSortPartitionTimeout: Long = 
get(PARTITION_SORTER_SORT_TIMEOUT)
   def partitionSorterReservedMemoryPerPartition: Long =
     get(WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY)
@@ -2206,6 +2208,21 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("120s")
 
+  val PARTITION_SORTER_LAZY_REMOVAL_OF_ORIGINAL_FILES_ENABLED: 
ConfigEntry[Boolean] =
+    
buildConf("celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled")
+      .categories("worker")
+      .doc("When set to false, the PartitionSorter immediately removes the 
original file once " +
+        "its partition has been successfully sorted. It is important to note 
that this behavior " +
+        "may result in a potential issue with the ReusedExchange operation 
when it triggers both " +
+        "non-range and range fetch requests simultaneously. See CELEBORN-980 
for more details. " +
+        "When set to true, the PartitionSorter will retain the original 
unsorted file. However, " +
+        "it's essential to be aware that enabling this option may lead to an 
increase in storage " +
+        "space usage during the range fetch phase, as both the original and 
sorted files will be " +
+        "retained until the shuffle is finished.")
+      .version("0.3.2")
+      .booleanConf
+      .createWithDefault(true)
+
   val PARTITION_SORTER_SORT_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.worker.sortPartition.timeout")
       .withAlternative("celeborn.worker.partitionSorter.sort.timeout")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 123a766e4..553bfb2d4 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -92,6 +92,7 @@ license: |
 | celeborn.worker.shuffle.partitionSplit.enabled | true | enable the partition 
split on worker side | 0.3.0 | 
 | celeborn.worker.shuffle.partitionSplit.max | 2g | Specify the maximum 
partition size for splitting, and ensure that individual partition files are 
always smaller than this limit. | 0.3.0 | 
 | celeborn.worker.shuffle.partitionSplit.min | 1m | Min size for a partition 
to split | 0.3.0 | 
+| celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled | true | 
When set to false, the PartitionSorter immediately removes the original file 
once its partition has been successfully sorted. It is important to note that 
this behavior may result in a potential issue with the ReusedExchange operation 
when it triggers both non-range and range fetch requests simultaneously. See 
CELEBORN-980 for more details. When set to true, the PartitionSorter will retain 
the original unsorted fi [...]
 | celeborn.worker.sortPartition.reservedMemoryPerPartition | 1mb | Reserved 
memory when sorting a shuffle file off-heap. | 0.3.0 | 
 | celeborn.worker.sortPartition.threads | &lt;undefined&gt; | 
PartitionSorter's thread counts. It's recommended to set at least `64` when 
`HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 | 
 | celeborn.worker.sortPartition.timeout | 220s | Timeout for a shuffle file to 
sort. | 0.3.0 | 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index ed1f3f130..7f121b853 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -76,6 +76,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
 
   private final AtomicInteger sortedFileCount = new AtomicInteger();
   private final AtomicLong sortedFilesSize = new AtomicLong();
+  protected final boolean lazyRemovalOfOriginalFilesEnabled;
   protected final long sortTimeout;
   protected final long shuffleChunkSize;
   protected final long reservedMemoryPerPartition;
@@ -91,6 +92,8 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
 
   public PartitionFilesSorter(
       MemoryManager memoryManager, CelebornConf conf, AbstractSource source) {
+    this.lazyRemovalOfOriginalFilesEnabled =
+        conf.partitionSorterLazyRemovalOfOriginalFilesEnabled();
     this.sortTimeout = conf.partitionSorterSortPartitionTimeout();
     this.shuffleChunkSize = conf.shuffleChunkSize();
     this.reservedMemoryPerPartition = 
conf.partitionSorterReservedMemoryPerPartition();
@@ -589,7 +592,9 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
 
         writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs);
         updateSortedShuffleFiles(shuffleKey, fileId, originFileLen);
-        deleteOriginFiles();
+        if (!lazyRemovalOfOriginalFilesEnabled) {
+          deleteOriginFiles();
+        }
         logger.debug("sort complete for {} {}", shuffleKey, originFilePath);
       } catch (Exception e) {
         logger.error(

Reply via email to