This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new c14d2ca5d [CELEBORN-1947] Reduce log for CelebornShuffleReader
sleeping before inputStream ready
c14d2ca5d is described below
commit c14d2ca5dc9bd8372751cc412ef50ae19cd0f293
Author: sychen <[email protected]>
AuthorDate: Fri Mar 28 10:50:12 2025 -0700
[CELEBORN-1947] Reduce log for CelebornShuffleReader sleeping before
inputStream ready
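### What changes were proposed in this pull request?
Log the "inputStream is null, sleeping..." message only once per wait
(on the first sleep) and include the partition id; once the stream
becomes available, log a single line reporting the sleep count.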
### Why are the changes needed?
When a shuffle read times out, a large number of log lines like the
following are written to the executor log:
```
inputStream is null, sleeping...
```
<img width="799" alt="image"
src="https://github.com/user-attachments/assets/f5b2bfde-5874-4b1e-8992-7037a9c81aa5"
/>
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #3181 from cxzl25/CELEBORN-1947.
Authored-by: sychen <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit f1c963d0b05a1d4c1382c11bd6f58994c7b6818b)
Signed-off-by: Wang, Fei <[email protected]>
---
.../apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 52eae2ff2..ea4254bc5 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -256,6 +256,7 @@ class CelebornShuffleReader[K, C](
if (handle.numMappers > 0) {
val startFetchWait = System.nanoTime()
var inputStream: CelebornInputStream = streams.get(partitionId)
+ var sleepCnt = 0L
while (inputStream == null) {
if (exceptionRef.get() != null) {
exceptionRef.get() match {
@@ -264,10 +265,16 @@ class CelebornShuffleReader[K, C](
case e => throw e
}
}
- logInfo("inputStream is null, sleeping...")
+ if (sleepCnt == 0) {
+ logInfo(s"inputStream for partition: $partitionId is null,
sleeping...")
+ }
+ sleepCnt += 1
Thread.sleep(50)
inputStream = streams.get(partitionId)
}
+ if (sleepCnt > 0) {
+ logInfo(s"inputStream for partition: $partitionId is not null, sleep
count: $sleepCnt")
+ }
metricsCallback.incReadTime(
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait))
// ensure inputStream is closed when task completes