This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new c14d2ca5d [CELEBORN-1947] Reduce log for CelebornShuffleReader sleeping before inputStream ready
c14d2ca5d is described below

commit c14d2ca5dc9bd8372751cc412ef50ae19cd0f293
Author: sychen <[email protected]>
AuthorDate: Fri Mar 28 10:50:12 2025 -0700

    [CELEBORN-1947] Reduce log for CelebornShuffleReader sleeping before inputStream ready
    
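    ### What changes were proposed in this pull request?
    
    Log `inputStream for partition: <id> is null, sleeping...` only on the first wait iteration (instead of on every 50 ms retry), and log a single summary line with the total sleep count once the partition's input stream becomes available.
    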
    ### Why are the changes needed?
    When a shuffle read times out, the executor log is flooded with a large number of repeated lines like:
    
    ```
    inputStream is null, sleeping...
    ```
    
    <img width="799" alt="image" src="https://github.com/user-attachments/assets/f5b2bfde-5874-4b1e-8992-7037a9c81aa5" />
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #3181 from cxzl25/CELEBORN-1947.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit f1c963d0b05a1d4c1382c11bd6f58994c7b6818b)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../apache/spark/shuffle/celeborn/CelebornShuffleReader.scala    | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 52eae2ff2..ea4254bc5 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -256,6 +256,7 @@ class CelebornShuffleReader[K, C](
       if (handle.numMappers > 0) {
         val startFetchWait = System.nanoTime()
         var inputStream: CelebornInputStream = streams.get(partitionId)
+        var sleepCnt = 0L
         while (inputStream == null) {
           if (exceptionRef.get() != null) {
             exceptionRef.get() match {
@@ -264,10 +265,16 @@ class CelebornShuffleReader[K, C](
               case e => throw e
             }
           }
-          logInfo("inputStream is null, sleeping...")
+          if (sleepCnt == 0) {
+            logInfo(s"inputStream for partition: $partitionId is null, 
sleeping...")
+          }
+          sleepCnt += 1
           Thread.sleep(50)
           inputStream = streams.get(partitionId)
         }
+        if (sleepCnt > 0) {
+          logInfo(s"inputStream for partition: $partitionId is not null, sleep 
count: $sleepCnt")
+        }
         metricsCallback.incReadTime(
           TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait))
         // ensure inputStream is closed when task completes

Reply via email to