This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 043a20e85 [CELEBORN-1432] ShuffleClientImpl should invoke
loadFileGroupInternal only once when using the reduce partition mode
043a20e85 is described below
commit 043a20e85cb23213d04143e5ce1c1343a383a355
Author: SteNicholas <[email protected]>
AuthorDate: Tue May 28 16:46:36 2024 +0800
[CELEBORN-1432] ShuffleClientImpl should invoke loadFileGroupInternal only
once when using the reduce partition mode
### What changes were proposed in this pull request?
`ShuffleClientImpl` invokes `loadFileGroupInternal` only once when using
the reduce partition mode.
### Why are the changes needed?
`ShuffleClientImpl` may call `loadFileGroupInternal` multiple times for the
same shuffle when using the reduce partition mode, although it is expected to
be invoked only once. This bug was introduced in #2219.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA (GitHub Actions CI).
Closes #2531 from SteNicholas/CELEBORN-1432.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../flink/readclient/FlinkShuffleClientImpl.java | 4 +++-
.../apache/celeborn/client/ShuffleClientImpl.java | 20 ++++++++------------
2 files changed, 11 insertions(+), 13 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index 3d69613f8..79c659db5 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -174,7 +174,9 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
public ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
throws CelebornIOException {
ReduceFileGroups reduceFileGroups =
- reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> new
ReduceFileGroups());
+ reduceFileGroupsMap.computeIfAbsent(
+ shuffleId, (id) -> Tuple2.apply(new ReduceFileGroups(), null))
+ ._1;
if (reduceFileGroups.partitionIds != null
&& reduceFileGroups.partitionIds.contains(partitionId)) {
logger.debug(
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index e44b8ba85..345caecdb 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -164,7 +164,7 @@ public class ShuffleClientImpl extends ShuffleClient {
}
// key: shuffleId
- protected final Map<Integer, ReduceFileGroups> reduceFileGroupsMap =
+ protected final Map<Integer, Tuple2<ReduceFileGroups, String>>
reduceFileGroupsMap =
JavaUtils.newConcurrentHashMap();
public ShuffleClientImpl(String appUniqueId, CelebornConf conf,
UserIdentifier userIdentifier) {
@@ -1647,17 +1647,13 @@ public class ShuffleClientImpl extends ShuffleClient {
public ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
throws CelebornIOException {
- if (reduceFileGroupsMap.containsKey(shuffleId)) {
- return reduceFileGroupsMap.get(shuffleId);
+ Tuple2<ReduceFileGroups, String> fileGroupTuple =
+ reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) ->
loadFileGroupInternal(shuffleId));
+ if (fileGroupTuple._1 == null) {
+ throw new CelebornIOException(
+ loadFileGroupException(shuffleId, partitionId, (fileGroupTuple._2)));
} else {
- Tuple2<ReduceFileGroups, String> fileGroups =
loadFileGroupInternal(shuffleId);
- ReduceFileGroups newGroups = fileGroups._1;
- if (newGroups == null) {
- throw new CelebornIOException(
- loadFileGroupException(shuffleId, partitionId, fileGroups._2));
- }
- reduceFileGroupsMap.put(shuffleId, newGroups);
- return newGroups;
+ return fileGroupTuple._1;
}
}
@@ -1726,7 +1722,7 @@ public class ShuffleClientImpl extends ShuffleClient {
}
@VisibleForTesting
- public Map<Integer, ReduceFileGroups> getReduceFileGroupsMap() {
+ public Map<Integer, Tuple2<ReduceFileGroups, String>>
getReduceFileGroupsMap() {
return reduceFileGroupsMap;
}