This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 043a20e85 [CELEBORN-1432] ShuffleClientImpl should invoke loadFileGroupInternal only once when using the reduce partition mode
043a20e85 is described below

commit 043a20e85cb23213d04143e5ce1c1343a383a355
Author: SteNicholas <[email protected]>
AuthorDate: Tue May 28 16:46:36 2024 +0800

    [CELEBORN-1432] ShuffleClientImpl should invoke loadFileGroupInternal only once when using the reduce partition mode
    
    ### What changes were proposed in this pull request?
    
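    `ShuffleClientImpl` now invokes `loadFileGroupInternal` only once per shuffle when using the reduce partition mode, by loading the file groups through `reduceFileGroupsMap.computeIfAbsent` and caching the result.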
    
    ### Why are the changes needed?
    
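    `ShuffleClientImpl` may call `loadFileGroupInternal` multiple times when using the reduce partition mode, which is unexpected. The previous `updateFileGroup` checked `reduceFileGroupsMap.containsKey` and only called `put` after loading. This check-then-act sequence is not atomic, so concurrent callers for the same shuffle could each trigger a load. This bug was introduced in #2219.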
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    GA.
    
    Closes #2531 from SteNicholas/CELEBORN-1432.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../flink/readclient/FlinkShuffleClientImpl.java     |  4 +++-
 .../apache/celeborn/client/ShuffleClientImpl.java    | 20 ++++++++------------
 2 files changed, 11 insertions(+), 13 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index 3d69613f8..79c659db5 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -174,7 +174,9 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
   public ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
       throws CelebornIOException {
     ReduceFileGroups reduceFileGroups =
-        reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> new 
ReduceFileGroups());
+        reduceFileGroupsMap.computeIfAbsent(
+                shuffleId, (id) -> Tuple2.apply(new ReduceFileGroups(), null))
+            ._1;
     if (reduceFileGroups.partitionIds != null
         && reduceFileGroups.partitionIds.contains(partitionId)) {
       logger.debug(
diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index e44b8ba85..345caecdb 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -164,7 +164,7 @@ public class ShuffleClientImpl extends ShuffleClient {
   }
 
   // key: shuffleId
-  protected final Map<Integer, ReduceFileGroups> reduceFileGroupsMap =
+  protected final Map<Integer, Tuple2<ReduceFileGroups, String>> 
reduceFileGroupsMap =
       JavaUtils.newConcurrentHashMap();
 
   public ShuffleClientImpl(String appUniqueId, CelebornConf conf, 
UserIdentifier userIdentifier) {
@@ -1647,17 +1647,13 @@ public class ShuffleClientImpl extends ShuffleClient {
 
   public ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
       throws CelebornIOException {
-    if (reduceFileGroupsMap.containsKey(shuffleId)) {
-      return reduceFileGroupsMap.get(shuffleId);
+    Tuple2<ReduceFileGroups, String> fileGroupTuple =
+        reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> 
loadFileGroupInternal(shuffleId));
+    if (fileGroupTuple._1 == null) {
+      throw new CelebornIOException(
+          loadFileGroupException(shuffleId, partitionId, (fileGroupTuple._2)));
     } else {
-      Tuple2<ReduceFileGroups, String> fileGroups = 
loadFileGroupInternal(shuffleId);
-      ReduceFileGroups newGroups = fileGroups._1;
-      if (newGroups == null) {
-        throw new CelebornIOException(
-            loadFileGroupException(shuffleId, partitionId, fileGroups._2));
-      }
-      reduceFileGroupsMap.put(shuffleId, newGroups);
-      return newGroups;
+      return fileGroupTuple._1;
     }
   }
 
@@ -1726,7 +1722,7 @@ public class ShuffleClientImpl extends ShuffleClient {
   }
 
   @VisibleForTesting
-  public Map<Integer, ReduceFileGroups> getReduceFileGroupsMap() {
+  public Map<Integer, Tuple2<ReduceFileGroups, String>> 
getReduceFileGroupsMap() {
     return reduceFileGroupsMap;
   }
 

Reply via email to