This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c775089c4 [CELEBORN-988][FOLLOWUP] Rename config key 
`celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled`
c775089c4 is described below

commit c775089c4b894847d29245238b0bab2c74cb51a7
Author: Fu Chen <[email protected]>
AuthorDate: Sun Sep 24 22:28:32 2023 +0800

    [CELEBORN-988][FOLLOWUP] Rename config key 
`celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled`
    
    ### What changes were proposed in this pull request?
    1. rename config key from 
`celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled` to 
`celeborn.worker.sortPartition.eagerlyRemoveOriginalFiles.enabled`
    2. make this config an internal config
    
    ### Why are the changes needed?
    
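    Make the config key clearer. With the old name, setting the flag to `false` meant the original files were removed immediately, which was a double negative. With the new name, `true` means the original files are removed eagerly. The default changes from `true` to `false`, so the default behavior is unchanged.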
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Passed GitHub Actions (GA) CI.
    
    Closes #1934 from cfmcgrady/celeborn-988-followup.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 24 ++++++++++++----------
 docs/configuration/worker.md                       |  1 -
 .../worker/storage/PartitionFilesSorter.java       |  8 ++++----
 3 files changed, 17 insertions(+), 16 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 00c153940..162e6e179 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -647,8 +647,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
     if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else 
get(WORKER_COMMIT_THREADS)
   def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
   def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
-  def partitionSorterLazyRemovalOfOriginalFilesEnabled: Boolean =
-    get(PARTITION_SORTER_LAZY_REMOVAL_OF_ORIGINAL_FILES_ENABLED)
+  def partitionSorterEagerlyRemoveOriginalFilesEnabled: Boolean =
+    get(PARTITION_SORTER_EAGERLY_REMOVE_ORIGINAL_FILES_ENABLED)
   def partitionSorterSortPartitionTimeout: Long = 
get(PARTITION_SORTER_SORT_TIMEOUT)
   def partitionSorterReservedMemoryPerPartition: Long =
     get(WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY)
@@ -2211,20 +2211,22 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("120s")
 
-  val PARTITION_SORTER_LAZY_REMOVAL_OF_ORIGINAL_FILES_ENABLED: 
ConfigEntry[Boolean] =
-    
buildConf("celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled")
+  val PARTITION_SORTER_EAGERLY_REMOVE_ORIGINAL_FILES_ENABLED: 
ConfigEntry[Boolean] =
+    
buildConf("celeborn.worker.sortPartition.eagerlyRemoveOriginalFiles.enabled")
       .categories("worker")
-      .doc("When set to false, the PartitionSorter immediately removes the 
original file once " +
+      .doc("When set to true, the PartitionSorter immediately removes the 
original file once " +
         "its partition has been successfully sorted. It is important to note 
that this behavior " +
         "may result in a potential issue with the ReusedExchange operation 
when it triggers both " +
-        "non-range and range fetch requests simultaneously. see CELEBORN-980 
for more details." +
-        "When set to true, the PartitionSorter will retain the original 
unsorted file. However, " +
-        "it's essential to be aware that enabling this option may lead to an 
increase in storage " +
-        "space usage during the range fetch phase, as both the original and 
sorted files will be " +
-        "retained until the shuffle is finished.")
+        "non-range and range fetch requests simultaneously. When set to false, 
the " +
+        "PartitionSorter will retain the original unsorted file. However, it's 
essential to be " +
+        "aware that enabling this option may lead to an increase in storage 
space usage during " +
+        "the range fetch phase, as both the original and sorted files will be 
retained until the " +
+        "shuffle is finished. Note that the default value is configured as 
'false' as a " +
+        "temporary workaround for CELEBORN-980. see CELEBORN-980 for more 
details.")
       .version("0.3.2")
+      .internal
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)
 
   val PARTITION_SORTER_SORT_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.worker.sortPartition.timeout")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 8c0111523..3833726d5 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -93,7 +93,6 @@ license: |
 | celeborn.worker.shuffle.partitionSplit.enabled | true | enable the partition 
split on worker side | 0.3.0 | 
 | celeborn.worker.shuffle.partitionSplit.max | 2g | Specify the maximum 
partition size for splitting, and ensure that individual partition files are 
always smaller than this limit. | 0.3.0 | 
 | celeborn.worker.shuffle.partitionSplit.min | 1m | Min size for a partition 
to split | 0.3.0 | 
-| celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled | true | 
When set to false, the PartitionSorter immediately removes the original file 
once its partition has been successfully sorted. It is important to note that 
this behavior may result in a potential issue with the ReusedExchange operation 
when it triggers both non-range and range fetch requests simultaneously. see 
CELEBORN-980 for more details.When set to true, the PartitionSorter will retain 
the original unsorted fi [...]
 | celeborn.worker.sortPartition.reservedMemoryPerPartition | 1mb | Reserved 
memory when sorting a shuffle file off-heap. | 0.3.0 | 
 | celeborn.worker.sortPartition.threads | &lt;undefined&gt; | 
PartitionSorter's thread counts. It's recommended to set at least `64` when 
`HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 | 
 | celeborn.worker.sortPartition.timeout | 220s | Timeout for a shuffle file to 
sort. | 0.3.0 | 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index ba6b76b56..91433a235 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -77,7 +77,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
 
   private final AtomicInteger sortedFileCount = new AtomicInteger();
   private final AtomicLong sortedFilesSize = new AtomicLong();
-  protected final boolean lazyRemovalOfOriginalFilesEnabled;
+  protected final boolean eagerlyRemoveOriginalFilesEnabled;
   protected final long sortTimeout;
   protected final long shuffleChunkSize;
   protected final long reservedMemoryPerPartition;
@@ -93,8 +93,8 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
 
   public PartitionFilesSorter(
       MemoryManager memoryManager, CelebornConf conf, AbstractSource source) {
-    this.lazyRemovalOfOriginalFilesEnabled =
-        conf.partitionSorterLazyRemovalOfOriginalFilesEnabled();
+    this.eagerlyRemoveOriginalFilesEnabled =
+        conf.partitionSorterEagerlyRemoveOriginalFilesEnabled();
     this.sortTimeout = conf.partitionSorterSortPartitionTimeout();
     this.shuffleChunkSize = conf.shuffleChunkSize();
     this.reservedMemoryPerPartition = 
conf.partitionSorterReservedMemoryPerPartition();
@@ -596,7 +596,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
 
         writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs);
         updateSortedShuffleFiles(shuffleKey, fileId, originFileLen);
-        if (!lazyRemovalOfOriginalFilesEnabled) {
+        if (eagerlyRemoveOriginalFilesEnabled) {
           deleteOriginFiles();
         }
         logger.debug("sort complete for {} {}", shuffleKey, originFilePath);

Reply via email to