This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8966c9b77 [CELEBORN-2208] Log the partition reader wait time if
exceeds the threshold
8966c9b77 is described below
commit 8966c9b770490e30ab405e9543e24d7f2c4201eb
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Nov 21 16:33:57 2025 +0800
[CELEBORN-2208] Log the partition reader wait time if exceeds the threshold
### What changes were proposed in this pull request?
Log the partition reader wait time if it exceeds the configured threshold.
### Why are the changes needed?
I have seen tasks whose shuffle read wait time is very long, but there is no
task log to indicate the slowness.
<img width="1702" height="130" alt="image"
src="https://github.com/user-attachments/assets/47973563-13c7-4178-8954-3d3a23181a02"
/>
<img width="1104" height="425" alt="image"
src="https://github.com/user-attachments/assets/864448a8-de6b-47da-bb54-75b7b2f8a0c4"
/>
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Code review.
Closes #3544 from turboFei/log_time.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../celeborn/client/read/DfsPartitionReader.java | 28 ++++++++++++++++++++--
.../celeborn/client/read/LocalPartitionReader.java | 27 +++++++++++++++++++--
.../client/read/WorkerPartitionReader.java | 28 ++++++++++++++++++++--
.../org/apache/celeborn/common/CelebornConf.scala | 11 +++++++++
docs/configuration/client.md | 1 +
5 files changed, 89 insertions(+), 6 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 5493e7f29..735a53232 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -75,6 +75,7 @@ public class DfsPartitionReader implements PartitionReader {
private TransportClient client;
private PbStreamHandler streamHandler;
private MetricsCallback metricsCallback;
+ private long partitionReaderWaitLogThreshold;
private FileSystem hadoopFs;
private Path dataFilePath;
@@ -100,6 +101,7 @@ public class DfsPartitionReader implements PartitionReader {
results = new LinkedBlockingQueue<>();
this.metricsCallback = metricsCallback;
+ this.partitionReaderWaitLogThreshold =
conf.clientPartitionReaderWaitLogThreshold();
this.location = location;
if (location.getStorageInfo() != null
&& location.getStorageInfo().getType() == StorageInfo.Type.S3) {
@@ -290,14 +292,36 @@ public class DfsPartitionReader implements
PartitionReader {
});
}
try {
+ long totalWaitTimeMs = 0;
+ long lastLogTimeMs = 0;
+
while (chunk == null) {
checkException();
Long startFetchWait = System.nanoTime();
chunk = results.poll(500, TimeUnit.MILLISECONDS);
- metricsCallback.incReadTime(
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait));
+ long waitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startFetchWait);
+ metricsCallback.incReadTime(waitTimeMs);
+ totalWaitTimeMs += waitTimeMs;
+ // Log when wait time exceeds another threshold since last log
+ if (chunk == null && totalWaitTimeMs >= lastLogTimeMs +
partitionReaderWaitLogThreshold) {
+ lastLogTimeMs = totalWaitTimeMs;
+ logger.info(
+ "Waiting for data from partition {}/{} for {}ms",
+ location.getFileName(),
+ location.hostAndPorts(),
+ totalWaitTimeMs);
+ }
+
logger.debug("poll result with result size: {}", results.size());
}
+
+ if (totalWaitTimeMs >= partitionReaderWaitLogThreshold) {
+ logger.info(
+ "Finished waiting for data from partition {}/{} after {}ms",
+ location.getFileName(),
+ location.hostAndPorts(),
+ totalWaitTimeMs);
+ }
} catch (Exception e) {
logger.error("PartitionReader thread interrupted while fetching data.");
throw e;
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
index 56982f914..0e7950376 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
@@ -69,6 +69,7 @@ public class LocalPartitionReader implements PartitionReader {
private PbStreamHandler streamHandler;
private TransportClient client;
private MetricsCallback metricsCallback;
+ private long partitionReaderWaitLogThreshold;
private int startChunkIndex;
private int endChunkIndex;
@@ -98,6 +99,7 @@ public class LocalPartitionReader implements PartitionReader {
results = new LinkedBlockingQueue<>();
this.location = location;
this.metricsCallback = metricsCallback;
+ this.partitionReaderWaitLogThreshold =
conf.clientPartitionReaderWaitLogThreshold();
long fetchTimeoutMs = conf.clientFetchTimeoutMs();
try {
client = clientFactory.createClient(location.getHost(),
location.getFetchPort(), 0);
@@ -219,14 +221,35 @@ public class LocalPartitionReader implements
PartitionReader {
}
ByteBuf chunk = null;
try {
+ long totalWaitTimeMs = 0;
+ long lastLogTimeMs = 0;
+
while (chunk == null) {
checkException();
Long startFetchWait = System.nanoTime();
chunk = results.poll(100, TimeUnit.MILLISECONDS);
- metricsCallback.incReadTime(
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait));
+ long waitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startFetchWait);
+ metricsCallback.incReadTime(waitTimeMs);
+ totalWaitTimeMs += waitTimeMs;
+ // Log when wait time exceeds another threshold since last log
+ if (chunk == null && totalWaitTimeMs >= lastLogTimeMs +
partitionReaderWaitLogThreshold) {
+ lastLogTimeMs = totalWaitTimeMs;
+ logger.info(
+ "Waiting for data from partition {}/{} for {}ms",
+ location.getFileName(),
+ location.hostAndPorts(),
+ totalWaitTimeMs);
+ }
logger.debug("Poll result with result size: {}", results.size());
}
+
+ if (totalWaitTimeMs >= partitionReaderWaitLogThreshold) {
+ logger.info(
+ "Finished waiting for data from partition {}/{} after {}ms",
+ location.getFileName(),
+ location.hostAndPorts(),
+ totalWaitTimeMs);
+ }
} catch (InterruptedException e) {
logger.error("PartitionReader thread interrupted while fetching data.");
throw e;
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
index b6fb2209f..7a0667203 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
@@ -54,6 +54,7 @@ public class WorkerPartitionReader implements PartitionReader
{
private PbStreamHandler streamHandler;
private TransportClient client;
private MetricsCallback metricsCallback;
+ private long partitionReaderWaitLogThreshold;
private int lastReturnedChunkId = -1;
private int returnedChunks;
@@ -101,6 +102,7 @@ public class WorkerPartitionReader implements
PartitionReader {
pollChunkWaitTime = conf.clientFetchPollChunkWaitTime();
inflightRequestCount = 0;
this.metricsCallback = metricsCallback;
+ this.partitionReaderWaitLogThreshold =
conf.clientPartitionReaderWaitLogThreshold();
// only add the buffer to results queue if this reader is not closed.
callback =
new ChunkReceivedCallback() {
@@ -191,12 +193,34 @@ public class WorkerPartitionReader implements
PartitionReader {
}
Pair<Integer, ByteBuf> chunk = null;
try {
+ long totalWaitTimeMs = 0;
+ long lastLogTimeMs = 0;
+
while (chunk == null) {
checkException();
Long startFetchWait = System.nanoTime();
chunk = results.poll(pollChunkWaitTime, TimeUnit.MILLISECONDS);
- metricsCallback.incReadTime(
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait));
+ long waitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startFetchWait);
+ metricsCallback.incReadTime(waitTimeMs);
+ totalWaitTimeMs += waitTimeMs;
+ // Log when wait time exceeds another threshold since last log
+ if (chunk == null && totalWaitTimeMs >= lastLogTimeMs +
partitionReaderWaitLogThreshold) {
+ lastLogTimeMs = totalWaitTimeMs;
+ logger.info(
+ "Waiting for data from partition {}/{} for {}ms",
+ location.getFileName(),
+ location.hostAndPorts(),
+ totalWaitTimeMs);
+ }
+ logger.debug("poll result with result size: {}", results.size());
+ }
+
+ if (totalWaitTimeMs >= partitionReaderWaitLogThreshold) {
+ logger.info(
+ "Finished waiting for data from partition {}/{} after {}ms",
+ location.getFileName(),
+ location.hostAndPorts(),
+ totalWaitTimeMs);
}
} catch (InterruptedException e) {
logger.error("PartitionReader thread interrupted while polling data.");
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ce0691a34..c863a5f6d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1025,6 +1025,8 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
def isPartitionReaderCheckpointEnabled: Boolean =
get(PARTITION_READER_CHECKPOINT_ENABLED)
+ def clientPartitionReaderWaitLogThreshold: Long =
+ get(PARTITION_READER_WAIT_LOG_THRESHOLD)
def clientFetchMaxRetriesForEachReplica: Int =
get(CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA)
def clientStageRerunEnabled: Boolean = get(CLIENT_STAGE_RERUN_ENABLED)
@@ -5034,6 +5036,15 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
+ val PARTITION_READER_WAIT_LOG_THRESHOLD: ConfigEntry[Long] =
+ buildConf("celeborn.client.partition.reader.waitLog.threshold")
+ .categories("client")
+ .version("0.6.2")
+ .doc("The threshold in milliseconds for logging partition read wait
time. " +
+ "Log messages will be generated when wait time exceeds multiples of
this threshold.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("60s")
+
val CLIENT_FETCH_MAX_REQS_IN_FLIGHT: ConfigEntry[Int] =
buildConf("celeborn.client.fetch.maxReqsInFlight")
.withAlternative("celeborn.fetch.maxReqsInFlight")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 0b0facf44..fb56d8d72 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -52,6 +52,7 @@ license: |
| celeborn.client.inputStream.creation.window | 16 | false | Window size that
CelebornShuffleReader pre-creates CelebornInputStreams, for coalesced scenario
where multiple Partitions are read | 0.5.1 | |
| celeborn.client.mr.pushData.max | 32m | false | Max size for a push data
sent from mr client. | 0.4.0 | |
| celeborn.client.partition.reader.checkpoint.enabled | false | false |
Whether or not checkpoint reads when re-creating a partition reader. Setting to
true minimizes the amount of unnecessary reads during partition read retries |
0.6.0 | |
+| celeborn.client.partition.reader.waitLog.threshold | 60s | false | The
threshold in milliseconds for logging partition read wait time. Log messages
will be generated when wait time exceeds multiples of this threshold. | 0.6.2 |
|
| celeborn.client.push.buffer.initial.size | 8k | false | | 0.3.0 |
celeborn.push.buffer.initial.size |
| celeborn.client.push.buffer.max.size | 64k | false | Max size of reducer
partition buffer memory for shuffle hash writer. The pushed data will be
buffered in memory before sending to Celeborn worker. For performance
consideration keep this buffer size higher than 32K. Example: If reducer amount
is 2000, buffer size is 64K, then each task will consume up to `64KiB * 2000 =
125MiB` heap memory. | 0.3.0 | celeborn.push.buffer.max.size |
| celeborn.client.push.excludeWorkerOnFailure.enabled | false | false |
Whether to enable shuffle client-side push exclude workers on failures. | 0.3.0
| |