This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9738d63391668396072570454fdc1eb61699098
Author: Rui Fan <[email protected]>
AuthorDate: Sun Dec 3 22:57:05 2023 +0800

    [FLINK-33737][Scheduler][refactor] Replace currentBackoffMS and lastFailureTimestamp with nextRestartTimestamp
---
 ...ExponentialDelayRestartBackoffTimeStrategy.java | 67 +++++++++++++------
 ...nentialDelayRestartBackoffTimeStrategyTest.java | 78 ++++++++++++++--------
 2 files changed, 96 insertions(+), 49 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java
index 17e232bcd3b..34ce219ebd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph.failover;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.util.clock.Clock;
@@ -27,6 +28,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Restart strategy which tries to restart indefinitely number of times with 
an exponential backoff
@@ -37,6 +39,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ExponentialDelayRestartBackoffTimeStrategy implements 
RestartBackoffTimeStrategy {
 
+    private static final long DEFAULT_NEXT_RESTART_TIMESTAMP = 
Integer.MIN_VALUE;
+
     private final long initialBackoffMS;
 
     private final long maxBackoffMS;
@@ -53,9 +57,7 @@ public class ExponentialDelayRestartBackoffTimeStrategy 
implements RestartBackof
 
     private int currentRestartAttempt;
 
-    private long currentBackoffMS;
-
-    private long lastFailureTimestamp;
+    private long nextRestartTimestamp;
 
     ExponentialDelayRestartBackoffTimeStrategy(
             Clock clock,
@@ -90,7 +92,7 @@ public class ExponentialDelayRestartBackoffTimeStrategy 
implements RestartBackof
         this.attemptsBeforeResetBackoff = attemptsBeforeResetBackoff;
 
         this.clock = checkNotNull(clock);
-        this.lastFailureTimestamp = 0;
+        this.nextRestartTimestamp = DEFAULT_NEXT_RESTART_TIMESTAMP;
     }
 
     @Override
@@ -100,20 +102,20 @@ public class ExponentialDelayRestartBackoffTimeStrategy 
implements RestartBackof
 
     @Override
     public long getBackoffTime() {
-        long backoffWithJitter = currentBackoffMS + calculateJitterBackoffMS();
-        return Math.min(backoffWithJitter, maxBackoffMS);
+        checkState(
+                nextRestartTimestamp != DEFAULT_NEXT_RESTART_TIMESTAMP,
+                "Please call notifyFailure first.");
+        return Math.max(0, nextRestartTimestamp - clock.absoluteTimeMillis());
     }
 
     @Override
     public void notifyFailure(Throwable cause) {
         long now = clock.absoluteTimeMillis();
-        if ((now - lastFailureTimestamp) >= (resetBackoffThresholdMS + 
currentBackoffMS)) {
+        if ((now - nextRestartTimestamp) >= resetBackoffThresholdMS) {
             setInitialBackoff();
-        } else {
-            increaseBackoff();
         }
+        nextRestartTimestamp = now + calculateActualBackoffTime();
         currentRestartAttempt++;
-        lastFailureTimestamp = now;
     }
 
     @Override
@@ -132,23 +134,23 @@ public class ExponentialDelayRestartBackoffTimeStrategy 
implements RestartBackof
                 + attemptsBeforeResetBackoff
                 + ", currentRestartAttempt="
                 + currentRestartAttempt
-                + ", currentBackoffMS="
-                + currentBackoffMS
-                + ", lastFailureTimestamp="
-                + lastFailureTimestamp
+                + ", nextRestartTimestamp="
+                + nextRestartTimestamp
                 + ")";
     }
 
     private void setInitialBackoff() {
-        currentBackoffMS = initialBackoffMS;
         currentRestartAttempt = 0;
     }
 
-    private void increaseBackoff() {
-        if (currentBackoffMS < maxBackoffMS) {
-            currentBackoffMS *= backoffMultiplier;
-        }
-        currentBackoffMS = Math.max(initialBackoffMS, 
Math.min(currentBackoffMS, maxBackoffMS));
+    private long calculateActualBackoffTime() {
+        long currentBackoffTime =
+                (long) (initialBackoffMS * Math.pow(backoffMultiplier, 
currentRestartAttempt));
+        return Math.max(
+                initialBackoffMS,
+                Math.min(
+                        currentBackoffTime + 
calculateJitterBackoffMS(currentBackoffTime),
+                        maxBackoffMS));
     }
 
     /**
@@ -159,7 +161,7 @@ public class ExponentialDelayRestartBackoffTimeStrategy 
implements RestartBackof
      *
      * @return random value in interval [-n, n], where n represents jitter * 
current backoff
      */
-    private long calculateJitterBackoffMS() {
+    private long calculateJitterBackoffMS(long currentBackoffMS) {
         if (jitterFactor == 0) {
             return 0;
         } else {
@@ -210,6 +212,7 @@ public class ExponentialDelayRestartBackoffTimeStrategy 
implements RestartBackof
     /** The factory for creating {@link 
ExponentialDelayRestartBackoffTimeStrategy}. */
     public static class ExponentialDelayRestartBackoffTimeStrategyFactory 
implements Factory {
 
+        private final Clock clock;
         private final long initialBackoffMS;
         private final long maxBackoffMS;
         private final double backoffMultiplier;
@@ -224,6 +227,26 @@ public class ExponentialDelayRestartBackoffTimeStrategy 
implements RestartBackof
                 long resetBackoffThresholdMS,
                 double jitterFactor,
                 int attemptsBeforeResetBackoff) {
+            this(
+                    SystemClock.getInstance(),
+                    initialBackoffMS,
+                    maxBackoffMS,
+                    backoffMultiplier,
+                    resetBackoffThresholdMS,
+                    jitterFactor,
+                    attemptsBeforeResetBackoff);
+        }
+
+        @VisibleForTesting
+        ExponentialDelayRestartBackoffTimeStrategyFactory(
+                Clock clock,
+                long initialBackoffMS,
+                long maxBackoffMS,
+                double backoffMultiplier,
+                long resetBackoffThresholdMS,
+                double jitterFactor,
+                int attemptsBeforeResetBackoff) {
+            this.clock = clock;
             this.initialBackoffMS = initialBackoffMS;
             this.maxBackoffMS = maxBackoffMS;
             this.backoffMultiplier = backoffMultiplier;
@@ -235,7 +258,7 @@ public class ExponentialDelayRestartBackoffTimeStrategy 
implements RestartBackof
         @Override
         public RestartBackoffTimeStrategy create() {
             return new ExponentialDelayRestartBackoffTimeStrategy(
-                    SystemClock.getInstance(),
+                    clock,
                     initialBackoffMS,
                     maxBackoffMS,
                     backoffMultiplier,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java
index a52b02bda91..34f86b97d11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph.failover;
 
+import 
org.apache.flink.runtime.executiongraph.failover.ExponentialDelayRestartBackoffTimeStrategy.ExponentialDelayRestartBackoffTimeStrategyFactory;
 import org.apache.flink.util.clock.ManualClock;
 
 import org.junit.jupiter.api.Test;
@@ -29,6 +30,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Unit tests for {@link ExponentialDelayRestartBackoffTimeStrategy}. */
 class ExponentialDelayRestartBackoffTimeStrategyTest {
@@ -49,6 +51,19 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
         assertThat(restartStrategy.canRestart()).isFalse();
     }
 
+    @Test
+    void testNotCallNotifyFailure() {
+        long initialBackoffMS = 42L;
+
+        final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+                new ExponentialDelayRestartBackoffTimeStrategy(
+                        new ManualClock(), initialBackoffMS, 45L, 2.0, 8L, 0, 
10);
+
+        assertThatThrownBy(restartStrategy::getBackoffTime)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Please call notifyFailure first.");
+    }
+
     @Test
     void testInitialBackoff() {
         long initialBackoffMS = 42L;
@@ -57,6 +72,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
                 new ExponentialDelayRestartBackoffTimeStrategy(
                         new ManualClock(), initialBackoffMS, 45L, 2.0, 8L, 0, 
Integer.MAX_VALUE);
 
+        restartStrategy.notifyFailure(failure);
         
assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS);
     }
 
@@ -90,6 +106,8 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
                         0.25,
                         Integer.MAX_VALUE);
 
+        restartStrategy.notifyFailure(failure);
+
         clock.advanceTime(
                 resetBackoffThresholdMS + restartStrategy.getBackoffTime() - 1,
                 TimeUnit.MILLISECONDS);
@@ -123,11 +141,14 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
                         jitterFactor,
                         10);
 
+        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.getBackoffTime()).isEqualTo(4L); // 4
+
         restartStrategy.notifyFailure(failure);
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(9L); // 4 * 2.3
 
         restartStrategy.notifyFailure(failure);
-        assertThat(restartStrategy.getBackoffTime()).isEqualTo(20L); // 9 * 2.3
+        assertThat(restartStrategy.getBackoffTime()).isEqualTo(21L); // 4 * 
2.3 * 2.3
     }
 
     @Test
@@ -135,8 +156,8 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
         final long initialBackoffMS = 2L;
         final long maxBackoffMS = 7L;
 
-        final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
-                new ExponentialDelayRestartBackoffTimeStrategy(
+        final ExponentialDelayRestartBackoffTimeStrategyFactory 
restartStrategyFactory =
+                new ExponentialDelayRestartBackoffTimeStrategyFactory(
                         new ManualClock(),
                         initialBackoffMS,
                         maxBackoffMS,
@@ -145,14 +166,11 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
                         0.25,
                         Integer.MAX_VALUE);
 
-        restartStrategy.notifyFailure(failure);
-        assertCorrectRandomRange(restartStrategy::getBackoffTime, 3L, 4L, 5L);
+        assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 2, 
3L, 4L, 5L);
 
-        restartStrategy.notifyFailure(failure);
-        assertCorrectRandomRange(restartStrategy::getBackoffTime, 6L, 7L);
+        assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 3, 
6L, 7L);
 
-        restartStrategy.notifyFailure(failure);
-        assertCorrectRandomRange(restartStrategy::getBackoffTime, 6L, 7L);
+        assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 4, 
7L);
     }
 
     @Test
