This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8966c9b77 [CELEBORN-2208] Log the partition reader wait time if 
exceeds the threshold
8966c9b77 is described below

commit 8966c9b770490e30ab405e9543e24d7f2c4201eb
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Nov 21 16:33:57 2025 +0800

    [CELEBORN-2208] Log the partition reader wait time if exceeds the threshold
    
    ### What changes were proposed in this pull request?
    Log the partition reader wait time if exceeds the threshold.
    
    ### Why are the changes needed?
    
    Now I see the task shuffle read wait time is very long, however there is no 
task log to indicate the slowness.
    <img width="1702" height="130" alt="image" 
src="https://github.com/user-attachments/assets/47973563-13c7-4178-8954-3d3a23181a02";
 />
    
    <img width="1104" height="425" alt="image" 
src="https://github.com/user-attachments/assets/864448a8-de6b-47da-bb54-75b7b2f8a0c4";
 />
    
    ### Does this PR resolve a correctness bug?
    
    No.
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Code review.
    
    Closes #3544 from turboFei/log_time.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../celeborn/client/read/DfsPartitionReader.java   | 28 ++++++++++++++++++++--
 .../celeborn/client/read/LocalPartitionReader.java | 27 +++++++++++++++++++--
 .../client/read/WorkerPartitionReader.java         | 28 ++++++++++++++++++++--
 .../org/apache/celeborn/common/CelebornConf.scala  | 11 +++++++++
 docs/configuration/client.md                       |  1 +
 5 files changed, 89 insertions(+), 6 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 5493e7f29..735a53232 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -75,6 +75,7 @@ public class DfsPartitionReader implements PartitionReader {
   private TransportClient client;
   private PbStreamHandler streamHandler;
   private MetricsCallback metricsCallback;
+  private long partitionReaderWaitLogThreshold;
   private FileSystem hadoopFs;
 
   private Path dataFilePath;
@@ -100,6 +101,7 @@ public class DfsPartitionReader implements PartitionReader {
     results = new LinkedBlockingQueue<>();
 
     this.metricsCallback = metricsCallback;
+    this.partitionReaderWaitLogThreshold = 
conf.clientPartitionReaderWaitLogThreshold();
     this.location = location;
     if (location.getStorageInfo() != null
         && location.getStorageInfo().getType() == StorageInfo.Type.S3) {
@@ -290,14 +292,36 @@ public class DfsPartitionReader implements 
PartitionReader {
           });
     }
     try {
+      long totalWaitTimeMs = 0;
+      long lastLogTimeMs = 0;
+
       while (chunk == null) {
         checkException();
         Long startFetchWait = System.nanoTime();
         chunk = results.poll(500, TimeUnit.MILLISECONDS);
-        metricsCallback.incReadTime(
-            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait));
+        long waitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startFetchWait);
+        metricsCallback.incReadTime(waitTimeMs);
+        totalWaitTimeMs += waitTimeMs;
+        // Log when wait time exceeds another threshold since last log
+        if (chunk == null && totalWaitTimeMs >= lastLogTimeMs + 
partitionReaderWaitLogThreshold) {
+          lastLogTimeMs = totalWaitTimeMs;
+          logger.info(
+              "Waiting for data from partition {}/{} for {}ms",
+              location.getFileName(),
+              location.hostAndPorts(),
+              totalWaitTimeMs);
+        }
+
         logger.debug("poll result with result size: {}", results.size());
       }
