This is an automated email from the ASF dual-hosted git repository.

csy pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 8b27c7cd6 [CELEBORN-2131] Add sorting duration logs in FileSorter
8b27c7cd6 is described below

commit 8b27c7cd6c43af7600da87f56a1d29cd1d1fff1c
Author: sychen <[email protected]>
AuthorDate: Wed Sep 24 19:40:27 2025 +0800

    [CELEBORN-2131] Add sorting duration logs in FileSorter
    
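    ### What changes were proposed in this pull request?
    Add a new worker configuration, `celeborn.worker.sortPartition.sortTimeLogThreshold`,
    for example:
    `celeborn.worker.sortPartition.sortTimeLogThreshold=220s`
    
    When sorting a partition file in `PartitionFilesSorter` takes longer than this
    threshold, the worker logs the sort duration together with the file id, shuffle
    key, original file path and original file length. The threshold falls back to
    the value of `celeborn.worker.sortPartition.timeout` (220s by default); set it
    to 0 to disable the log.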
    
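    ### Why are the changes needed?
    To make slow partition file sorts visible in the worker log, including which
    file was slow, which shuffle it belongs to, and how large the original file was.
    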
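    ### Does this PR introduce _any_ user-facing change?
    Yes. It adds the worker configuration
    `celeborn.worker.sortPartition.sortTimeLogThreshold` (documented in
    `docs/configuration/worker.md`) and a new INFO-level log for slow sorts.
    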
    ### How was this patch tested?
    Example worker log output, with the threshold set below the 3086ms sort duration
    (with the 220s default, a sort this short would not be logged):
    
    ```
    25/08/26 15:18:43,520 INFO [worker-file-sorter-executor-0] PartitionFilesSorter: File sorting took 3086ms for fileId: application-1-/tmp/Celeborn369677075541712354sort-suite, shuffleKey: application-1, originFilePath: /tmp/Celeborn369677075541712354sort-suite, originFileLen: 2453073519
    ```
    
    Closes #3451 from cxzl25/CELEBORN-2131.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala      | 10 ++++++++++
 docs/configuration/worker.md                           |  1 +
 .../deploy/worker/storage/PartitionFilesSorter.java    | 18 ++++++++++++++++++
 3 files changed, 29 insertions(+)

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index e31206dc2..f178f1e33 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -849,6 +849,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
     get(ESTIMATED_PARTITION_SIZE_MAX_SIZE).getOrElse(partitionSplitMaximumSize 
* 2)
   def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
   def workerPartitionSorterSortPartitionTimeout: Long = 
get(WORKER_PARTITION_SORTER_SORT_TIMEOUT)
+  def workerPartitionSorterSortTimeLogThreshold: Long =
+    get(WORKER_PARTITION_SORTER_SORT_TIME_LOG_THRESHOLD)
   def workerPartitionSorterPrefetchEnabled: Boolean =
     get(WORKER_PARTITION_SORTER_PREFETCH_ENABLED)
   def workerPartitionSorterShuffleBlockCompactionFactor: Double =
@@ -3688,6 +3690,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("220s")
 
+  val WORKER_PARTITION_SORTER_SORT_TIME_LOG_THRESHOLD: ConfigEntry[Long] =
+    buildConf("celeborn.worker.sortPartition.sortTimeLogThreshold")
+      .categories("worker")
+      .doc("When sort time exceeds this threshold, log the file id and sort 
duration. " +
+        "Set to 0 to disable logging.")
+      .version("0.6.2")
+      .fallbackConf(WORKER_PARTITION_SORTER_SORT_TIMEOUT)
+
   val WORKER_PARTITION_SORTER_THREADS: OptionalConfigEntry[Int] =
     buildConf("celeborn.worker.sortPartition.threads")
       .withAlternative("celeborn.worker.partitionSorter.threads")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index a63d213fe..0c623fccf 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -180,6 +180,7 @@ license: |
 | celeborn.worker.sortPartition.indexCache.maxWeight | 100000 | false | PartitionSorter's cache max weight for index buffer. | 0.4.0 |  | 
 | celeborn.worker.sortPartition.prefetch.enabled | true | false | When true, partition sorter will prefetch the original partition files to page cache and reserve memory configured by `celeborn.worker.sortPartition.reservedMemoryPerPartition` to allocate a block of memory for prefetching while sorting a shuffle file off-heap with page cache for non-hdfs files. Otherwise, partition sorter seeks to position of each block and does not prefetch for non-hdfs files. | 0.5.0 |  | 
 | celeborn.worker.sortPartition.reservedMemoryPerPartition | 1mb | false | Reserved memory when sorting a shuffle file off-heap. | 0.3.0 | celeborn.worker.partitionSorter.reservedMemoryPerPartition | 
+| celeborn.worker.sortPartition.sortTimeLogThreshold | &lt;value of celeborn.worker.sortPartition.timeout&gt; | false | When sort time exceeds this threshold, log the file id and sort duration. Set to 0 to disable logging. | 0.6.2 |  | 
 | celeborn.worker.sortPartition.threads | &lt;undefined&gt; | false | PartitionSorter's thread counts. It's recommended to set at least `64` when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 | celeborn.worker.partitionSorter.threads | 
 | celeborn.worker.sortPartition.timeout | 220s | false | Timeout for a shuffle file to sort. | 0.3.0 | celeborn.worker.partitionSorter.sort.timeout | 
 | celeborn.worker.storage.checkDirsEmpty.maxRetries | 3 | false | The number of retries for a worker to check if the working directory is cleaned up before registering with the master. | 0.3.0 | celeborn.worker.disk.checkFileClean.maxRetries | 
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 55ff7cc83..cdfb95c1e 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -92,6 +92,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
   protected final boolean prefetchEnabled;
   protected final long reservedMemoryPerPartition;
   private final long partitionSorterShutdownAwaitTime;
+  private final long sortTimeLogThreshold;
   private DB sortedFilesDb;
 
   protected final AbstractSource source;
@@ -108,6 +109,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
     this.reservedMemoryPerPartition = 
conf.workerPartitionSorterReservedMemoryPerPartition();
     this.partitionSorterShutdownAwaitTime =
         conf.workerGracefulShutdownPartitionSorterCloseAwaitTimeMs();
+    this.sortTimeLogThreshold = 
conf.workerPartitionSorterSortTimeLogThreshold();
     long indexCacheMaxWeight = conf.workerPartitionSorterIndexCacheMaxWeight();
     this.source = source;
     this.cleaner = new PartitionFilesCleaner(this);
@@ -726,6 +728,10 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
 
     public void sort() {
       source.startTimer(WorkerSource.SORT_TIME(), fileId);
+      long sortStartTime = -1;
+      if (sortTimeLogThreshold > 0) {
+        sortStartTime = System.nanoTime();
+      }
 
       try {
         initializeFiles();
@@ -805,6 +811,18 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
           sorting.remove(fileId);
         }
       }
+      if (sortTimeLogThreshold > 0) {
+        long sortDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
sortStartTime);
+        if (sortDuration > sortTimeLogThreshold) {
+          logger.info(
+              "File sorting took {}ms for fileId: {}, shuffleKey: {}, 
originFilePath: {}, originFileLen: {}",
+              sortDuration,
+              fileId,
+              shuffleKey,
+              originFilePath,
+              originFileLen);
+        }
+      }
       source.stopTimer(WorkerSource.SORT_TIME(), fileId);
     }
 

Reply via email to