This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new ef30c2391 [CELEBORN-1858] Support DfsPartitionReader read partition by 
chunkOffsets when enable optimize skew partition read
ef30c2391 is described below

commit ef30c2391699461413bd99c4cfcc4473781d10ad
Author: wuziyi <[email protected]>
AuthorDate: Wed Feb 26 23:15:34 2025 +0800

    [CELEBORN-1858] Support DfsPartitionReader read partition by chunkOffsets 
when enable optimize skew partition read
    
    ### What changes were proposed in this pull request?
    
    Support reading a partition by chunkOffsets in DfsPartitionReader when
    optimized skew partition read is enabled.
    
    ### Why are the changes needed?
    
    Follow-up of https://github.com/apache/celeborn/pull/2373
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested in a test cluster.
    
    Closes #3115 from Z1Wu/feat/c1858.
    
    Authored-by: wuziyi <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/client/read/CelebornInputStream.java  |  4 +-
 .../celeborn/client/read/DfsPartitionReader.java   | 46 +++++++++++++---------
 2 files changed, 31 insertions(+), 19 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index ab2afcf73..0b9434a3b 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -591,7 +591,9 @@ public abstract class CelebornInputStream extends 
InputStream {
               clientFactory,
               startMapIndex,
               endMapIndex,
-              callback);
+              callback,
+              startChunkIndex,
+              endChunkIndex);
         default:
           throw new CelebornIOException(
               String.format("Unknown storage info %s to read location %s", 
storageInfo, location));
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 313fe77d1..6cca0d47b 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -66,12 +66,16 @@ public class DfsPartitionReader implements PartitionReader {
   private int numChunks = 0;
   private int returnedChunks = 0;
   private int currentChunkIndex = 0;
+  private int startChunkIndex;
+  private int endChunkIndex;
   private final List<Long> chunkOffsets = new ArrayList<>();
   private TransportClient client;
   private PbStreamHandler streamHandler;
   private MetricsCallback metricsCallback;
   private FileSystem hadoopFs;
 
+  private Path dataFilePath;
+
   public DfsPartitionReader(
       CelebornConf conf,
       String shuffleKey,
@@ -80,7 +84,9 @@ public class DfsPartitionReader implements PartitionReader {
       TransportClientFactory clientFactory,
       int startMapIndex,
       int endMapIndex,
-      MetricsCallback metricsCallback)
+      MetricsCallback metricsCallback,
+      int startChunkIndex,
+      int endChunkIndex)
       throws IOException {
     this.conf = conf;
     shuffleChunkSize = conf.dfsReadChunkSize();
@@ -121,23 +127,33 @@ public class DfsPartitionReader implements 
PartitionReader {
           "read shuffle file from DFS failed, filePath: " + 
location.getStorageInfo().getFilePath(),
           e);
     }
-
-    if (endMapIndex != Integer.MAX_VALUE) {
-      dfsInputStream =
-          hadoopFs.open(new 
Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
+    if (endMapIndex != Integer.MAX_VALUE && endMapIndex != -1) {
+      dataFilePath = new 
Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath()));
+      dfsInputStream = hadoopFs.open(dataFilePath);
       chunkOffsets.addAll(
           getChunkOffsetsFromSortedIndex(conf, location, startMapIndex, 
endMapIndex));
     } else {
-      dfsInputStream = hadoopFs.open(new 
Path(location.getStorageInfo().getFilePath()));
+      dataFilePath = new Path(location.getStorageInfo().getFilePath());
+      dfsInputStream = hadoopFs.open(dataFilePath);
       chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
     }
+    this.startChunkIndex = startChunkIndex == -1 ? 0 : startChunkIndex;
+    this.endChunkIndex =
+        endChunkIndex == -1
+            ? chunkOffsets.size() - 2
+            : Math.min(chunkOffsets.size() - 2, endChunkIndex);
+    this.currentChunkIndex = this.startChunkIndex;
+    this.numChunks = this.endChunkIndex - this.startChunkIndex + 1;
     logger.debug(
-        "DFS {} index count:{} offsets:{}",
+        "DFS {} total offset count:{} chunk count: {} "
+            + "start chunk index:{} end chunk index:{} offsets:{}",
         location.getStorageInfo().getFilePath(),
         chunkOffsets.size(),
+        this.numChunks,
+        this.startChunkIndex,
+        this.endChunkIndex,
         chunkOffsets);
-    if (chunkOffsets.size() > 1) {
-      numChunks = chunkOffsets.size() - 1;
+    if (this.numChunks > 0) {
       fetchThread =
           ThreadUtils.newDaemonSingleThreadExecutor(
               "celeborn-client-dfs-partition-fetcher" + 
location.getStorageInfo().getFilePath());
@@ -197,7 +213,7 @@ public class DfsPartitionReader implements PartitionReader {
       fetchThread.submit(
           () -> {
             try {
-              while (!closed && currentChunkIndex < numChunks) {
+              while (!closed && currentChunkIndex <= endChunkIndex) {
                 while (results.size() >= fetchMaxReqsInFlight) {
                   Thread.sleep(50);
                 }
@@ -208,16 +224,10 @@ public class DfsPartitionReader implements 
PartitionReader {
                 try {
                   dfsInputStream.readFully(offset, buffer);
                 } catch (IOException e) {
-                  logger.warn(
-                      "read DFS {} failed will retry, error detail {}",
-                      location.getStorageInfo().getFilePath(),
-                      e);
+                  logger.warn("read DFS {} failed will retry, error detail 
{}", dataFilePath, e);
                   try {
                     dfsInputStream.close();
-                    dfsInputStream =
-                        hadoopFs.open(
-                            new Path(
-                                
Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
+                    dfsInputStream = hadoopFs.open(dataFilePath);
                     dfsInputStream.readFully(offset, buffer);
                   } catch (IOException ex) {
                     logger.warn(

Reply via email to