This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ef30c2391 [CELEBORN-1858] Support DfsPartitionReader read partition by
chunkOffsets when enable optimize skew partition read
ef30c2391 is described below
commit ef30c2391699461413bd99c4cfcc4473781d10ad
Author: wuziyi <[email protected]>
AuthorDate: Wed Feb 26 23:15:34 2025 +0800
[CELEBORN-1858] Support DfsPartitionReader read partition by chunkOffsets
when enable optimize skew partition read
### What changes were proposed in this pull request?
Support DfsPartitionReader read partition by chunkOffsets when enable
optimize skew partition read
### Why are the changes needed?
Follow-up of https://github.com/apache/celeborn/pull/2373
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Tested in a test cluster.
Closes #3115 from Z1Wu/feat/c1858.
Authored-by: wuziyi <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/client/read/CelebornInputStream.java | 4 +-
.../celeborn/client/read/DfsPartitionReader.java | 46 +++++++++++++---------
2 files changed, 31 insertions(+), 19 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index ab2afcf73..0b9434a3b 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -591,7 +591,9 @@ public abstract class CelebornInputStream extends InputStream {
clientFactory,
startMapIndex,
endMapIndex,
- callback);
+ callback,
+ startChunkIndex,
+ endChunkIndex);
default:
throw new CelebornIOException(
String.format("Unknown storage info %s to read location %s",
storageInfo, location));
diff --git a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 313fe77d1..6cca0d47b 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -66,12 +66,16 @@ public class DfsPartitionReader implements PartitionReader {
private int numChunks = 0;
private int returnedChunks = 0;
private int currentChunkIndex = 0;
+ private int startChunkIndex;
+ private int endChunkIndex;
private final List<Long> chunkOffsets = new ArrayList<>();
private TransportClient client;
private PbStreamHandler streamHandler;
private MetricsCallback metricsCallback;
private FileSystem hadoopFs;
+ private Path dataFilePath;
+
public DfsPartitionReader(
CelebornConf conf,
String shuffleKey,
@@ -80,7 +84,9 @@ public class DfsPartitionReader implements PartitionReader {
TransportClientFactory clientFactory,
int startMapIndex,
int endMapIndex,
- MetricsCallback metricsCallback)
+ MetricsCallback metricsCallback,
+ int startChunkIndex,
+ int endChunkIndex)
throws IOException {
this.conf = conf;
shuffleChunkSize = conf.dfsReadChunkSize();
@@ -121,23 +127,33 @@ public class DfsPartitionReader implements PartitionReader {
"read shuffle file from DFS failed, filePath: " +
location.getStorageInfo().getFilePath(),
e);
}
-
- if (endMapIndex != Integer.MAX_VALUE) {
- dfsInputStream =
- hadoopFs.open(new
Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
+ if (endMapIndex != Integer.MAX_VALUE && endMapIndex != -1) {
+      dataFilePath = new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath()));
+ dfsInputStream = hadoopFs.open(dataFilePath);
chunkOffsets.addAll(
getChunkOffsetsFromSortedIndex(conf, location, startMapIndex,
endMapIndex));
} else {
-      dfsInputStream = hadoopFs.open(new Path(location.getStorageInfo().getFilePath()));
+ dataFilePath = new Path(location.getStorageInfo().getFilePath());
+ dfsInputStream = hadoopFs.open(dataFilePath);
chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
}
+ this.startChunkIndex = startChunkIndex == -1 ? 0 : startChunkIndex;
+ this.endChunkIndex =
+ endChunkIndex == -1
+ ? chunkOffsets.size() - 2
+ : Math.min(chunkOffsets.size() - 2, endChunkIndex);
+ this.currentChunkIndex = this.startChunkIndex;
+ this.numChunks = this.endChunkIndex - this.startChunkIndex + 1;
logger.debug(
- "DFS {} index count:{} offsets:{}",
+ "DFS {} total offset count:{} chunk count: {} "
+ + "start chunk index:{} end chunk index:{} offsets:{}",
location.getStorageInfo().getFilePath(),
chunkOffsets.size(),
+ this.numChunks,
+ this.startChunkIndex,
+ this.endChunkIndex,
chunkOffsets);
- if (chunkOffsets.size() > 1) {
- numChunks = chunkOffsets.size() - 1;
+ if (this.numChunks > 0) {
fetchThread =
ThreadUtils.newDaemonSingleThreadExecutor(
"celeborn-client-dfs-partition-fetcher" +
location.getStorageInfo().getFilePath());
@@ -197,7 +213,7 @@ public class DfsPartitionReader implements PartitionReader {
fetchThread.submit(
() -> {
try {
- while (!closed && currentChunkIndex < numChunks) {
+ while (!closed && currentChunkIndex <= endChunkIndex) {
while (results.size() >= fetchMaxReqsInFlight) {
Thread.sleep(50);
}
@@ -208,16 +224,10 @@ public class DfsPartitionReader implements PartitionReader {
try {
dfsInputStream.readFully(offset, buffer);
} catch (IOException e) {
- logger.warn(
- "read DFS {} failed will retry, error detail {}",
- location.getStorageInfo().getFilePath(),
- e);
+            logger.warn("read DFS {} failed will retry, error detail {}", dataFilePath, e);
try {
dfsInputStream.close();
- dfsInputStream =
- hadoopFs.open(
- new Path(
-                      Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
+ dfsInputStream = hadoopFs.open(dataFilePath);
dfsInputStream.readFully(offset, buffer);
} catch (IOException ex) {
logger.warn(