This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ef3cefda35428104af354fc3eb563afee58bf639 Author: Rui Fan <[email protected]> AuthorDate: Thu Dec 14 14:42:38 2023 +0800 [FLINK-33737][Scheduler] Merge multiple Exceptions into one attempt for exponential-delay restart-strategy --- ...ExponentialDelayRestartBackoffTimeStrategy.java | 6 + ...nentialDelayRestartBackoffTimeStrategyTest.java | 141 ++++++++++++++++++--- 2 files changed, 132 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java index 34ce219ebd9..2ad26f8c89c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java @@ -111,6 +111,12 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof @Override public void notifyFailure(Throwable cause) { long now = clock.absoluteTimeMillis(); + + // Merge multiple failures into one attempt if there are tasks that will be restarted later. 
+ if (now <= nextRestartTimestamp) { + return; + } + if ((now - nextRestartTimestamp) >= resetBackoffThresholdMS) { setInitialBackoff(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java index 34f86b97d11..cfe37bec94d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java @@ -23,6 +23,7 @@ import org.apache.flink.util.clock.ManualClock; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -40,13 +41,16 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { @Test void testMaxAttempts() { int maxAttempts = 13; + ManualClock clock = new ManualClock(); + long maxBackoffMS = 3L; final ExponentialDelayRestartBackoffTimeStrategy restartStrategy = new ExponentialDelayRestartBackoffTimeStrategy( - new ManualClock(), 1L, 3L, 1.2, 4L, 0.25, maxAttempts); + clock, 1L, maxBackoffMS, 1.2, 10L, 0.25, maxAttempts); for (int i = 0; i <= maxAttempts; i++) { assertThat(restartStrategy.canRestart()).isTrue(); restartStrategy.notifyFailure(failure); + clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1)); } assertThat(restartStrategy.canRestart()).isFalse(); } @@ -131,24 +135,28 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { double backoffMultiplier = 2.3; long maxBackoffMS = 300L; + ManualClock clock = new ManualClock(); final ExponentialDelayRestartBackoffTimeStrategy restartStrategy = new ExponentialDelayRestartBackoffTimeStrategy( - new ManualClock(), + clock, initialBackoffMS, maxBackoffMS, backoffMultiplier, - 8L, + Integer.MAX_VALUE, 
jitterFactor, 10); restartStrategy.notifyFailure(failure); assertThat(restartStrategy.getBackoffTime()).isEqualTo(4L); // 4 + clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1)); restartStrategy.notifyFailure(failure); assertThat(restartStrategy.getBackoffTime()).isEqualTo(9L); // 4 * 2.3 + clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1)); restartStrategy.notifyFailure(failure); assertThat(restartStrategy.getBackoffTime()).isEqualTo(21L); // 4 * 2.3 * 2.3 + clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1)); } @Test @@ -156,21 +164,25 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { final long initialBackoffMS = 2L; final long maxBackoffMS = 7L; + ManualClock clock = new ManualClock(); final ExponentialDelayRestartBackoffTimeStrategyFactory restartStrategyFactory = new ExponentialDelayRestartBackoffTimeStrategyFactory( - new ManualClock(), + clock, initialBackoffMS, maxBackoffMS, 2.0, - 1L, + Integer.MAX_VALUE, 0.25, Integer.MAX_VALUE); - assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 2, 3L, 4L, 5L); + assertCorrectRandomRangeWithFailureCount( + restartStrategyFactory, clock, maxBackoffMS + 1, 2, 3L, 4L, 5L); - assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 3, 6L, 7L); + assertCorrectRandomRangeWithFailureCount( + restartStrategyFactory, clock, maxBackoffMS + 1, 3, 6L, 7L); - assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 4, 7L); + assertCorrectRandomRangeWithFailureCount( + restartStrategyFactory, clock, maxBackoffMS + 1, 4, 7L); } @Test @@ -178,26 +190,31 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { double jitterFactor = 1; long maxBackoffMS = 7L; + ManualClock clock = new ManualClock(); final ExponentialDelayRestartBackoffTimeStrategyFactory restartStrategyFactory = new ExponentialDelayRestartBackoffTimeStrategyFactory( - new ManualClock(), + clock, 1L, maxBackoffMS, 2.0, - 8L, + Integer.MAX_VALUE, jitterFactor, Integer.MAX_VALUE); - 
assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 1, 1L, 2L); + assertCorrectRandomRangeWithFailureCount( + restartStrategyFactory, clock, maxBackoffMS + 1, 1, 1L, 2L); - assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 2, 1L, 2L, 3L, 4L); + assertCorrectRandomRangeWithFailureCount( + restartStrategyFactory, clock, maxBackoffMS + 1, 2, 1L, 2L, 3L, 4L); assertCorrectRandomRangeWithFailureCount( - restartStrategyFactory, 3, 1L, 2L, 3L, 4L, 5L, 6L, 7L); + restartStrategyFactory, clock, maxBackoffMS + 1, 3, 1L, 2L, 3L, 4L, 5L, 6L, 7L); } private void assertCorrectRandomRangeWithFailureCount( ExponentialDelayRestartBackoffTimeStrategyFactory factory, + ManualClock clock, + long advanceMsEachFailure, int failureCount, Long... expectedNumbers) throws Exception { @@ -205,6 +222,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { () -> { RestartBackoffTimeStrategy restartStrategy = factory.create(); for (int i = 0; i < failureCount; i++) { + clock.advanceTime(Duration.ofMillis(advanceMsEachFailure)); restartStrategy.notifyFailure(failure); } return restartStrategy.getBackoffTime(); @@ -218,7 +236,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { final long initialBackoffMS = 1L; final long maxBackoffMS = 9L; double backoffMultiplier = 2.0; - final long resetBackoffThresholdMS = 8L; + final long resetBackoffThresholdMS = 80L; double jitterFactor = 0.25; final ExponentialDelayRestartBackoffTimeStrategy restartStrategy = @@ -237,7 +255,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS); // ensure backoff time is initial after the first failure - clock.advanceTime(50, TimeUnit.MILLISECONDS); + clock.advanceTime(resetBackoffThresholdMS + 1, TimeUnit.MILLISECONDS); restartStrategy.notifyFailure(failure); assertThat(restartStrategy.canRestart()).isTrue(); assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS); @@ -253,6 +271,7 @@ 
class ExponentialDelayRestartBackoffTimeStrategyTest { restartStrategy.notifyFailure(failure); assertThat(restartStrategy.canRestart()).isTrue(); assertThat(restartStrategy.getBackoffTime()).isOne(); + clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1)); // ensure backoff still increases restartStrategy.notifyFailure(failure); @@ -260,6 +279,98 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L); } + @Test + void testMergeMultipleExceptionsIntoOneAttempt() { + ManualClock clock = new ManualClock(); + long initialBackoffMS = 2L; + double backoffMultiplier = 2.0d; + final long maxBackoffMS = 6L; + final long resetBackoffThresholdMS = 80L; + + final ExponentialDelayRestartBackoffTimeStrategy restartStrategy = + new ExponentialDelayRestartBackoffTimeStrategy( + clock, + initialBackoffMS, + maxBackoffMS, + backoffMultiplier, + resetBackoffThresholdMS, + 0.d, + 3); + + // All exceptions merged into one attempt if the time is same. + long currentBackOffMs = initialBackoffMS; + checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy); + + // After advance time it's a new round, so new exception will be a new attempt. + clock.advanceTime(1, TimeUnit.MILLISECONDS); + currentBackOffMs *= backoffMultiplier; + checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy); + + // After advance time it's a new round, so new exception will be a new attempt. + clock.advanceTime(1, TimeUnit.MILLISECONDS); + currentBackOffMs = maxBackoffMS; + checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy); + + // After advance time it's a new round, and it reaches the maxAttempts. 
+ clock.advanceTime(1, TimeUnit.MILLISECONDS); + restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.canRestart()).isFalse(); + } + + @Test + void testMergingExceptionsWorksWithResetting() { + ManualClock clock = new ManualClock(); + long initialBackoffMS = 2L; + double backoffMultiplier = 2.0d; + final long maxBackoffMS = 6L; + final long resetBackoffThresholdMS = 80L; + + final ExponentialDelayRestartBackoffTimeStrategy restartStrategy = + new ExponentialDelayRestartBackoffTimeStrategy( + clock, + initialBackoffMS, + maxBackoffMS, + backoffMultiplier, + resetBackoffThresholdMS, + 0.d, + 3); + + // Test the merging logic works well after a series of resetting. + for (int i = 0; i < 10; i++) { + // All exceptions merged into one attempt if the time is same. + long currentBackOffMs = initialBackoffMS; + checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy); + + // After advance time it's a new round, so new exception will be a new attempt. + clock.advanceTime(1, TimeUnit.MILLISECONDS); + currentBackOffMs *= backoffMultiplier; + checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy); + + // After advance time it's a new round, so new exception will be a new attempt. + clock.advanceTime(1, TimeUnit.MILLISECONDS); + currentBackOffMs = maxBackoffMS; + checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy); + + // After resetBackoffThresholdMS, the restartStrategy should be reset. 
+ clock.advanceTime(resetBackoffThresholdMS, TimeUnit.MILLISECONDS); + } + } + + private void checkMultipleExceptionsAreMerged( + ManualClock clock, + long expectedBackoffMS, + ExponentialDelayRestartBackoffTimeStrategy restartStrategy) { + for (int advanceMs = 0; advanceMs < expectedBackoffMS; advanceMs++) { + for (int i = 0; i < 10; i++) { + restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.canRestart()).isTrue(); + assertThat(restartStrategy.getBackoffTime()) + .isEqualTo(expectedBackoffMS - advanceMs); + } + clock.advanceTime(1, TimeUnit.MILLISECONDS); + } + } + private void assertCorrectRandomRange(Callable<Long> numberGenerator, Long... expectedNumbers) throws Exception { Set<Long> generatedNumbers = new HashSet<>();
