This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bb437bd07f583ffa4c86bc525659abe477d6d051 Author: Rui Fan <[email protected]> AuthorDate: Sat Dec 16 12:33:10 2023 +0800 [FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt --- ...ExponentialDelayRestartBackoffTimeStrategy.java | 5 +-- .../FailureRateRestartBackoffTimeStrategy.java | 3 +- .../FixedDelayRestartBackoffTimeStrategy.java | 3 +- .../failover/NoRestartBackoffTimeStrategy.java | 3 +- .../failover/RestartBackoffTimeStrategy.java | 5 ++- ...nentialDelayRestartBackoffTimeStrategyTest.java | 36 +++++++++++++--------- .../failover/TestRestartBackoffTimeStrategy.java | 3 +- 7 files changed, 36 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java index 95e23fd6e05..046c88f6984 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java @@ -109,12 +109,12 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof } @Override - public void notifyFailure(Throwable cause) { + public boolean notifyFailure(Throwable cause) { long now = clock.absoluteTimeMillis(); // Merge multiple failures into one attempt if there are tasks will be restarted later. if (now <= nextRestartTimestamp) { - return; + return false; } if ((now - nextRestartTimestamp) >= resetBackoffThresholdMS) { @@ -122,6 +122,7 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof } nextRestartTimestamp = now + calculateActualBackoffTime(); currentRestartAttempt++; + return true; } @VisibleForTesting diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java index 66098b14d27..f6f6e4254c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java @@ -79,11 +79,12 @@ public class FailureRateRestartBackoffTimeStrategy implements RestartBackoffTime } @Override - public void notifyFailure(Throwable cause) { + public boolean notifyFailure(Throwable cause) { if (isFailureTimestampsQueueFull()) { failureTimestamps.remove(); } failureTimestamps.add(clock.absoluteTimeMillis()); + return true; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java index 2d9a63cb93c..f19ea1777fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java @@ -61,8 +61,9 @@ public class FixedDelayRestartBackoffTimeStrategy implements RestartBackoffTimeS } @Override - public void notifyFailure(Throwable cause) { + public boolean notifyFailure(Throwable cause) { currentRestartAttempt++; + return true; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/NoRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/NoRestartBackoffTimeStrategy.java index 61b737dd006..fa4bdd7317d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/NoRestartBackoffTimeStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/NoRestartBackoffTimeStrategy.java @@ -33,8 +33,9 @@ public enum NoRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy { } @Override - public void notifyFailure(final Throwable cause) { + public boolean notifyFailure(final Throwable cause) { // nothing to do + return true; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java index 9113154f544..9c400b020ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java @@ -38,8 +38,11 @@ public interface RestartBackoffTimeStrategy { * Notify the strategy about the task failure cause. * * @param cause of the task failure + * @return True means that the current failure is the first one after the most-recent failure + * handling happened, false means that there has been a failure before that was not handled, + * yet, and the current failure will be considered in a combined failure handling effort. */ - void notifyFailure(Throwable cause); + boolean notifyFailure(Throwable cause); // ------------------------------------------------------------------------ // factory diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java index cfe37bec94d..eeee69dcf74 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java @@ -49,7 +49,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { for (int i = 0; i <= maxAttempts; i++) { assertThat(restartStrategy.canRestart()).isTrue(); - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1)); } assertThat(restartStrategy.canRestart()).isFalse(); @@ -110,19 +110,19 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { 0.25, Integer.MAX_VALUE); - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); clock.advanceTime( resetBackoffThresholdMS + restartStrategy.getBackoffTime() - 1, TimeUnit.MILLISECONDS); - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); assertThat(restartStrategy.getBackoffTime()) .as("Backoff should be increased") .isEqualTo(2L); clock.advanceTime( resetBackoffThresholdMS + restartStrategy.getBackoffTime(), TimeUnit.MILLISECONDS); - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); assertThat(restartStrategy.getBackoffTime()) .as("Backoff should be reset") .isEqualTo(initialBackoffMS); @@ -146,15 +146,15 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { jitterFactor, 10); - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); assertThat(restartStrategy.getBackoffTime()).isEqualTo(4L); // 4 clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1)); - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); assertThat(restartStrategy.getBackoffTime()).isEqualTo(9L); // 4 * 2.3 clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1)); - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); assertThat(restartStrategy.getBackoffTime()).isEqualTo(21L); // 4 * 2.3 * 2.3 clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1)); } @@ -223,7 +223,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { RestartBackoffTimeStrategy restartStrategy = factory.create(); for (int i = 0; i < failureCount; i++) { clock.advanceTime(Duration.ofMillis(advanceMsEachFailure)); - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); } return restartStrategy.getBackoffTime(); }, @@ -250,31 +250,31 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { Integer.MAX_VALUE); // ensure initial data - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); assertThat(restartStrategy.canRestart()).isTrue(); assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS); // ensure backoff time is initial after the first failure clock.advanceTime(resetBackoffThresholdMS + 1, TimeUnit.MILLISECONDS); - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); assertThat(restartStrategy.canRestart()).isTrue(); assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS); // ensure backoff increases until threshold is reached clock.advanceTime(4, TimeUnit.MILLISECONDS); - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); assertThat(restartStrategy.canRestart()).isTrue(); assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L); // ensure backoff is reset after threshold is reached clock.advanceTime(resetBackoffThresholdMS + 9 + 1, TimeUnit.MILLISECONDS); - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); assertThat(restartStrategy.canRestart()).isTrue(); assertThat(restartStrategy.getBackoffTime()).isOne(); clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1)); // ensure backoff still increases - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); assertThat(restartStrategy.canRestart()).isTrue(); assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L); } @@ -313,7 +313,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { // After advance time it's a new round, and it reaches the maxAttempts. clock.advanceTime(1, TimeUnit.MILLISECONDS); - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isTrue(); assertThat(restartStrategy.canRestart()).isFalse(); } @@ -360,9 +360,15 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { ManualClock clock, long expectedBackoffMS, ExponentialDelayRestartBackoffTimeStrategy restartStrategy) { + boolean expectedNewAttempt = true; for (int advanceMs = 0; advanceMs < expectedBackoffMS; advanceMs++) { for (int i = 0; i < 10; i++) { - restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.notifyFailure(failure)).isEqualTo(expectedNewAttempt); + if (expectedNewAttempt) { + // Only the first one is new attempt, all rest of failures aren't new attempt. + expectedNewAttempt = false; + } + assertThat(restartStrategy.canRestart()).isTrue(); assertThat(restartStrategy.getBackoffTime()) .isEqualTo(expectedBackoffMS - advanceMs); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/TestRestartBackoffTimeStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/TestRestartBackoffTimeStrategy.java index 0307891fd9e..44830a6423f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/TestRestartBackoffTimeStrategy.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/TestRestartBackoffTimeStrategy.java @@ -42,8 +42,9 @@ public class TestRestartBackoffTimeStrategy implements RestartBackoffTimeStrateg } @Override - public void notifyFailure(Throwable cause) { + public boolean notifyFailure(Throwable cause) { // ignore + return true; } public void setCanRestart(final boolean canRestart) {
