This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f9738d63391668396072570454fdc1eb61699098 Author: Rui Fan <[email protected]> AuthorDate: Sun Dec 3 22:57:05 2023 +0800 [FLINK-33737][Scheduler][refactor] Replace currentBackoffMS and lastFailureTimestamp with nextRestartTimestamp --- ...ExponentialDelayRestartBackoffTimeStrategy.java | 67 +++++++++++++------ ...nentialDelayRestartBackoffTimeStrategyTest.java | 78 ++++++++++++++-------- 2 files changed, 96 insertions(+), 49 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java index 17e232bcd3b..34ce219ebd9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph.failover; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.util.clock.Clock; @@ -27,6 +28,7 @@ import java.util.concurrent.ThreadLocalRandom; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * Restart strategy which tries to restart indefinitely number of times with an exponential backoff @@ -37,6 +39,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy { + private static final long DEFAULT_NEXT_RESTART_TIMESTAMP = Integer.MIN_VALUE; + private final long initialBackoffMS; private final long maxBackoffMS; @@ -53,9 +57,7 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof private int currentRestartAttempt; - private long currentBackoffMS; - - private long lastFailureTimestamp; + private long nextRestartTimestamp; ExponentialDelayRestartBackoffTimeStrategy( Clock clock, @@ -90,7 +92,7 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof this.attemptsBeforeResetBackoff = attemptsBeforeResetBackoff; this.clock = checkNotNull(clock); - this.lastFailureTimestamp = 0; + this.nextRestartTimestamp = DEFAULT_NEXT_RESTART_TIMESTAMP; } @Override @@ -100,20 +102,20 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof @Override public long getBackoffTime() { - long backoffWithJitter = currentBackoffMS + calculateJitterBackoffMS(); - return Math.min(backoffWithJitter, maxBackoffMS); + checkState( + nextRestartTimestamp != DEFAULT_NEXT_RESTART_TIMESTAMP, + "Please call notifyFailure first."); + return Math.max(0, nextRestartTimestamp - clock.absoluteTimeMillis()); } @Override public void notifyFailure(Throwable cause) { long now = clock.absoluteTimeMillis(); - if ((now - lastFailureTimestamp) >= (resetBackoffThresholdMS + currentBackoffMS)) { + if ((now - nextRestartTimestamp) >= resetBackoffThresholdMS) { setInitialBackoff(); - } else { - increaseBackoff(); } + nextRestartTimestamp = now + calculateActualBackoffTime(); currentRestartAttempt++; - lastFailureTimestamp = now; } @Override @@ -132,23 +134,23 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof + attemptsBeforeResetBackoff + ", currentRestartAttempt=" + currentRestartAttempt - + ", currentBackoffMS=" - + currentBackoffMS - + ", lastFailureTimestamp=" - + lastFailureTimestamp + + ", nextRestartTimestamp=" + + nextRestartTimestamp + ")"; } private void setInitialBackoff() { - currentBackoffMS = initialBackoffMS; currentRestartAttempt = 0; } - private void increaseBackoff() { - if (currentBackoffMS < maxBackoffMS) { - currentBackoffMS *= backoffMultiplier; - } - currentBackoffMS = Math.max(initialBackoffMS, Math.min(currentBackoffMS, maxBackoffMS)); + private long calculateActualBackoffTime() { + long currentBackoffTime = + (long) (initialBackoffMS * Math.pow(backoffMultiplier, currentRestartAttempt)); + return Math.max( + initialBackoffMS, + Math.min( + currentBackoffTime + calculateJitterBackoffMS(currentBackoffTime), + maxBackoffMS)); } /** @@ -159,7 +161,7 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof * * @return random value in interval [-n, n], where n represents jitter * current backoff */ - private long calculateJitterBackoffMS() { + private long calculateJitterBackoffMS(long currentBackoffMS) { if (jitterFactor == 0) { return 0; } else { @@ -210,6 +212,7 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof /** The factory for creating {@link ExponentialDelayRestartBackoffTimeStrategy}. */ public static class ExponentialDelayRestartBackoffTimeStrategyFactory implements Factory { + private final Clock clock; private final long initialBackoffMS; private final long maxBackoffMS; private final double backoffMultiplier; @@ -224,6 +227,26 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof long resetBackoffThresholdMS, double jitterFactor, int attemptsBeforeResetBackoff) { + this( + SystemClock.getInstance(), + initialBackoffMS, + maxBackoffMS, + backoffMultiplier, + resetBackoffThresholdMS, + jitterFactor, + attemptsBeforeResetBackoff); + } + + @VisibleForTesting + ExponentialDelayRestartBackoffTimeStrategyFactory( + Clock clock, + long initialBackoffMS, + long maxBackoffMS, + double backoffMultiplier, + long resetBackoffThresholdMS, + double jitterFactor, + int attemptsBeforeResetBackoff) { + this.clock = clock; this.initialBackoffMS = initialBackoffMS; this.maxBackoffMS = maxBackoffMS; this.backoffMultiplier = backoffMultiplier; @@ -235,7 +258,7 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof @Override public RestartBackoffTimeStrategy create() { return new ExponentialDelayRestartBackoffTimeStrategy( - SystemClock.getInstance(), + clock, initialBackoffMS, maxBackoffMS, backoffMultiplier, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java index a52b02bda91..34f86b97d11 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph.failover; +import org.apache.flink.runtime.executiongraph.failover.ExponentialDelayRestartBackoffTimeStrategy.ExponentialDelayRestartBackoffTimeStrategyFactory; import org.apache.flink.util.clock.ManualClock; import org.junit.jupiter.api.Test; @@ -29,6 +30,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Unit tests for {@link ExponentialDelayRestartBackoffTimeStrategy}. */ class ExponentialDelayRestartBackoffTimeStrategyTest { @@ -49,6 +51,19 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { assertThat(restartStrategy.canRestart()).isFalse(); } + @Test + void testNotCallNotifyFailure() { + long initialBackoffMS = 42L; + + final ExponentialDelayRestartBackoffTimeStrategy restartStrategy = + new ExponentialDelayRestartBackoffTimeStrategy( + new ManualClock(), initialBackoffMS, 45L, 2.0, 8L, 0, 10); + + assertThatThrownBy(restartStrategy::getBackoffTime) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Please call notifyFailure first."); + } + @Test void testInitialBackoff() { long initialBackoffMS = 42L; @@ -57,6 +72,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { new ExponentialDelayRestartBackoffTimeStrategy( new ManualClock(), initialBackoffMS, 45L, 2.0, 8L, 0, Integer.MAX_VALUE); + restartStrategy.notifyFailure(failure); assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS); } @@ -90,6 +106,8 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { 0.25, Integer.MAX_VALUE); + restartStrategy.notifyFailure(failure); + clock.advanceTime( resetBackoffThresholdMS + restartStrategy.getBackoffTime() - 1, TimeUnit.MILLISECONDS); @@ -123,11 +141,14 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { jitterFactor, 10); + restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.getBackoffTime()).isEqualTo(4L); // 4 + restartStrategy.notifyFailure(failure); assertThat(restartStrategy.getBackoffTime()).isEqualTo(9L); // 4 * 2.3 restartStrategy.notifyFailure(failure); - assertThat(restartStrategy.getBackoffTime()).isEqualTo(20L); // 9 * 2.3 + assertThat(restartStrategy.getBackoffTime()).isEqualTo(21L); // 4 * 2.3 * 2.3 } @Test @@ -135,8 +156,8 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { final long initialBackoffMS = 2L; final long maxBackoffMS = 7L; - final ExponentialDelayRestartBackoffTimeStrategy restartStrategy = - new ExponentialDelayRestartBackoffTimeStrategy( + final ExponentialDelayRestartBackoffTimeStrategyFactory restartStrategyFactory = + new ExponentialDelayRestartBackoffTimeStrategyFactory( new ManualClock(), initialBackoffMS, maxBackoffMS, @@ -145,14 +166,11 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { 0.25, Integer.MAX_VALUE); - restartStrategy.notifyFailure(failure); - assertCorrectRandomRange(restartStrategy::getBackoffTime, 3L, 4L, 5L); + assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 2, 3L, 4L, 5L); - restartStrategy.notifyFailure(failure); - assertCorrectRandomRange(restartStrategy::getBackoffTime, 6L, 7L); + assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 3, 6L, 7L); - restartStrategy.notifyFailure(failure); - assertCorrectRandomRange(restartStrategy::getBackoffTime, 6L, 7L); + assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 4, 7L); } @Test @@ -160,8 +178,8 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { double jitterFactor = 1; long maxBackoffMS = 7L; - final ExponentialDelayRestartBackoffTimeStrategy restartStrategy = - new ExponentialDelayRestartBackoffTimeStrategy( + final ExponentialDelayRestartBackoffTimeStrategyFactory restartStrategyFactory = + new ExponentialDelayRestartBackoffTimeStrategyFactory( new ManualClock(), 1L, maxBackoffMS, @@ -170,17 +188,32 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { jitterFactor, Integer.MAX_VALUE); - assertCorrectRandomRange(restartStrategy::getBackoffTime, 0L, 1L, 2L); + assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 1, 1L, 2L); - restartStrategy.notifyFailure(failure); - assertCorrectRandomRange(restartStrategy::getBackoffTime, 0L, 1L, 2L, 3L, 4L); + assertCorrectRandomRangeWithFailureCount(restartStrategyFactory, 2, 1L, 2L, 3L, 4L); - restartStrategy.notifyFailure(failure); - assertCorrectRandomRange(restartStrategy::getBackoffTime, 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L); + assertCorrectRandomRangeWithFailureCount( + restartStrategyFactory, 3, 1L, 2L, 3L, 4L, 5L, 6L, 7L); + } + + private void assertCorrectRandomRangeWithFailureCount( + ExponentialDelayRestartBackoffTimeStrategyFactory factory, + int failureCount, + Long... expectedNumbers) + throws Exception { + assertCorrectRandomRange( + () -> { + RestartBackoffTimeStrategy restartStrategy = factory.create(); + for (int i = 0; i < failureCount; i++) { + restartStrategy.notifyFailure(failure); + } + return restartStrategy.getBackoffTime(); + }, + expectedNumbers); } @Test - void testMultipleSettings() throws Exception { + void testMultipleSettings() { ManualClock clock = new ManualClock(); final long initialBackoffMS = 1L; final long maxBackoffMS = 9L; @@ -199,6 +232,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { Integer.MAX_VALUE); // ensure initial data + restartStrategy.notifyFailure(failure); assertThat(restartStrategy.canRestart()).isTrue(); assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS); @@ -214,16 +248,6 @@ class ExponentialDelayRestartBackoffTimeStrategyTest { assertThat(restartStrategy.canRestart()).isTrue(); assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L); - clock.advanceTime(3, TimeUnit.MILLISECONDS); - restartStrategy.notifyFailure(failure); - assertThat(restartStrategy.canRestart()).isTrue(); - assertCorrectRandomRange(restartStrategy::getBackoffTime, 3L, 4L, 5L); - - clock.advanceTime(7, TimeUnit.MILLISECONDS); - restartStrategy.notifyFailure(failure); - assertThat(restartStrategy.canRestart()).isTrue(); - assertCorrectRandomRange(restartStrategy::getBackoffTime, 6L, 7L, 8L, 9L); - // ensure backoff is reset after threshold is reached clock.advanceTime(resetBackoffThresholdMS + 9 + 1, TimeUnit.MILLISECONDS); restartStrategy.notifyFailure(failure);
