This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef3cefda35428104af354fc3eb563afee58bf639
Author: Rui Fan <[email protected]>
AuthorDate: Thu Dec 14 14:42:38 2023 +0800

    [FLINK-33737][Scheduler] Merge multiple Exceptions into one attempt for exponential-delay restart-strategy
---
 ...ExponentialDelayRestartBackoffTimeStrategy.java |   6 +
 ...nentialDelayRestartBackoffTimeStrategyTest.java | 141 ++++++++++++++++++---
 2 files changed, 132 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java
index 34ce219ebd9..2ad26f8c89c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java
@@ -111,6 +111,12 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof
     @Override
     public void notifyFailure(Throwable cause) {
         long now = clock.absoluteTimeMillis();
+
+        // Merge multiple failures into one attempt if there are tasks will be restarted later.
+        if (now <= nextRestartTimestamp) {
+            return;
+        }
+
         if ((now - nextRestartTimestamp) >= resetBackoffThresholdMS) {
             setInitialBackoff();
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java
index 34f86b97d11..cfe37bec94d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.clock.ManualClock;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
@@ -40,13 +41,16 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
     @Test
     void testMaxAttempts() {
         int maxAttempts = 13;
+        ManualClock clock = new ManualClock();
+        long maxBackoffMS = 3L;
         final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
                 new ExponentialDelayRestartBackoffTimeStrategy(
-                        new ManualClock(), 1L, 3L, 1.2, 4L, 0.25, maxAttempts);
+                        clock, 1L, maxBackoffMS, 1.2, 10L, 0.25, maxAttempts);
 
         for (int i = 0; i <= maxAttempts; i++) {
             assertThat(restartStrategy.canRestart()).isTrue();
             restartStrategy.notifyFailure(failure);
+            clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
         }
         assertThat(restartStrategy.canRestart()).isFalse();
     }
@@ -131,24 +135,28 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
         double backoffMultiplier = 2.3;
         long maxBackoffMS = 300L;
 
+        ManualClock clock = new ManualClock();
         final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
                 new ExponentialDelayRestartBackoffTimeStrategy(
-                        new ManualClock(),
+                        clock,
                         initialBackoffMS,
                         maxBackoffMS,
                         backoffMultiplier,
-                        8L,
+                        Integer.MAX_VALUE,
                         jitterFactor,
                         10);
 
         restartStrategy.notifyFailure(failure);
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(4L); // 4
+        clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
 
         restartStrategy.notifyFailure(failure);
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(9L); // 4 * 2.3
+        clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
 
         restartStrategy.notifyFailure(failure);
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(21L); // 4 * 2.3 * 2.3
+        clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
     }
 
     @Test
@@ -156,21 +164,25 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
         final long initialBackoffMS = 2L;
         final long maxBackoffMS = 7L;
 
+        ManualClock clock = new ManualClock();
         final ExponentialDelayRestartBackoffTimeStrategyFactory restartStrategyFactory =
                 new ExponentialDelayRestartBackoffTimeStrategyFactory(
-                        new ManualClock(),
+                        clock,
                         initialBackoffMS,
                         maxBackoffMS,
                         2.0,
-                        1L,
+                        Integer.MAX_VALUE,
                         0.25,
                         Integer.MAX_VALUE);
 
-        assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 2, 3L, 4L, 5L);
+        assertCorrectRandomRangeWithFailureCount(
+                restartStrategyFactory, clock, maxBackoffMS + 1, 2, 3L, 4L, 5L);
 
-        assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 3, 6L, 7L);
+        assertCorrectRandomRangeWithFailureCount(
+                restartStrategyFactory, clock, maxBackoffMS + 1, 3, 6L, 7L);
 
-        assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 4, 7L);
+        assertCorrectRandomRangeWithFailureCount(
+                restartStrategyFactory, clock, maxBackoffMS + 1, 4, 7L);
     }
 
     @Test
@@ -178,26 +190,31 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
         double jitterFactor = 1;
         long maxBackoffMS = 7L;
 
+        ManualClock clock = new ManualClock();
         final ExponentialDelayRestartBackoffTimeStrategyFactory restartStrategyFactory =
                 new ExponentialDelayRestartBackoffTimeStrategyFactory(
-                        new ManualClock(),
+                        clock,
                         1L,
                         maxBackoffMS,
                         2.0,
-                        8L,
+                        Integer.MAX_VALUE,
                         jitterFactor,
                         Integer.MAX_VALUE);
 
-        assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 1, 1L, 2L);
+        assertCorrectRandomRangeWithFailureCount(
+                restartStrategyFactory, clock, maxBackoffMS + 1, 1, 1L, 2L);
 