+
+      if (totalWaitTimeMs >= partitionReaderWaitLogThreshold) {
+        logger.info(
+            "Finished waiting for data from partition {}/{} after {}ms",
+            location.getFileName(),
+            location.hostAndPorts(),
+            totalWaitTimeMs);
+      }
     } catch (Exception e) {
       logger.error("PartitionReader thread interrupted while fetching data.");
       throw e;
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
 
b/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
index 56982f914..0e7950376 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
@@ -69,6 +69,7 @@ public class LocalPartitionReader implements PartitionReader {
   private PbStreamHandler streamHandler;
   private TransportClient client;
   private MetricsCallback metricsCallback;
+  private long partitionReaderWaitLogThreshold;
   private int startChunkIndex;
   private int endChunkIndex;
 
@@ -98,6 +99,7 @@ public class LocalPartitionReader implements PartitionReader {
     results = new LinkedBlockingQueue<>();
     this.location = location;
     this.metricsCallback = metricsCallback;
+    this.partitionReaderWaitLogThreshold = 
conf.clientPartitionReaderWaitLogThreshold();
     long fetchTimeoutMs = conf.clientFetchTimeoutMs();
     try {
       client = clientFactory.createClient(location.getHost(), 
location.getFetchPort(), 0);
@@ -219,14 +221,35 @@ public class LocalPartitionReader implements 
PartitionReader {
     }
     ByteBuf chunk = null;
     try {
+      long totalWaitTimeMs = 0;
+      long lastLogTimeMs = 0;
+
       while (chunk == null) {
         checkException();
         Long startFetchWait = System.nanoTime();
         chunk = results.poll(100, TimeUnit.MILLISECONDS);
-        metricsCallback.incReadTime(
-            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait));
+        long waitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startFetchWait);
+        metricsCallback.incReadTime(waitTimeMs);
+        totalWaitTimeMs += waitTimeMs;
+        // Log when wait time exceeds another threshold since last log
+        if (chunk == null && totalWaitTimeMs >= lastLogTimeMs + 
partitionReaderWaitLogThreshold) {
+          lastLogTimeMs = totalWaitTimeMs;
+          logger.info(
+              "Waiting for data from partition {}/{} for {}ms",
+              location.getFileName(),
+              location.hostAndPorts(),
+              totalWaitTimeMs);
+        }
         logger.debug("Poll result with result size: {}", results.size());
       }
+
+      if (totalWaitTimeMs >= partitionReaderWaitLogThreshold) {
+        logger.info(
+            "Finished waiting for data from partition {}/{} after {}ms",
+            location.getFileName(),
+            location.hostAndPorts(),
+            totalWaitTimeMs);
+      }
     } catch (InterruptedException e) {
       logger.error("PartitionReader thread interrupted while fetching data.");
       throw e;
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
 
b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
index b6fb2209f..7a0667203 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
@@ -54,6 +54,7 @@ public class WorkerPartitionReader implements PartitionReader 
{
   private PbStreamHandler streamHandler;
   private TransportClient client;
   private MetricsCallback metricsCallback;
+  private long partitionReaderWaitLogThreshold;
 
   private int lastReturnedChunkId = -1;
   private int returnedChunks;
@@ -101,6 +102,7 @@ public class WorkerPartitionReader implements 
PartitionReader {
     pollChunkWaitTime = conf.clientFetchPollChunkWaitTime();
     inflightRequestCount = 0;
     this.metricsCallback = metricsCallback;
+    this.partitionReaderWaitLogThreshold = 
conf.clientPartitionReaderWaitLogThreshold();
     // only add the buffer to results queue if this reader is not closed.
     callback =
         new ChunkReceivedCallback() {
@@ -191,12 +193,34 @@ public class WorkerPartitionReader implements 
PartitionReader {
     }
     Pair<Integer, ByteBuf> chunk = null;
     try {
+      long totalWaitTimeMs = 0;
+      long lastLogTimeMs = 0;
+
       while (chunk == null) {
         checkException();
         Long startFetchWait = System.nanoTime();
         chunk = results.poll(pollChunkWaitTime, TimeUnit.MILLISECONDS);
-        metricsCallback.incReadTime(
-            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait));
+        long waitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startFetchWait);
+        metricsCallback.incReadTime(waitTimeMs);
+        totalWaitTimeMs += waitTimeMs;
+        // Log when wait time exceeds another threshold since last log
+        if (chunk == null && totalWaitTimeMs >= lastLogTimeMs + 
partitionReaderWaitLogThreshold) {
+          lastLogTimeMs = totalWaitTimeMs;
+          logger.info(
+              "Waiting for data from partition {}/{} for {}ms",
+              location.getFileName(),
+              location.hostAndPorts(),
+              totalWaitTimeMs);
+        }
+        logger.debug("poll result with result size: {}", results.size());
+      }
+
+      if (totalWaitTimeMs >= partitionReaderWaitLogThreshold) {
+        logger.info(
+            "Finished waiting for data from partition {}/{} after {}ms",
+            location.getFileName(),
+            location.hostAndPorts(),
+            totalWaitTimeMs);
       }
     } catch (InterruptedException e) {
       logger.error("PartitionReader thread interrupted while polling data.");
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ce0691a34..c863a5f6d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1025,6 +1025,8 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
   def isPartitionReaderCheckpointEnabled: Boolean =
     get(PARTITION_READER_CHECKPOINT_ENABLED)
+  def clientPartitionReaderWaitLogThreshold: Long =
+    get(PARTITION_READER_WAIT_LOG_THRESHOLD)
 
   def clientFetchMaxRetriesForEachReplica: Int = 
get(CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA)
   def clientStageRerunEnabled: Boolean = get(CLIENT_STAGE_RERUN_ENABLED)
@@ -5034,6 +5036,15 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val PARTITION_READER_WAIT_LOG_THRESHOLD: ConfigEntry[Long] =
+    buildConf("celeborn.client.partition.reader.waitLog.threshold")
+      .categories("client")
+      .version("0.6.2")
+      .doc("The threshold in milliseconds for logging partition read wait 
time. " +
+        "Log messages will be generated when wait time exceeds multiples of 
this threshold.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("60s")
+
   val CLIENT_FETCH_MAX_REQS_IN_FLIGHT: ConfigEntry[Int] =
     buildConf("celeborn.client.fetch.maxReqsInFlight")
       .withAlternative("celeborn.fetch.maxReqsInFlight")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 0b0facf44..fb56d8d72 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -52,6 +52,7 @@ license: |
 | celeborn.client.inputStream.creation.window | 16 | false | Window size that 
CelebornShuffleReader pre-creates CelebornInputStreams, for coalesced scenario 
where multiple Partitions are read | 0.5.1 |  | 
 | celeborn.client.mr.pushData.max | 32m | false | Max size for a push data 
sent from mr client. | 0.4.0 |  | 
 | celeborn.client.partition.reader.checkpoint.enabled | false | false | 
Whether or not checkpoint reads when re-creating a partition reader. Setting to 
true minimizes the amount of unnecessary reads during partition read retries | 
0.6.0 |  | 
+| celeborn.client.partition.reader.waitLog.threshold | 60s | false | The 
threshold in milliseconds for logging partition read wait time. Log messages 
will be generated when wait time exceeds multiples of this threshold. | 0.6.2 | 
 | 
 | celeborn.client.push.buffer.initial.size | 8k | false |  | 0.3.0 | 
celeborn.push.buffer.initial.size | 
 | celeborn.client.push.buffer.max.size | 64k | false | Max size of reducer 
partition buffer memory for shuffle hash writer. The pushed data will be 
buffered in memory before sending to Celeborn worker. For performance 
consideration keep this buffer size higher than 32K. Example: If reducer amount 
is 2000, buffer size is 64K, then each task will consume up to `64KiB * 2000 = 
125MiB` heap memory. | 0.3.0 | celeborn.push.buffer.max.size | 
 | celeborn.client.push.excludeWorkerOnFailure.enabled | false | false | 
Whether to enable shuffle client-side push exclude workers on failures. | 0.3.0 
|  | 

Reply via email to