@@ -160,8 +178,8 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
         double jitterFactor = 1;
         long maxBackoffMS = 7L;
 
-        final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
-                new ExponentialDelayRestartBackoffTimeStrategy(
+        final ExponentialDelayRestartBackoffTimeStrategyFactory 
restartStrategyFactory =
+                new ExponentialDelayRestartBackoffTimeStrategyFactory(
                         new ManualClock(),
                         1L,
                         maxBackoffMS,
@@ -170,17 +188,32 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
                         jitterFactor,
                         Integer.MAX_VALUE);
 
-        assertCorrectRandomRange(restartStrategy::getBackoffTime, 0L, 1L, 2L);
+        assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 1, 
1L, 2L);
 
-        restartStrategy.notifyFailure(failure);
-        assertCorrectRandomRange(restartStrategy::getBackoffTime, 0L, 1L, 2L, 
3L, 4L);
+        assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 2, 
1L, 2L, 3L, 4L);
 
-        restartStrategy.notifyFailure(failure);
-        assertCorrectRandomRange(restartStrategy::getBackoffTime, 0L, 1L, 2L, 
3L, 4L, 5L, 6L, 7L);
+        assertCorrectRandomRangeWithFailureCount(
+                restartStrategyFactory, 3, 1L, 2L, 3L, 4L, 5L, 6L, 7L);
+    }
+
+    private void assertCorrectRandomRangeWithFailureCount(
+            ExponentialDelayRestartBackoffTimeStrategyFactory factory,
+            int failureCount,
+            Long... expectedNumbers)
+            throws Exception {
+        assertCorrectRandomRange(
+                () -> {
+                    RestartBackoffTimeStrategy restartStrategy = 
factory.create();
+                    for (int i = 0; i < failureCount; i++) {
+                        restartStrategy.notifyFailure(failure);
+                    }
+                    return restartStrategy.getBackoffTime();
+                },
+                expectedNumbers);
     }
 
     @Test
