This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 385246784 [CELEBORN-1859] DfsPartitionReader and LocalPartitionReader 
should reuse pbStreamHandlers get from BatchOpenStream request
385246784 is described below

commit 3852467842a7634ddc0dda09c6c3d04e9883c4df
Author: wuziyi <[email protected]>
AuthorDate: Mon Feb 17 09:46:46 2025 +0800

    [CELEBORN-1859] DfsPartitionReader and LocalPartitionReader should reuse 
pbStreamHandlers get from BatchOpenStream request
    
    DfsPartitionReader and LocalPartitionReader should reuse the
    pbStreamHandlers obtained from the BatchOpenStream request, as
    WorkerPartitionReader does, instead of sending another OpenStream request.
    
    Reduces unnecessary RPC requests.
    
    No
    
    Tested manually in a test cluster.
    
    Closes #3093 from Z1Wu/fix/local_dfs_reuse_steam_handler.
    
    Authored-by: wuziyi <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit 6a836f9523b7eb03d3eff4bf1c23349bf8270064)
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/client/read/CelebornInputStream.java  | 18 +++++++++++--
 .../celeborn/client/read/DfsPartitionReader.java   | 23 +++++++++-------
 .../celeborn/client/read/LocalPartitionReader.java | 31 +++++++++++++---------
 3 files changed, 48 insertions(+), 24 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 471cbce71..0ef146ce0 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -467,7 +467,14 @@ public abstract class CelebornInputStream extends 
InputStream {
             logger.debug("Read local shuffle file {}", localHostAddress);
             containLocalRead = true;
             return new LocalPartitionReader(
-                conf, shuffleKey, location, clientFactory, startMapIndex, 
endMapIndex, callback);
+                conf,
+                shuffleKey,
+                location,
+                pbStreamHandler,
+                clientFactory,
+                startMapIndex,
+                endMapIndex,
+                callback);
           } else {
             return new WorkerPartitionReader(
                 conf,
@@ -483,7 +490,14 @@ public abstract class CelebornInputStream extends 
InputStream {
           }
         case HDFS:
           return new DfsPartitionReader(
-              conf, shuffleKey, location, clientFactory, startMapIndex, 
endMapIndex, callback);
+              conf,
+              shuffleKey,
+              location,
+              pbStreamHandler,
+              clientFactory,
+              startMapIndex,
+              endMapIndex,
+              callback);
         default:
           throw new CelebornIOException(
               String.format("Unknown storage info %s to read location %s", 
storageInfo, location));
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 712bd82e4..e648d5780 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -73,6 +73,7 @@ public class DfsPartitionReader implements PartitionReader {
       CelebornConf conf,
       String shuffleKey,
       PartitionLocation location,
+      PbStreamHandler pbStreamHandler,
       TransportClientFactory clientFactory,
       int startMapIndex,
       int endMapIndex,
@@ -86,10 +87,10 @@ public class DfsPartitionReader implements PartitionReader {
     this.metricsCallback = metricsCallback;
     this.location = location;
 
-    if (endMapIndex != Integer.MAX_VALUE) {
-      long fetchTimeoutMs = conf.clientFetchTimeoutMs();
-      try {
-        client = clientFactory.createClient(location.getHost(), 
location.getFetchPort());
+    long fetchTimeoutMs = conf.clientFetchTimeoutMs();
+    try {
+      client = clientFactory.createClient(location.getHost(), 
location.getFetchPort());
+      if (pbStreamHandler == null) {
         TransportMessage openStream =
             new TransportMessage(
                 MessageType.OPEN_STREAM,
@@ -103,12 +104,16 @@ public class DfsPartitionReader implements 
PartitionReader {
         ByteBuffer response = client.sendRpcSync(openStream.toByteBuffer(), 
fetchTimeoutMs);
         streamHandler = 
TransportMessage.fromByteBuffer(response).getParsedPayload();
         // Parse this message to ensure sort is done.
-      } catch (IOException | InterruptedException e) {
-        throw new IOException(
-            "read shuffle file from HDFS failed, filePath: "
-                + location.getStorageInfo().getFilePath(),
-            e);
+      } else {
+        streamHandler = pbStreamHandler;
       }
+    } catch (IOException | InterruptedException e) {
+      throw new IOException(
+          "read shuffle file from DFS failed, filePath: " + 
location.getStorageInfo().getFilePath(),
+          e);
+    }
+
+    if (endMapIndex != Integer.MAX_VALUE) {
       hdfsInputStream =
           ShuffleClient.getHdfsFs(conf)
               .open(new 
Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
 
b/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
index ace28c61e..36e34d912 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
@@ -73,6 +73,7 @@ public class LocalPartitionReader implements PartitionReader {
       CelebornConf conf,
       String shuffleKey,
       PartitionLocation location,
+      PbStreamHandler pbStreamHandler,
       TransportClientFactory clientFactory,
       int startMapIndex,
       int endMapIndex,
@@ -94,19 +95,23 @@ public class LocalPartitionReader implements 
PartitionReader {
     long fetchTimeoutMs = conf.clientFetchTimeoutMs();
     try {
       client = clientFactory.createClient(location.getHost(), 
location.getFetchPort(), 0);
-      TransportMessage openStreamMsg =
-          new TransportMessage(
-              MessageType.OPEN_STREAM,
-              PbOpenStream.newBuilder()
-                  .setShuffleKey(shuffleKey)
-                  .setFileName(location.getFileName())
-                  .setStartIndex(startMapIndex)
-                  .setEndIndex(endMapIndex)
-                  .setReadLocalShuffle(true)
-                  .build()
-                  .toByteArray());
-      ByteBuffer response = client.sendRpcSync(openStreamMsg.toByteBuffer(), 
fetchTimeoutMs);
-      streamHandler = 
TransportMessage.fromByteBuffer(response).getParsedPayload();
+      if (pbStreamHandler == null) {
+        TransportMessage openStreamMsg =
+            new TransportMessage(
+                MessageType.OPEN_STREAM,
+                PbOpenStream.newBuilder()
+                    .setShuffleKey(shuffleKey)
+                    .setFileName(location.getFileName())
+                    .setStartIndex(startMapIndex)
+                    .setEndIndex(endMapIndex)
+                    .setReadLocalShuffle(true)
+                    .build()
+                    .toByteArray());
+        ByteBuffer response = client.sendRpcSync(openStreamMsg.toByteBuffer(), 
fetchTimeoutMs);
+        streamHandler = 
TransportMessage.fromByteBuffer(response).getParsedPayload();
+      } else {
+        this.streamHandler = pbStreamHandler;
+      }
     } catch (IOException | InterruptedException e) {
       throw new IOException(
           "Read shuffle file from local file failed, partition location: "

Reply via email to