This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 6a836f952 [CELEBORN-1859] DfsPartitionReader and LocalPartitionReader
should reuse pbStreamHandlers get from BatchOpenStream request
6a836f952 is described below
commit 6a836f9523b7eb03d3eff4bf1c23349bf8270064
Author: wuziyi <[email protected]>
AuthorDate: Mon Feb 17 09:46:46 2025 +0800
[CELEBORN-1859] DfsPartitionReader and LocalPartitionReader should reuse
pbStreamHandlers get from BatchOpenStream request
### What changes were proposed in this pull request?
DfsPartitionReader and LocalPartitionReader should reuse the pbStreamHandlers
obtained from the BatchOpenStream request, as WorkerPartitionReader does, instead
of sending another OpenStream request.
### Why are the changes needed?
Reduces unnecessary RPC requests.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested manually in a test cluster.
Closes #3093 from Z1Wu/fix/local_dfs_reuse_steam_handler.
Authored-by: wuziyi <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/client/read/CelebornInputStream.java | 18 +++++++++++--
.../celeborn/client/read/DfsPartitionReader.java | 22 ++++++++-------
.../celeborn/client/read/LocalPartitionReader.java | 31 +++++++++++++---------
3 files changed, 47 insertions(+), 24 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 8606585bc..e525a135a 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -454,7 +454,14 @@ public abstract class CelebornInputStream extends
InputStream {
logger.debug("Read local shuffle file {}", localHostAddress);
containLocalRead = true;
return new LocalPartitionReader(
- conf, shuffleKey, location, clientFactory, startMapIndex,
endMapIndex, callback);
+ conf,
+ shuffleKey,
+ location,
+ pbStreamHandler,
+ clientFactory,
+ startMapIndex,
+ endMapIndex,
+ callback);
} else {
return new WorkerPartitionReader(
conf,
@@ -471,7 +478,14 @@ public abstract class CelebornInputStream extends
InputStream {
case S3:
case HDFS:
return new DfsPartitionReader(
- conf, shuffleKey, location, clientFactory, startMapIndex,
endMapIndex, callback);
+ conf,
+ shuffleKey,
+ location,
+ pbStreamHandler,
+ clientFactory,
+ startMapIndex,
+ endMapIndex,
+ callback);
default:
throw new CelebornIOException(
String.format("Unknown storage info %s to read location %s",
storageInfo, location));
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index b69cf580f..313fe77d1 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -76,6 +76,7 @@ public class DfsPartitionReader implements PartitionReader {
CelebornConf conf,
String shuffleKey,
PartitionLocation location,
+ PbStreamHandler pbStreamHandler,
TransportClientFactory clientFactory,
int startMapIndex,
int endMapIndex,
@@ -95,10 +96,10 @@ public class DfsPartitionReader implements PartitionReader {
this.hadoopFs =
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.HDFS);
}
- if (endMapIndex != Integer.MAX_VALUE) {
- long fetchTimeoutMs = conf.clientFetchTimeoutMs();
- try {
- client = clientFactory.createClient(location.getHost(),
location.getFetchPort());
+ long fetchTimeoutMs = conf.clientFetchTimeoutMs();
+ try {
+ client = clientFactory.createClient(location.getHost(),
location.getFetchPort());
+ if (pbStreamHandler == null) {
TransportMessage openStream =
new TransportMessage(
MessageType.OPEN_STREAM,
@@ -112,13 +113,16 @@ public class DfsPartitionReader implements
PartitionReader {
ByteBuffer response = client.sendRpcSync(openStream.toByteBuffer(),
fetchTimeoutMs);
streamHandler =
TransportMessage.fromByteBuffer(response).getParsedPayload();
// Parse this message to ensure sort is done.
- } catch (IOException | InterruptedException e) {
- throw new IOException(
- "read shuffle file from DFS failed, filePath: "
- + location.getStorageInfo().getFilePath(),
- e);
+ } else {
+ streamHandler = pbStreamHandler;
}
+ } catch (IOException | InterruptedException e) {
+ throw new IOException(
+ "read shuffle file from DFS failed, filePath: " +
location.getStorageInfo().getFilePath(),
+ e);
+ }
+ if (endMapIndex != Integer.MAX_VALUE) {
dfsInputStream =
hadoopFs.open(new
Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
chunkOffsets.addAll(
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
index a769687c8..722bab100 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java
@@ -74,6 +74,7 @@ public class LocalPartitionReader implements PartitionReader {
CelebornConf conf,
String shuffleKey,
PartitionLocation location,
+ PbStreamHandler pbStreamHandler,
TransportClientFactory clientFactory,
int startMapIndex,
int endMapIndex,
@@ -95,19 +96,23 @@ public class LocalPartitionReader implements
PartitionReader {
long fetchTimeoutMs = conf.clientFetchTimeoutMs();
try {
client = clientFactory.createClient(location.getHost(),
location.getFetchPort(), 0);
- TransportMessage openStreamMsg =
- new TransportMessage(
- MessageType.OPEN_STREAM,
- PbOpenStream.newBuilder()
- .setShuffleKey(shuffleKey)
- .setFileName(location.getFileName())
- .setStartIndex(startMapIndex)
- .setEndIndex(endMapIndex)
- .setReadLocalShuffle(true)
- .build()
- .toByteArray());
- ByteBuffer response = client.sendRpcSync(openStreamMsg.toByteBuffer(),
fetchTimeoutMs);
- streamHandler =
TransportMessage.fromByteBuffer(response).getParsedPayload();
+ if (pbStreamHandler == null) {
+ TransportMessage openStreamMsg =
+ new TransportMessage(
+ MessageType.OPEN_STREAM,
+ PbOpenStream.newBuilder()
+ .setShuffleKey(shuffleKey)
+ .setFileName(location.getFileName())
+ .setStartIndex(startMapIndex)
+ .setEndIndex(endMapIndex)
+ .setReadLocalShuffle(true)
+ .build()
+ .toByteArray());
+ ByteBuffer response = client.sendRpcSync(openStreamMsg.toByteBuffer(),
fetchTimeoutMs);
+ streamHandler =
TransportMessage.fromByteBuffer(response).getParsedPayload();
+ } else {
+ this.streamHandler = pbStreamHandler;
+ }
} catch (IOException | InterruptedException e) {
throw new IOException(
"Read shuffle file from local file failed, partition location: "