This is an automated email from the ASF dual-hosted git repository.
csy pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 8b27c7cd6 [CELEBORN-2131] Add sorting duration logs in FileSorter
8b27c7cd6 is described below
commit 8b27c7cd6c43af7600da87f56a1d29cd1d1fff1c
Author: sychen <[email protected]>
AuthorDate: Wed Sep 24 19:40:27 2025 +0800
[CELEBORN-2131] Add sorting duration logs in FileSorter
### What changes were proposed in this pull request?
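Add the worker configuration `celeborn.worker.sortPartition.sortTimeLogThreshold`,
which falls back to `celeborn.worker.sortPartition.timeout` (default
`celeborn.worker.sortPartition.sortTimeLogThreshold=220s`).
When sorting a partition file takes longer than this threshold,
`PartitionFilesSorter` logs the sort duration together with the file id,
shuffle key, origin file path and origin file length. Setting the threshold
to 0 disables this logging.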
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
```
25/08/26 15:18:43,520 INFO [worker-file-sorter-executor-0]
PartitionFilesSorter: File sorting took 3086ms for fileId:
application-1-/tmp/Celeborn369677075541712354sort-suite, shuffleKey:
application-1, originFilePath: /tmp/Celeborn369677075541712354sort-suite,
originFileLen: 2453073519
```
Closes #3451 from cxzl25/CELEBORN-2131.
Authored-by: sychen <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 10 ++++++++++
docs/configuration/worker.md | 1 +
.../deploy/worker/storage/PartitionFilesSorter.java | 18 ++++++++++++++++++
3 files changed, 29 insertions(+)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index e31206dc2..f178f1e33 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -849,6 +849,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
get(ESTIMATED_PARTITION_SIZE_MAX_SIZE).getOrElse(partitionSplitMaximumSize
* 2)
def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
def workerPartitionSorterSortPartitionTimeout: Long =
get(WORKER_PARTITION_SORTER_SORT_TIMEOUT)
+ def workerPartitionSorterSortTimeLogThreshold: Long =
+ get(WORKER_PARTITION_SORTER_SORT_TIME_LOG_THRESHOLD)
def workerPartitionSorterPrefetchEnabled: Boolean =
get(WORKER_PARTITION_SORTER_PREFETCH_ENABLED)
def workerPartitionSorterShuffleBlockCompactionFactor: Double =
@@ -3688,6 +3690,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("220s")
+ val WORKER_PARTITION_SORTER_SORT_TIME_LOG_THRESHOLD: ConfigEntry[Long] =
+ buildConf("celeborn.worker.sortPartition.sortTimeLogThreshold")
+ .categories("worker")
+ .doc("When sort time exceeds this threshold, log the file id and sort
duration. " +
+ "Set to 0 to disable logging.")
+ .version("0.6.2")
+ .fallbackConf(WORKER_PARTITION_SORTER_SORT_TIMEOUT)
+
val WORKER_PARTITION_SORTER_THREADS: OptionalConfigEntry[Int] =
buildConf("celeborn.worker.sortPartition.threads")
.withAlternative("celeborn.worker.partitionSorter.threads")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index a63d213fe..0c623fccf 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -180,6 +180,7 @@ license: |
| celeborn.worker.sortPartition.indexCache.maxWeight | 100000 | false |
PartitionSorter's cache max weight for index buffer. | 0.4.0 | |
| celeborn.worker.sortPartition.prefetch.enabled | true | false | When true,
partition sorter will prefetch the original partition files to page cache and
reserve memory configured by
`celeborn.worker.sortPartition.reservedMemoryPerPartition` to allocate a block
of memory for prefetching while sorting a shuffle file off-heap with page cache
for non-hdfs files. Otherwise, partition sorter seeks to position of each block
and does not prefetch for non-hdfs files. | 0.5.0 | |
| celeborn.worker.sortPartition.reservedMemoryPerPartition | 1mb | false |
Reserved memory when sorting a shuffle file off-heap. | 0.3.0 |
celeborn.worker.partitionSorter.reservedMemoryPerPartition |
+| celeborn.worker.sortPartition.sortTimeLogThreshold | <value of
celeborn.worker.sortPartition.timeout> | false | When sort time exceeds this
threshold, log the file id and sort duration. Set to 0 to disable logging. |
0.6.2 | |
| celeborn.worker.sortPartition.threads | <undefined> | false |
PartitionSorter's thread counts. It's recommended to set at least `64` when
`HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 |
celeborn.worker.partitionSorter.threads |
| celeborn.worker.sortPartition.timeout | 220s | false | Timeout for a shuffle
file to sort. | 0.3.0 | celeborn.worker.partitionSorter.sort.timeout |
| celeborn.worker.storage.checkDirsEmpty.maxRetries | 3 | false | The number
of retries for a worker to check if the working directory is cleaned up before
registering with the master. | 0.3.0 |
celeborn.worker.disk.checkFileClean.maxRetries |
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 55ff7cc83..cdfb95c1e 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -92,6 +92,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
protected final boolean prefetchEnabled;
protected final long reservedMemoryPerPartition;
private final long partitionSorterShutdownAwaitTime;
+ private final long sortTimeLogThreshold;
private DB sortedFilesDb;
protected final AbstractSource source;
@@ -108,6 +109,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
this.reservedMemoryPerPartition =
conf.workerPartitionSorterReservedMemoryPerPartition();
this.partitionSorterShutdownAwaitTime =
conf.workerGracefulShutdownPartitionSorterCloseAwaitTimeMs();
+ this.sortTimeLogThreshold =
conf.workerPartitionSorterSortTimeLogThreshold();
long indexCacheMaxWeight = conf.workerPartitionSorterIndexCacheMaxWeight();
this.source = source;
this.cleaner = new PartitionFilesCleaner(this);
@@ -726,6 +728,10 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
public void sort() {
source.startTimer(WorkerSource.SORT_TIME(), fileId);
+ long sortStartTime = -1;
+ if (sortTimeLogThreshold > 0) {
+ sortStartTime = System.nanoTime();
+ }
try {
initializeFiles();
@@ -805,6 +811,18 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
sorting.remove(fileId);
}
}
+ if (sortTimeLogThreshold > 0) {
+ long sortDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
sortStartTime);
+ if (sortDuration > sortTimeLogThreshold) {
+ logger.info(
+ "File sorting took {}ms for fileId: {}, shuffleKey: {},
originFilePath: {}, originFileLen: {}",
+ sortDuration,
+ fileId,
+ shuffleKey,
+ originFilePath,
+ originFileLen);
+ }
+ }
source.stopTimer(WorkerSource.SORT_TIME(), fileId);
}