This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a836f952 [CELEBORN-1859] DfsPartitionReader and LocalPartitionReader 
should reuse pbStreamHandlers get from BatchOpenStream request
6a836f952 is described below

commit 6a836f9523b7eb03d3eff4bf1c23349bf8270064
Author: wuziyi <[email protected]>
AuthorDate: Mon Feb 17 09:46:46 2025 +0800

    [CELEBORN-1859] DfsPartitionReader and LocalPartitionReader should reuse 
pbStreamHandlers get from BatchOpenStream request
    
    ### What changes were proposed in this pull request?
    
    DfsPartitionReader and LocalPartitionReader should reuse the pbStreamHandlers
    obtained from the BatchOpenStream request, as WorkerPartitionReader already does,
    instead of sending an additional OpenStream request.
    
    ### Why are the changes needed?
    
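    Reduce unnecessary RPC requests: the stream handlers are already returned by the
    BatchOpenStream request, so sending a second OpenStream request per reader is redundant.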
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Tested manually in a test cluster.
    
    Closes #3093 from Z1Wu/fix/local_dfs_reuse_steam_handler.
    
    Authored-by: wuziyi <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/client/read/CelebornInputStream.java  | 18 +++++++++++--
 .../celeborn/client/read/DfsPartitionReader.java   | 22 ++++++++-------
 .../celeborn/client/read/LocalPartitionReader.java | 31 +++++++++++++---------
 3 files changed, 47 insertions(+), 24 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 8606585bc..e525a135a 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -454,7 +454,14 @@ public abstract class CelebornInputStream extends 
InputStream {
             logger.debug("Read local shuffle file {}", localHostAddress);
             containLocalRead = true;
             return new LocalPartitionReader(
-                conf, shuffleKey, location, clientFactory, startMapIndex, 
endMapIndex, callback);
+                conf,
+                shuffleKey,
+                location,
+                pbStreamHandler,
+                clientFactory,
+                startMapIndex,
+                endMapIndex,
+                callback);
           } else {
             return new WorkerPartitionReader(
                 conf,
@@ -471,7 +478,14 @@ public abstract class CelebornInputStream extends 
InputStream {
         case S3:
         case HDFS:
           return new DfsPartitionReader(
-              conf, shuffleKey, location, clientFactory, startMapIndex, 
endMapIndex, callback);
+              conf,
+              shuffleKey,
+              location,
+              pbStreamHandler,
+              clientFactory,
+              startMapIndex,
+              endMapIndex,
+              callback);
         default:
           throw new CelebornIOException(
               String.format("Unknown storage info %s to read location %s", 
storageInfo, location));
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index b69cf580f..313fe77d1 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -76,6 +76,7 @@ public class DfsPartitionReader implements PartitionReader {
       CelebornConf conf,
       String shuffleKey,
       PartitionLocation location,
+      PbStreamHandler pbStreamHandler,
       TransportClientFactory clientFactory,
       int startMapIndex,
       int endMapIndex,
@@ -95,10 +96,10 @@ public class DfsPartitionReader implements PartitionReader {
       this.hadoopFs = 
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.HDFS);
     }
 
-    if (endMapIndex != Integer.MAX_VALUE) {
-      long fetchTimeoutMs = conf.clientFetchTimeoutMs();
-      try {
-        client = clientFactory.createClient(location.getHost(), 
location.getFetchPort());
+    long fetchTimeoutMs = conf.clientFetchTimeoutMs();
+    try {
+      client = clientFactory.createClient(location.getHost(), 
location.getFetchPort());
+      if (pbStreamHandler == null) {
         TransportMessage openStream =
             new TransportMessage(
                 MessageType.OPEN_STREAM,
@@ -112,13 +113,16 @@ public class DfsPartitionReader implements 
PartitionReader {
         ByteBuffer response = client.sendRpcSync(openStream.toByteBuffer(), 
fetchTimeoutMs);
         streamHandler = 
TransportMessage.fromByteBuffer(response).getParsedPayload();
         // Parse this message to ensure sort is done.
-      } catch (IOException | InterruptedException e) {
-        throw new IOException(
-            "read shuffle file from DFS failed, filePath: "
-                + location.getStorageInfo().getFilePath(),
-            e);
+      } else {
+        streamHandler = pbStreamHandler;
       }
+    } catch (IOException | InterruptedException e) {
+      throw new IOException(
+          "read shuffle file from DFS failed, filePath: " + 
location.getStorageInfo().getFilePath(),
+          e);
+    }
 
+    if (endMapIndex != Integer.MAX_VALUE) {
       dfsInputStream =
           hadoopFs.open(new 
Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
       chunkOffsets.addAll(
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
 
b/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
index a769687c8..722bab100 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
@@ -74,6 +74,7 @@ public class LocalPartitionReader implements PartitionReader {
       CelebornConf conf,
       String shuffleKey,
       PartitionLocation location,
+      PbStreamHandler pbStreamHandler,
       TransportClientFactory clientFactory,
       int startMapIndex,
       int endMapIndex,
@@ -95,19 +96,23 @@ public class LocalPartitionReader implements 
PartitionReader {
     long fetchTimeoutMs = conf.clientFetchTimeoutMs();
     try {
       client = clientFactory.createClient(location.getHost(), 
location.getFetchPort(), 0);
-      TransportMessage openStreamMsg =
-          new TransportMessage(
-              MessageType.OPEN_STREAM,
-              PbOpenStream.newBuilder()
-                  .setShuffleKey(shuffleKey)
-                  .setFileName(location.getFileName())
-                  .setStartIndex(startMapIndex)
-                  .setEndIndex(endMapIndex)
-                  .setReadLocalShuffle(true)
-                  .build()
-                  .toByteArray());
-      ByteBuffer response = client.sendRpcSync(openStreamMsg.toByteBuffer(), 
fetchTimeoutMs);
-      streamHandler = 
TransportMessage.fromByteBuffer(response).getParsedPayload();
+      if (pbStreamHandler == null) {
+        TransportMessage openStreamMsg =
+            new TransportMessage(
+                MessageType.OPEN_STREAM,
+                PbOpenStream.newBuilder()
+                    .setShuffleKey(shuffleKey)
+                    .setFileName(location.getFileName())
+                    .setStartIndex(startMapIndex)
+                    .setEndIndex(endMapIndex)
+                    .setReadLocalShuffle(true)
+                    .build()
+                    .toByteArray());
+        ByteBuffer response = client.sendRpcSync(openStreamMsg.toByteBuffer(), 
fetchTimeoutMs);
+        streamHandler = 
TransportMessage.fromByteBuffer(response).getParsedPayload();
+      } else {
+        this.streamHandler = pbStreamHandler;
+      }
     } catch (IOException | InterruptedException e) {
       throw new IOException(
           "Read shuffle file from local file failed, partition location: "

Reply via email to