This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 809c76a2e [CELEBORN-718] Decrease RemainingReviveTimes regardless worker is excluded or not
809c76a2e is described below

commit 809c76a2e4fad4499dc27328bc00e0129bd69f69
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Tue Jun 27 15:21:09 2023 +0800

    [CELEBORN-718] Decrease RemainingReviveTimes regardless worker is excluded or not
    
    ### What changes were proposed in this pull request?
    This PR makes the remaining revive times (```remainReviveTimes```) decrease regardless of whether the partition location is excluded.
    
    ### Why are the changes needed?
    In the following test setup:
    
    - 3 Celeborn workers
    - Client-side blacklist enabled: ```spark.celeborn.client.push.blacklist.enabled=true```
    - Replication enabled: ```spark.celeborn.client.push.replicate.enabled=true```
    - Successively kill 2 workers
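    
    With this setup, killing two of the three workers leaves a single usable worker, while a replicated partition needs its primary and its replica on two distinct workers. The following is a hypothetical Java sketch of that constraint only; ```ReplicaSlotSketch``` and ```pickPrimaryAndReplica``` are illustrative names, not Celeborn APIs, and real slot allocation involves much more than this.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    
    // Hypothetical slot picker: with replication on, a partition needs a primary
    // and a replica on two distinct, non-excluded workers.
    class ReplicaSlotSketch {
        /** Returns [primary, replica], or an empty list if fewer than 2 workers are usable. */
        static List<String> pickPrimaryAndReplica(List<String> workers, Set<String> excluded) {
            List<String> usable = new ArrayList<>();
            for (String worker : workers) {
                if (!excluded.contains(worker)) {
                    usable.add(worker);
                }
            }
            if (usable.size() < 2) {
                return List.of(); // no room for a replica, so a revive cannot succeed
            }
            return List.of(usable.get(0), usable.get(1));
        }
    
        public static void main(String[] args) {
            List<String> workers = List.of("worker1", "worker2", "worker3");
            System.out.println(pickPrimaryAndReplica(workers, Set.of()));                     // [worker1, worker2]
            System.out.println(pickPrimaryAndReplica(workers, Set.of("worker1", "worker2"))); // []
        }
    }
    ```
    
    In this simplified model, once two workers are excluded every revive attempt is bound to fail, so the only way out is for the revive quota to run out.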
    
    I expected the tasks to fail because of revive failure (when replication is on, at least 2 workers are needed), but instead
    the tasks hung forever. Digging into the logs, I found that ```remain revive times``` never decreased, leading
    to an infinite revive loop.
    ```
    23/06/27 14:00:57 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:01 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:05 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:09 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:13 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:17 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:21 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:25 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:29 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:33 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:37 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:41 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:45 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:49 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:53 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:01:57 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:02:01 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:02:05 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:02:09 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    23/06/27 14:02:13 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
    ```
    
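    The root cause is that, before this PR, the remaining revive times did not decrease when the push failed because the
    partition location was excluded (blacklisted), and I see no reason for that exception. In this scenario every retry
    failed with a blacklisted status, so the counter stayed at 5 indefinitely.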
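    
    A minimal Java sketch of the quota rule before and after this PR. The class, enum and method names are hypothetical; the enum only mirrors the two status codes checked by the removed ```pushStatusIsBlacklisted()```. It assumes every push fails with the same blacklisted cause and a starting quota of 5, as in the log above, and it is a simplified model rather than the real ```ShuffleClientImpl``` control flow.
    
    ```java
    // Simplified model of the revive quota; names are hypothetical, not Celeborn's.
    class ReviveQuotaSketch {
        // Stand-ins for the two status codes checked by the removed pushStatusIsBlacklisted().
        enum Cause { PUSH_DATA_MASTER_BLACKLISTED, PUSH_DATA_SLAVE_BLACKLISTED, OTHER }
    
        static boolean isBlacklisted(Cause cause) {
            return cause == Cause.PUSH_DATA_MASTER_BLACKLISTED
                || cause == Cause.PUSH_DATA_SLAVE_BLACKLISTED;
        }
    
        /**
         * Counts failed pushes until the client gives up, assuming every push fails
         * with the same cause. Returns -1 if the quota is still not exhausted after
         * maxFailures, i.e. the retry loop would effectively run forever.
         */
        static int failuresUntilGiveUp(int reviveTimes, Cause cause, boolean skipBlacklisted, int maxFailures) {
            int remain = reviveTimes;
            for (int failures = 1; failures <= maxFailures; failures++) {
                if (remain <= 0) {
                    return failures; // quota exhausted: fail the task instead of retrying
                }
                // Old rule (skipBlacklisted = true): blacklisted causes do not consume quota.
                // New rule (skipBlacklisted = false): every failure consumes one revive.
                if (!(skipBlacklisted && isBlacklisted(cause))) {
                    remain--;
                }
            }
            return -1;
        }
    
        public static void main(String[] args) {
            Cause cause = Cause.PUSH_DATA_SLAVE_BLACKLISTED;
            System.out.println("before: " + failuresUntilGiveUp(5, cause, true, 1000));  // before: -1
            System.out.println("after:  " + failuresUntilGiveUp(5, cause, false, 1000)); // after:  6
        }
    }
    ```
    
    Under these assumptions the old rule never exhausts the quota, while the new rule gives up on the sixth failed push (the initial attempt plus 5 revives), so the task fails instead of hanging.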
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manually tested.
    
    Closes #1628 from waitinfuture/718.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/client/ShuffleClientImpl.java  | 18 ++----------------
 1 file changed, 2 insertions(+), 16 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 1dc7605cd..61e42714e 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -845,10 +845,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                   e);
               // async retry push data
               if (!mapperEnded(shuffleId, mapId)) {
-                // For blacklisted partition location, Celeborn should not use retry quota.
-                if (!pushStatusIsBlacklisted(cause)) {
-                  remainReviveTimes = remainReviveTimes - 1;
-                }
+                remainReviveTimes = remainReviveTimes - 1;
                 pushDataRetryPool.submit(
                     () ->
                         submitRetryPushData(
@@ -1211,12 +1208,6 @@ public class ShuffleClientImpl extends ShuffleClient {
                 remainReviveTimes,
                 e);
             if (!mapperEnded(shuffleId, mapId)) {
-              int tmpRemainReviveTimes = remainReviveTimes;
-              // For blacklisted partition location, Celeborn should not use retry quota.
-              if (!pushStatusIsBlacklisted(cause)) {
-                tmpRemainReviveTimes = tmpRemainReviveTimes - 1;
-              }
-              int finalRemainReviveTimes = tmpRemainReviveTimes;
               pushDataRetryPool.submit(
                   () ->
                       submitRetryPushMergedData(
@@ -1227,7 +1218,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                           batches,
                           cause,
                           groupedBatchId,
-                          finalRemainReviveTimes));
+                          remainReviveTimes - 1));
             } else {
               pushState.removeBatch(groupedBatchId, hostPort);
               logger.info(
@@ -1479,11 +1470,6 @@ public class ShuffleClientImpl extends ShuffleClient {
     return stageEndShuffleSet.contains(shuffleId);
   }
 
-  private boolean pushStatusIsBlacklisted(StatusCode cause) {
-    return cause == StatusCode.PUSH_DATA_MASTER_BLACKLISTED
-        || cause == StatusCode.PUSH_DATA_SLAVE_BLACKLISTED;
-  }
-
   private StatusCode getPushDataFailCause(String message) {
     logger.debug("Push data failed cause message: " + message);
     StatusCode cause;
