This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 47f66a87a [CELEBORN-678] ShuffleClientImpl::mapperEnded should not consider attemptId
47f66a87a is described below

commit 47f66a87a1f6021839b7cde8420c63f1df6675e2
Author: onebox-li <[email protected]>
AuthorDate: Wed Jun 14 21:01:06 2023 +0800

    [CELEBORN-678] ShuffleClientImpl::mapperEnded should not consider attemptId
    
    ### What changes were proposed in this pull request?
    ShuffleClientImpl::mapperEnded should not consider attemptId, because
    speculative tasks run with a different attemptId.
    
    ### Why are the changes needed?
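    Same as above: mapperEnded must report a mapper as ended regardless of
    which attemptId (for example, that of a speculative task) is asking.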
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Cluster test
    
    Closes #1591 from onebox-li/fix-mapend.
    
    Authored-by: onebox-li <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../flink/readclient/FlinkShuffleClientImpl.java   | 57 ++--------------------
 .../apache/celeborn/client/ShuffleClientImpl.java  | 42 ++++++++--------
 2 files changed, 28 insertions(+), 71 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index ca1479637..5d2d1a081 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -206,20 +206,6 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
     // mapKey
     final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
     final String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
-    // return if shuffle stage already ended
-    if (mapperEnded(shuffleId, mapId, attemptId)) {
-      logger.info(
-          "Push data byteBuf to location {} ignored because mapper already 
ended for shuffle {} map {} attempt {}.",
-          location.hostAndPushPort(),
-          shuffleId,
-          mapId,
-          attemptId);
-      PushState pushState = pushStates.get(mapKey);
-      if (pushState != null) {
-        pushState.cleanup();
-      }
-      return 0;
-    }
 
     PushState pushState = getPushState(mapKey);
 
@@ -258,14 +244,6 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
           @Override
           public void onSuccess(ByteBuffer response) {
             pushState.removeBatch(nextBatchId, location.hostAndPushPort());
-            if (response.remaining() > 0) {
-              byte reason = response.get();
-              if (reason == StatusCode.STAGE_ENDED.getValue()) {
-                mapperEndMap
-                    .computeIfAbsent(shuffleId, (id) -> 
ConcurrentHashMap.newKeySet())
-                    .add(mapKey);
-              }
-            }
             logger.debug(
                 "Push data byteBuf to {} success for shuffle {} map {} 
attemptId {} batch {}.",
                 location.hostAndPushPort(),
@@ -281,21 +259,11 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
             if (pushState.exception.get() != null) {
               return;
             }
-            if (!mapperEnded(shuffleId, mapId, attemptId)) {
-              String errorMsg =
-                  String.format(
-                      "Push data byteBuf to %s failed for shuffle %d map %d 
attempt %d batch %d.",
-                      location.hostAndPushPort(), shuffleId, mapId, attemptId, 
nextBatchId);
-              pushState.exception.compareAndSet(null, new 
CelebornIOException(errorMsg, e));
-            } else {
-              logger.warn(
-                  "Push data to {} failed but mapper already ended for shuffle 
{} map {} attempt {} batch {}.",
-                  location.hostAndPushPort(),
-                  shuffleId,
-                  mapId,
-                  attemptId,
-                  nextBatchId);
-            }
+            String errorMsg =
+                String.format(
+                    "Push data byteBuf to %s failed for shuffle %d map %d 
attempt %d batch %d.",
+                    location.hostAndPushPort(), shuffleId, mapId, attemptId, 
nextBatchId);
+            pushState.exception.compareAndSet(null, new 
CelebornIOException(errorMsg, e));
           }
         };
     // do push data
@@ -431,11 +399,6 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
             StatusCode respStatus = Utils.toStatusCode(response.getStatus());
             if (StatusCode.SUCCESS.equals(respStatus)) {
               return 
Optional.of(PbSerDeUtils.fromPbPartitionLocation(response.getLocation()));
-            } else if (StatusCode.MAP_ENDED.equals(respStatus)) {
-              mapperEndMap
-                  .computeIfAbsent(shuffleId, (id) -> 
ConcurrentHashMap.newKeySet())
-                  .add(mapKey);
-              return Optional.empty();
             } else {
               // throw exception
               logger.error(
@@ -492,16 +455,6 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
     try {
       // mapKey
       final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
-      // return if shuffle stage already ended
-      if (mapperEnded(shuffleId, mapId, attemptId)) {
-        logger.debug(
-            "Send message to {} ignored because mapper already ended for 
shuffle {} map {} attempt {}.",
-            location.hostAndPushPort(),
-            shuffleId,
-            mapId,
-            attemptId);
-        return null;
-      }
       pushState = getPushState(mapKey);
 
       // add inFlight requests
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index a2a381dea..799ba6a75 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -89,9 +89,13 @@ public class ShuffleClientImpl extends ShuffleClient {
   private final Map<Integer, ConcurrentHashMap<Integer, PartitionLocation>> 
reducePartitionMap =
       JavaUtils.newConcurrentHashMap();
 
-  protected final ConcurrentHashMap<Integer, Set<String>> mapperEndMap =
+  // key: shuffleId, value: Set(mapId)
+  protected final ConcurrentHashMap<Integer, Set<Integer>> mapperEndMap =
       JavaUtils.newConcurrentHashMap();
 
+  // shuffleIds which have finished all map tasks
+  protected final Set<Integer> stageEndShuffleSet = 
ConcurrentHashMap.newKeySet();
+
   // key: shuffleId-mapId-attemptId
   protected final Map<String, PushState> pushStates = 
JavaUtils.newConcurrentHashMap();
 
@@ -212,7 +216,7 @@ public class ShuffleClientImpl extends ShuffleClient {
         applicationId, shuffleId, mapId, attemptId, partitionId, 
loc.getEpoch(), loc, cause)) {
       wrappedCallback.onFailure(
           new CelebornIOException(cause + " then revive but " + 
StatusCode.REVIVE_FAILED));
-    } else if (mapperEnded(shuffleId, mapId, attemptId)) {
+    } else if (mapperEnded(shuffleId, mapId)) {
       logger.debug(
           "Revive for push data success, but the mapper already ended for 
shuffle {} map {} attempt {} partition {} batch {} location {}.",
           shuffleId,
@@ -304,7 +308,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                   new CelebornIOException(cause + " then revive but " + 
StatusCode.REVIVE_FAILED)));
           return;
         }
-      } else if (mapperEnded(shuffleId, mapId, attemptId)) {
+      } else if (mapperEnded(shuffleId, mapId)) {
         logger.debug(
             "Revive for push merged data success, but the mapper already ended 
for shuffle {} map {} attempt {} partition {} batch {}.",
             shuffleId,
@@ -557,8 +561,7 @@ public class ShuffleClientImpl extends ShuffleClient {
           epoch);
       return true;
     }
-    String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
-    if (mapperEnded(shuffleId, mapId, attemptId)) {
+    if (mapperEnded(shuffleId, mapId)) {
       logger.debug(
           "Revive success, but the mapper ended for shuffle {} map {} attempt 
{} partition {}, just return true(Assume revive successfully).",
           shuffleId,
@@ -594,7 +597,7 @@ public class ShuffleClientImpl extends ShuffleClient {
             mapId,
             attemptId,
             partitionId);
-        mapperEndMap.computeIfAbsent(shuffleId, (id) -> 
ConcurrentHashMap.newKeySet()).add(mapKey);
+        mapperEndMap.computeIfAbsent(shuffleId, (id) -> 
ConcurrentHashMap.newKeySet()).add(mapId);
         return true;
       } else {
         return false;
@@ -629,7 +632,7 @@ public class ShuffleClientImpl extends ShuffleClient {
     final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
     final String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
     // return if shuffle stage already ended
-    if (mapperEnded(shuffleId, mapId, attemptId)) {
+    if (mapperEnded(shuffleId, mapId)) {
       logger.debug(
           "Push or merge data ignored because mapper already ended for shuffle 
{} map {} attempt {} partition {}.",
           shuffleId,
@@ -668,7 +671,7 @@ public class ShuffleClientImpl extends ShuffleClient {
       }
     }
 
-    if (mapperEnded(shuffleId, mapId, attemptId)) {
+    if (mapperEnded(shuffleId, mapId)) {
       logger.debug(
           "Push or merge data ignored because mapper already ended for shuffle 
{} map {} attempt {} partition {}.",
           shuffleId,
@@ -728,9 +731,7 @@ public class ShuffleClientImpl extends ShuffleClient {
               // TODO Need to adjust maxReqsInFlight if server response is 
congested, see
               // CELEBORN-62
               if (response.remaining() > 0 && response.get() == 
StatusCode.STAGE_ENDED.getValue()) {
-                mapperEndMap
-                    .computeIfAbsent(shuffleId, (id) -> 
ConcurrentHashMap.newKeySet())
-                    .add(mapKey);
+                stageEndShuffleSet.add(shuffleId);
               }
               logger.debug(
                   "Push data to {} success for shuffle {} map {} attempt {} 
partition {} batch {}.",
@@ -857,7 +858,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                   remainReviveTimes,
                   e);
               // async retry push data
-              if (!mapperEnded(shuffleId, mapId, attemptId)) {
+              if (!mapperEnded(shuffleId, mapId)) {
                 // For blacklisted partition location, Celeborn should not use 
retry quota.
                 if (!pushStatusIsBlacklisted(cause)) {
                   remainReviveTimes = remainReviveTimes - 1;
@@ -1115,9 +1116,7 @@ public class ShuffleClientImpl extends ShuffleClient {
             pushState.removeBatch(groupedBatchId, hostPort);
             // TODO Need to adjust maxReqsInFlight if server response is 
congested, see CELEBORN-62
             if (response.remaining() > 0 && response.get() == 
StatusCode.STAGE_ENDED.getValue()) {
-              mapperEndMap
-                  .computeIfAbsent(shuffleId, (id) -> 
ConcurrentHashMap.newKeySet())
-                  .add(Utils.makeMapKey(shuffleId, mapId, attemptId));
+              stageEndShuffleSet.add(shuffleId);
             }
           }
 
@@ -1242,7 +1241,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                 Arrays.toString(batchIds),
                 remainReviveTimes,
                 e);
-            if (!mapperEnded(shuffleId, mapId, attemptId)) {
+            if (!mapperEnded(shuffleId, mapId)) {
               int tmpRemainReviveTimes = remainReviveTimes;
               // For blacklisted partition location, Celeborn should not use 
retry quota.
               if (!pushStatusIsBlacklisted(cause)) {
@@ -1379,6 +1378,7 @@ public class ShuffleClientImpl extends ShuffleClient {
     reducePartitionMap.remove(shuffleId);
     reduceFileGroupsMap.remove(shuffleId);
     mapperEndMap.remove(shuffleId);
+    stageEndShuffleSet.remove(shuffleId);
     splitting.remove(shuffleId);
 
     logger.info("Unregistered shuffle {}.", shuffleId);
@@ -1525,9 +1525,13 @@ public class ShuffleClientImpl extends ShuffleClient {
     driverRssMetaService = endpointRef;
   }
 
-  protected boolean mapperEnded(int shuffleId, int mapId, int attemptId) {
-    return mapperEndMap.containsKey(shuffleId)
-        && mapperEndMap.get(shuffleId).contains(Utils.makeMapKey(shuffleId, 
mapId, attemptId));
+  protected boolean mapperEnded(int shuffleId, int mapId) {
+    return (mapperEndMap.containsKey(shuffleId) && 
mapperEndMap.get(shuffleId).contains(mapId))
+        || stageEnded(shuffleId);
+  }
+
+  protected boolean stageEnded(int shuffleId) {
+    return stageEndShuffleSet.contains(shuffleId);
   }
 
   private boolean pushStatusIsBlacklisted(StatusCode cause) {

Reply via email to