This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 809c76a2e [CELEBORN-718] Decrease RemainingReviveTimes regardless
worker is excluded or not
809c76a2e is described below
commit 809c76a2e4fad4499dc27328bc00e0129bd69f69
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Tue Jun 27 15:21:09 2023 +0800
[CELEBORN-718] Decrease RemainingReviveTimes regardless worker is excluded
or not
### What changes were proposed in this pull request?
This PR makes ReviveTimes decrease regardless of whether the partition
location is excluded.
### Why are the changes needed?
In the following test setup:
- 3 Celeborn workers
- Client side blacklist enabled
```spark.celeborn.client.push.blacklist.enabled=true```
- Replication is on ```spark.celeborn.client.push.replicate.enabled=true```
- Successively kill 2 workers
I expected the tasks to fail because of revive failure (when replication is on,
at least 2 workers are needed), but instead
the tasks hung forever. Digging into the logs, I found that the
```remain revive times``` value does not decrease, leading
to an infinite revive loop.
```
23/06/27 14:00:57 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:01 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:05 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:09 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:13 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:17 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:21 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:25 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:29 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:33 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:37 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:41 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:45 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:49 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:53 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:01:57 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:02:01 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:02:05 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:02:09 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
23/06/27 14:02:13 ERROR ShuffleClientImpl: Push data to xxx:xxx failed for
shuffle 0 map 998 attempt 1 partition 666 batch 1, remain revive times 5.
```
The reason is that, before this PR, the revive times did not decrease if the
partition location was excluded, and I don't see a
reason for that behavior.
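The effect of the skipped decrement can be illustrated with a small standalone simulation. This is only a sketch, not the actual `ShuffleClientImpl` code: the class `ReviveQuotaSketch`, the enum `Cause`, the method `retriesBeforeGiveUp`, and the `cap` parameter are hypothetical names, and it assumes the client gives up once the remaining revive times reach zero and that every retry fails with the same cause.

```java
// Hypothetical, simplified model of the retry quota; not the real client code.
final class ReviveQuotaSketch {

    // Stand-in for the client's StatusCode; only the two excluded codes
    // from the diff plus one generic failure are modeled.
    enum Cause { PUSH_DATA_MASTER_BLACKLISTED, PUSH_DATA_SLAVE_BLACKLISTED, OTHER_FAILURE }

    static boolean isBlacklisted(Cause cause) {
        return cause == Cause.PUSH_DATA_MASTER_BLACKLISTED
            || cause == Cause.PUSH_DATA_SLAVE_BLACKLISTED;
    }

    /**
     * Counts how many retries are submitted before the quota is used up,
     * assuming every retry fails with the same cause.
     * Returns -1 if the cap is reached first, i.e. the loop would not end.
     */
    static int retriesBeforeGiveUp(
            int remainReviveTimes, Cause cause, boolean skipDecrementWhenBlacklisted, int cap) {
        int retries = 0;
        while (remainReviveTimes > 0) {
            if (retries >= cap) {
                return -1; // quota never shrinks: treated as an infinite loop
            }
            // Old behavior: keep the quota untouched for excluded locations.
            // New behavior: always spend one unit of quota per retry.
            if (!(skipDecrementWhenBlacklisted && isBlacklisted(cause))) {
                remainReviveTimes = remainReviveTimes - 1;
            }
            retries++;
        }
        return retries;
    }

    public static void main(String[] args) {
        // Before this PR: excluded location, quota stays at 5, cap is hit.
        System.out.println(
            retriesBeforeGiveUp(5, Cause.PUSH_DATA_MASTER_BLACKLISTED, true, 1000));  // -1
        // After this PR: quota is spent on every retry, so the loop ends.
        System.out.println(
            retriesBeforeGiveUp(5, Cause.PUSH_DATA_MASTER_BLACKLISTED, false, 1000)); // 5
    }
}
```

With only one worker left and replication on, every retry in this model keeps hitting an excluded location, so under the old rule the quota stays at 5, which matches the repeated `remain revive times 5` in the log above.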
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually tested.
Closes #1628 from waitinfuture/718.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/client/ShuffleClientImpl.java | 18 ++----------------
1 file changed, 2 insertions(+), 16 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 1dc7605cd..61e42714e 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -845,10 +845,7 @@ public class ShuffleClientImpl extends ShuffleClient {
e);
// async retry push data
if (!mapperEnded(shuffleId, mapId)) {
- // For blacklisted partition location, Celeborn should not use retry quota.
- if (!pushStatusIsBlacklisted(cause)) {
- remainReviveTimes = remainReviveTimes - 1;
- }
+ remainReviveTimes = remainReviveTimes - 1;
pushDataRetryPool.submit(
() ->
submitRetryPushData(
@@ -1211,12 +1208,6 @@ public class ShuffleClientImpl extends ShuffleClient {
remainReviveTimes,
e);
if (!mapperEnded(shuffleId, mapId)) {
- int tmpRemainReviveTimes = remainReviveTimes;
- // For blacklisted partition location, Celeborn should not use retry quota.
- if (!pushStatusIsBlacklisted(cause)) {
- tmpRemainReviveTimes = tmpRemainReviveTimes - 1;
- }
- int finalRemainReviveTimes = tmpRemainReviveTimes;
pushDataRetryPool.submit(
() ->
submitRetryPushMergedData(
@@ -1227,7 +1218,7 @@ public class ShuffleClientImpl extends ShuffleClient {
batches,
cause,
groupedBatchId,
- finalRemainReviveTimes));
+ remainReviveTimes - 1));
} else {
pushState.removeBatch(groupedBatchId, hostPort);
logger.info(
@@ -1479,11 +1470,6 @@ public class ShuffleClientImpl extends ShuffleClient {
return stageEndShuffleSet.contains(shuffleId);
}
- private boolean pushStatusIsBlacklisted(StatusCode cause) {
- return cause == StatusCode.PUSH_DATA_MASTER_BLACKLISTED
- || cause == StatusCode.PUSH_DATA_SLAVE_BLACKLISTED;
- }
-
private StatusCode getPushDataFailCause(String message) {
logger.debug("Push data failed cause message: " + message);
StatusCode cause;