This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 0f86bc005 [CELEBORN-2064] Fix the issue where reading a replica
partition that returns zero chunks causes tasks to hang
0f86bc005 is described below
commit 0f86bc005e6ee7460e8b2782f691b75c04946cd4
Author: xinyuwang1 <[email protected]>
AuthorDate: Fri Aug 1 13:47:07 2025 -0700
[CELEBORN-2064] Fix the issue where reading a replica partition that returns
zero chunks causes tasks to hang
### What changes were proposed in this pull request?
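Re-check `currentReader.hasNext()` inside `getNextChunk()` and return `null`
when the reader has no chunk to serve. Callers (`moveToNextReader`,
`moveToNextChunk`, and the first-chunk path in `read`) now treat a `null`
chunk as "this location is empty" and advance to the next readable location.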
### Why are the changes needed?
Reading a replica partition that returns zero chunks causes tasks to hang;
this change fixes that.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual test.
Closes #3364 from littlexyw/fix_get_next_chunk.
Authored-by: xinyuwang1 <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit a61d6a517f180d7c86d97012e0314ab58589821c)
Signed-off-by: Wang, Fei <[email protected]>
---
.../org/apache/celeborn/client/read/CelebornInputStream.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index ad91fc381..48fcf0a4e 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -386,7 +386,7 @@ public abstract class CelebornInputStream extends
InputStream {
}
currentReader = createReaderWithRetry(currentLocation._1,
currentLocation._2);
fileIndex++;
- while (!currentReader.hasNext()) {
+ while (!currentReader.hasNext() || (fetchChunk && (currentChunk =
getNextChunk()) == null)) {
currentReader.close();
currentReader = null;
currentLocation = nextReadableLocation();
@@ -396,9 +396,6 @@ public abstract class CelebornInputStream extends
InputStream {
currentReader = createReaderWithRetry(currentLocation._1,
currentLocation._2);
fileIndex++;
}
- if (fetchChunk) {
- currentChunk = getNextChunk();
- }
}
private boolean isExcluded(PartitionLocation location) {
@@ -507,6 +504,9 @@ public abstract class CelebornInputStream extends
InputStream {
throw new CelebornIOException(
"Fetch data from excluded worker! " +
currentReader.getLocation());
}
+ if (!currentReader.hasNext()) {
+ return null;
+ }
return currentReader.next();
} catch (Exception e) {
shuffleClient.excludeFailedFetchLocation(
@@ -726,8 +726,7 @@ public abstract class CelebornInputStream extends
InputStream {
currentChunk.release();
}
currentChunk = null;
- if (currentReader.hasNext()) {
- currentChunk = getNextChunk();
+ if (currentReader.hasNext() && (currentChunk = getNextChunk()) != null) {
return true;
} else if (fileIndex < locations.size()) {
moveToNextReader(true);
@@ -757,6 +756,7 @@ public abstract class CelebornInputStream extends
InputStream {
if (firstChunk && currentReader != null) {
init();
currentChunk = getNextChunk();
+ while (currentChunk == null && moveToNextChunk()) ;
firstChunk = false;
}
if (currentChunk == null) {