-        assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 2, 1L, 2L, 3L, 4L);
+        assertCorrectRandomRangeWithFailureCount(
+                restartStrategyFactory, clock, maxBackoffMS + 1, 2, 1L, 2L, 3L, 4L);
 
         assertCorrectRandomRangeWithFailureCount(
-                restartStrategyFactory, 3, 1L, 2L, 3L, 4L, 5L, 6L, 7L);
+                restartStrategyFactory, clock, maxBackoffMS + 1, 3, 1L, 2L, 3L, 4L, 5L, 6L, 7L);
     }
 
     private void assertCorrectRandomRangeWithFailureCount(
             ExponentialDelayRestartBackoffTimeStrategyFactory factory,
+            ManualClock clock,
+            long advanceMsEachFailure,
             int failureCount,
             Long... expectedNumbers)
             throws Exception {
@@ -205,6 +222,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
                 () -> {
                     RestartBackoffTimeStrategy restartStrategy = factory.create();
                     for (int i = 0; i < failureCount; i++) {
+                        clock.advanceTime(Duration.ofMillis(advanceMsEachFailure));
                         restartStrategy.notifyFailure(failure);
                     }
                     return restartStrategy.getBackoffTime();
@@ -218,7 +236,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
         final long initialBackoffMS = 1L;
         final long maxBackoffMS = 9L;
         double backoffMultiplier = 2.0;
-        final long resetBackoffThresholdMS = 8L;
+        final long resetBackoffThresholdMS = 80L;
         double jitterFactor = 0.25;
 
         final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
@@ -237,7 +255,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS);
 
         // ensure backoff time is initial after the first failure
-        clock.advanceTime(50, TimeUnit.MILLISECONDS);
+        clock.advanceTime(resetBackoffThresholdMS + 1, TimeUnit.MILLISECONDS);
         restartStrategy.notifyFailure(failure);
         assertThat(restartStrategy.canRestart()).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS);
@@ -253,6 +271,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
         restartStrategy.notifyFailure(failure);
         assertThat(restartStrategy.canRestart()).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isOne();
+        clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
 
         // ensure backoff still increases
         restartStrategy.notifyFailure(failure);
@@ -260,6 +279,98 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L);
     }
 
+    @Test
+    void testMergeMultipleExceptionsIntoOneAttempt() {
+        ManualClock clock = new ManualClock();
+        long initialBackoffMS = 2L;
+        double backoffMultiplier = 2.0d;
+        final long maxBackoffMS = 6L;
+        final long resetBackoffThresholdMS = 80L;
+
+        final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+                new ExponentialDelayRestartBackoffTimeStrategy(
+                        clock,
+                        initialBackoffMS,
+                        maxBackoffMS,
+                        backoffMultiplier,
+                        resetBackoffThresholdMS,
+                        0.d,
+                        3);
+
+        // All exceptions merged into one attempt if the time is same.
+        long currentBackOffMs = initialBackoffMS;
+        checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy);
+
+        // After advance time it's a new round, so new exception will be a new attempt.
+        clock.advanceTime(1, TimeUnit.MILLISECONDS);
+        currentBackOffMs *= backoffMultiplier;
+        checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy);
+
+        // After advance time it's a new round, so new exception will be a new attempt.
+        clock.advanceTime(1, TimeUnit.MILLISECONDS);
+        currentBackOffMs = maxBackoffMS;
+        checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy);
+
+        // After advance time it's a new round, and it reaches the maxAttempts.
+        clock.advanceTime(1, TimeUnit.MILLISECONDS);
+        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.canRestart()).isFalse();
+    }
+
+    @Test
+    void testMergingExceptionsWorksWithResetting() {
+        ManualClock clock = new ManualClock();
+        long initialBackoffMS = 2L;
+        double backoffMultiplier = 2.0d;
+        final long maxBackoffMS = 6L;
+        final long resetBackoffThresholdMS = 80L;
+
+        final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+                new ExponentialDelayRestartBackoffTimeStrategy(
+                        clock,
+                        initialBackoffMS,
+                        maxBackoffMS,
+                        backoffMultiplier,
+                        resetBackoffThresholdMS,
+                        0.d,
+                        3);
+
+        // Test the merging logic works well after a series of resetting.
+        for (int i = 0; i < 10; i++) {
+            // All exceptions merged into one attempt if the time is same.
+            long currentBackOffMs = initialBackoffMS;
+            checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy);
+
+            // After advance time it's a new round, so new exception will be a new attempt.
+            clock.advanceTime(1, TimeUnit.MILLISECONDS);
+            currentBackOffMs *= backoffMultiplier;
+            checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy);
+
+            // After advance time it's a new round, so new exception will be a new attempt.
+            clock.advanceTime(1, TimeUnit.MILLISECONDS);
+            currentBackOffMs = maxBackoffMS;
+            checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy);
+
+            // After resetBackoffThresholdMS, the restartStrategy should be reset.
+            clock.advanceTime(resetBackoffThresholdMS, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private void checkMultipleExceptionsAreMerged(
+            ManualClock clock,
+            long expectedBackoffMS,
+            ExponentialDelayRestartBackoffTimeStrategy restartStrategy) {
+        for (int advanceMs = 0; advanceMs < expectedBackoffMS; advanceMs++) {
+            for (int i = 0; i < 10; i++) {
+                restartStrategy.notifyFailure(failure);
+                assertThat(restartStrategy.canRestart()).isTrue();
+                assertThat(restartStrategy.getBackoffTime())
+                        .isEqualTo(expectedBackoffMS - advanceMs);
+            }
+            clock.advanceTime(1, TimeUnit.MILLISECONDS);
+        }
+    }
+
     private void assertCorrectRandomRange(Callable<Long> numberGenerator, Long... expectedNumbers)
             throws Exception {
         Set<Long> generatedNumbers = new HashSet<>();

Reply via email to