This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f9526021c [CELEBORN-1846] Fix the StreamHandler usage in fetching 
chunk when task attempt is odd
f9526021c is described below

commit f9526021c758c09a160398f9217e23d45c746c88
Author: onebox-li <[email protected]>
AuthorDate: Wed Feb 12 14:22:33 2025 +0800

    [CELEBORN-1846] Fix the StreamHandler usage in fetching chunk when task 
attempt is odd
    
    ### What changes were proposed in this pull request?
    The streams opened in the `streamCreatorPool` thread pool are all based on 
the primary locations. When the task attempt number is odd, the task starts by 
fetching chunks from the replica location first, which causes the wrong 
streamHandler (the one opened for the primary location) to be used to fetch 
data. To keep the logic simple, we now always fetch from the primary location 
first; when switching to the peer, we close the stream and use a null 
streamHandler for fetching from the peer.
    
    ### Why are the changes needed?
    To prevent tasks from being slowed down by NPEs, and to avoid potential data problems.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manual test.
    
    Closes #3079 from onebox-li/fix-stream-handler.
    
    Authored-by: onebox-li <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/client/read/CelebornInputStream.java  | 31 +++++++++++++++++-----
 1 file changed, 25 insertions(+), 6 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index fc295a746..8606585bc 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -38,7 +38,9 @@ import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.client.compress.Decompressor;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.network.client.TransportClient;
 import org.apache.celeborn.common.network.client.TransportClientFactory;
+import org.apache.celeborn.common.network.protocol.TransportMessage;
 import org.apache.celeborn.common.protocol.*;
 import org.apache.celeborn.common.unsafe.Platform;
 import org.apache.celeborn.common.util.ExceptionMaker;
@@ -322,14 +324,10 @@ public abstract class CelebornInputStream extends 
InputStream {
 
     private PartitionReader createReaderWithRetry(
         PartitionLocation location, PbStreamHandler pbStreamHandler) throws 
IOException {
-      // For the first time, the location will be selected according to 
attemptNumber
-      if (fetchChunkRetryCnt == 0 && attemptNumber % 2 == 1 && 
location.hasPeer()) {
-        location = location.getPeer();
-        logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
-      }
       Exception lastException = null;
       while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
         try {
+          logger.debug("Create reader for location {}", location);
           if (isExcluded(location)) {
             throw new CelebornIOException("Fetch data from excluded worker! " 
+ location);
           }
@@ -350,6 +348,28 @@ public abstract class CelebornInputStream extends 
InputStream {
                 fetchChunkMaxRetry,
                 location,
                 e);
+            if (pbStreamHandler != null) {
+              try {
+                TransportClient client =
+                    clientFactory.createClient(location.getHost(), 
location.getFetchPort());
+                TransportMessage bufferStreamEnd =
+                    new TransportMessage(
+                        MessageType.BUFFER_STREAM_END,
+                        PbBufferStreamEnd.newBuilder()
+                            .setStreamType(StreamType.ChunkStream)
+                            .setStreamId(pbStreamHandler.getStreamId())
+                            .build()
+                            .toByteArray());
+                client.sendRpc(bufferStreamEnd.toByteBuffer());
+              } catch (InterruptedException | IOException ex) {
+                logger.warn(
+                    "Close {} stream {} failed",
+                    location.hostAndFetchPort(),
+                    pbStreamHandler.getStreamId(),
+                    ex);
+              }
+              pbStreamHandler = null;
+            }
             location = location.getPeer();
           } else {
             logger.warn(
@@ -422,7 +442,6 @@ public abstract class CelebornInputStream extends 
InputStream {
         int fetchChunkRetryCnt,
         int fetchChunkMaxRetry)
         throws IOException, InterruptedException {
-      logger.debug("Create reader for location {}", location);
 
       StorageInfo storageInfo = location.getStorageInfo();
       switch (storageInfo.getType()) {

Reply via email to