-    void testMultipleSettings() throws Exception {
+    void testMultipleSettings() {
         ManualClock clock = new ManualClock();
         final long initialBackoffMS = 1L;
         final long maxBackoffMS = 9L;
@@ -199,6 +232,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
                         Integer.MAX_VALUE);
 
         // ensure initial data
+        restartStrategy.notifyFailure(failure);
         assertThat(restartStrategy.canRestart()).isTrue();
         
assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS);
 
@@ -214,16 +248,6 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
         assertThat(restartStrategy.canRestart()).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L);
 
-        clock.advanceTime(3, TimeUnit.MILLISECONDS);
-        restartStrategy.notifyFailure(failure);
-        assertThat(restartStrategy.canRestart()).isTrue();
-        assertCorrectRandomRange(restartStrategy::getBackoffTime, 3L, 4L, 5L);
-
-        clock.advanceTime(7, TimeUnit.MILLISECONDS);
-        restartStrategy.notifyFailure(failure);
-        assertThat(restartStrategy.canRestart()).isTrue();
-        assertCorrectRandomRange(restartStrategy::getBackoffTime, 6L, 7L, 8L, 
9L);
-
         // ensure backoff is reset after threshold is reached
         clock.advanceTime(resetBackoffThresholdMS + 9 + 1, 
TimeUnit.MILLISECONDS);
         restartStrategy.notifyFailure(failure);

Reply via email to