This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c775089c4 [CELEBORN-988][FOLLOWUP] Rename config key
`celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled`
c775089c4 is described below
commit c775089c4b894847d29245238b0bab2c74cb51a7
Author: Fu Chen <[email protected]>
AuthorDate: Sun Sep 24 22:28:32 2023 +0800
[CELEBORN-988][FOLLOWUP] Rename config key
`celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled`
### What changes were proposed in this pull request?
1. rename config key from
`celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled` to
`celeborn.worker.sortPartition.eagerlyRemoveOriginalFiles.enabled`
2. mark this config as internal
### Why are the changes needed?
Make the config key clearer.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes #1934 from cfmcgrady/celeborn-988-followup.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 24 ++++++++++++----------
docs/configuration/worker.md | 1 -
.../worker/storage/PartitionFilesSorter.java | 8 ++++----
3 files changed, 17 insertions(+), 16 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 00c153940..162e6e179 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -647,8 +647,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else
get(WORKER_COMMIT_THREADS)
def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
- def partitionSorterLazyRemovalOfOriginalFilesEnabled: Boolean =
- get(PARTITION_SORTER_LAZY_REMOVAL_OF_ORIGINAL_FILES_ENABLED)
+ def partitionSorterEagerlyRemoveOriginalFilesEnabled: Boolean =
+ get(PARTITION_SORTER_EAGERLY_REMOVE_ORIGINAL_FILES_ENABLED)
def partitionSorterSortPartitionTimeout: Long =
get(PARTITION_SORTER_SORT_TIMEOUT)
def partitionSorterReservedMemoryPerPartition: Long =
get(WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY)
@@ -2211,20 +2211,22 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
- val PARTITION_SORTER_LAZY_REMOVAL_OF_ORIGINAL_FILES_ENABLED:
ConfigEntry[Boolean] =
-
buildConf("celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled")
+ val PARTITION_SORTER_EAGERLY_REMOVE_ORIGINAL_FILES_ENABLED:
ConfigEntry[Boolean] =
+
buildConf("celeborn.worker.sortPartition.eagerlyRemoveOriginalFiles.enabled")
.categories("worker")
- .doc("When set to false, the PartitionSorter immediately removes the
original file once " +
+ .doc("When set to true, the PartitionSorter immediately removes the
original file once " +
"its partition has been successfully sorted. It is important to note
that this behavior " +
"may result in a potential issue with the ReusedExchange operation
when it triggers both " +
- "non-range and range fetch requests simultaneously. see CELEBORN-980
for more details." +
- "When set to true, the PartitionSorter will retain the original
unsorted file. However, " +
- "it's essential to be aware that enabling this option may lead to an
increase in storage " +
- "space usage during the range fetch phase, as both the original and
sorted files will be " +
- "retained until the shuffle is finished.")
+ "non-range and range fetch requests simultaneously. When set to false,
the " +
+ "PartitionSorter will retain the original unsorted file. However, it's
essential to be " +
+ "aware that enabling this option may lead to an increase in storage
space usage during " +
+ "the range fetch phase, as both the original and sorted files will be
retained until the " +
+ "shuffle is finished. Note that the default value is configured as
'false' as a " +
+ "temporary workaround for CELEBORN-980. see CELEBORN-980 for more
details.")
.version("0.3.2")
+ .internal
.booleanConf
- .createWithDefault(true)
+ .createWithDefault(false)
val PARTITION_SORTER_SORT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.sortPartition.timeout")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 8c0111523..3833726d5 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -93,7 +93,6 @@ license: |
| celeborn.worker.shuffle.partitionSplit.enabled | true | enable the partition
split on worker side | 0.3.0 |
| celeborn.worker.shuffle.partitionSplit.max | 2g | Specify the maximum
partition size for splitting, and ensure that individual partition files are
always smaller than this limit. | 0.3.0 |
| celeborn.worker.shuffle.partitionSplit.min | 1m | Min size for a partition
to split | 0.3.0 |
-| celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled | true |
When set to false, the PartitionSorter immediately removes the original file
once its partition has been successfully sorted. It is important to note that
this behavior may result in a potential issue with the ReusedExchange operation
when it triggers both non-range and range fetch requests simultaneously. see
CELEBORN-980 for more details.When set to true, the PartitionSorter will retain
the original unsorted fi [...]
| celeborn.worker.sortPartition.reservedMemoryPerPartition | 1mb | Reserved
memory when sorting a shuffle file off-heap. | 0.3.0 |
| celeborn.worker.sortPartition.threads | <undefined> |
PartitionSorter's thread counts. It's recommended to set at least `64` when
`HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
| celeborn.worker.sortPartition.timeout | 220s | Timeout for a shuffle file to
sort. | 0.3.0 |
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index ba6b76b56..91433a235 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -77,7 +77,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
private final AtomicInteger sortedFileCount = new AtomicInteger();
private final AtomicLong sortedFilesSize = new AtomicLong();
- protected final boolean lazyRemovalOfOriginalFilesEnabled;
+ protected final boolean eagerlyRemoveOriginalFilesEnabled;
protected final long sortTimeout;
protected final long shuffleChunkSize;
protected final long reservedMemoryPerPartition;
@@ -93,8 +93,8 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
public PartitionFilesSorter(
MemoryManager memoryManager, CelebornConf conf, AbstractSource source) {
- this.lazyRemovalOfOriginalFilesEnabled =
- conf.partitionSorterLazyRemovalOfOriginalFilesEnabled();
+ this.eagerlyRemoveOriginalFilesEnabled =
+ conf.partitionSorterEagerlyRemoveOriginalFilesEnabled();
this.sortTimeout = conf.partitionSorterSortPartitionTimeout();
this.shuffleChunkSize = conf.shuffleChunkSize();
this.reservedMemoryPerPartition =
conf.partitionSorterReservedMemoryPerPartition();
@@ -596,7 +596,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs);
updateSortedShuffleFiles(shuffleKey, fileId, originFileLen);
- if (!lazyRemovalOfOriginalFilesEnabled) {
+ if (eagerlyRemoveOriginalFilesEnabled) {
deleteOriginFiles();
}
logger.debug("sort complete for {} {}", shuffleKey, originFilePath);