This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 0f86bc005 [CELEBORN-2064] Fix the issue where reading a replica 
partition that returns zero chunks causes tasks to hang
0f86bc005 is described below

commit 0f86bc005e6ee7460e8b2782f691b75c04946cd4
Author: xinyuwang1 <[email protected]>
AuthorDate: Fri Aug 1 13:47:07 2025 -0700

    [CELEBORN-2064] Fix the issue where reading a replica partition that returns 
zero chunks causes tasks to hang
    
    ### What changes were proposed in this pull request?
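    Re-validate `currentReader.hasNext()` inside `getNextChunk` and return null
    when the reader has no chunk; callers treat a null chunk as a signal to
    move on to the next reader.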
    
    ### Why are the changes needed?
    Fix the issue where reading a replica partition that returns zero chunks 
causes tasks to hang.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    manual test
    
    Closes #3364 from littlexyw/fix_get_next_chunk.
    
    Authored-by: xinyuwang1 <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit a61d6a517f180d7c86d97012e0314ab58589821c)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../org/apache/celeborn/client/read/CelebornInputStream.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index ad91fc381..48fcf0a4e 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -386,7 +386,7 @@ public abstract class CelebornInputStream extends 
InputStream {
       }
       currentReader = createReaderWithRetry(currentLocation._1, 
currentLocation._2);
       fileIndex++;
-      while (!currentReader.hasNext()) {
+      while (!currentReader.hasNext() || (fetchChunk && (currentChunk = 
getNextChunk()) == null)) {
         currentReader.close();
         currentReader = null;
         currentLocation = nextReadableLocation();
@@ -396,9 +396,6 @@ public abstract class CelebornInputStream extends 
InputStream {
         currentReader = createReaderWithRetry(currentLocation._1, 
currentLocation._2);
         fileIndex++;
       }
-      if (fetchChunk) {
-        currentChunk = getNextChunk();
-      }
     }
 
     private boolean isExcluded(PartitionLocation location) {
@@ -507,6 +504,9 @@ public abstract class CelebornInputStream extends 
InputStream {
             throw new CelebornIOException(
                 "Fetch data from excluded worker! " + 
currentReader.getLocation());
           }
+          if (!currentReader.hasNext()) {
+            return null;
+          }
           return currentReader.next();
         } catch (Exception e) {
           shuffleClient.excludeFailedFetchLocation(
@@ -726,8 +726,7 @@ public abstract class CelebornInputStream extends 
InputStream {
         currentChunk.release();
       }
       currentChunk = null;
-      if (currentReader.hasNext()) {
-        currentChunk = getNextChunk();
+      if (currentReader.hasNext() && (currentChunk = getNextChunk()) != null) {
         return true;
       } else if (fileIndex < locations.size()) {
         moveToNextReader(true);
@@ -757,6 +756,7 @@ public abstract class CelebornInputStream extends 
InputStream {
         if (firstChunk && currentReader != null) {
           init();
           currentChunk = getNextChunk();
+          while (currentChunk == null && moveToNextChunk()) ;
           firstChunk = false;
         }
         if (currentChunk == null) {

Reply via email to