qinf commented on code in PR #23867:
URL: https://github.com/apache/flink/pull/23867#discussion_r1437967171
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java:
##########
@@ -214,28 +266,111 @@ void testMultipleSettings() throws Exception {
assertThat(restartStrategy.canRestart()).isTrue();
assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L);
- clock.advanceTime(3, TimeUnit.MILLISECONDS);
- restartStrategy.notifyFailure(failure);
- assertThat(restartStrategy.canRestart()).isTrue();
- assertCorrectRandomRange(restartStrategy::getBackoffTime, 3L, 4L, 5L);
-
- clock.advanceTime(7, TimeUnit.MILLISECONDS);
- restartStrategy.notifyFailure(failure);
- assertThat(restartStrategy.canRestart()).isTrue();
- assertCorrectRandomRange(restartStrategy::getBackoffTime, 6L, 7L, 8L,
9L);
-
// ensure backoff is reset after threshold is reached
clock.advanceTime(resetBackoffThresholdMS + 9 + 1,
TimeUnit.MILLISECONDS);
restartStrategy.notifyFailure(failure);
assertThat(restartStrategy.canRestart()).isTrue();
assertThat(restartStrategy.getBackoffTime()).isOne();
+ clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
// ensure backoff still increases
restartStrategy.notifyFailure(failure);
assertThat(restartStrategy.canRestart()).isTrue();
assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L);
}
+ @Test
+ void testMergeMultipleExceptionsIntoOneAttempt() {
+ ManualClock clock = new ManualClock();
+ long initialBackoffMS = 2L;
+ double backoffMultiplier = 2.0d;
+ final long maxBackoffMS = 6L;
+ final long resetBackoffThresholdMS = 80L;
+
+ final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+ new ExponentialDelayRestartBackoffTimeStrategy(
+ clock,
+ initialBackoffMS,
+ maxBackoffMS,
+ backoffMultiplier,
+ resetBackoffThresholdMS,
+ 0.d,
+ 3);
+
+ // All exceptions merged into one attempt if the time is same.
+ long currentBackOffMs = initialBackoffMS;
+ checkMultipleExceptionsAreMerged(clock, currentBackOffMs,
restartStrategy);
+
+ // After advance time it's a new round, so new exception will be a new
attempt.
+ clock.advanceTime(1, TimeUnit.MILLISECONDS);
+ currentBackOffMs *= backoffMultiplier;
+ checkMultipleExceptionsAreMerged(clock, currentBackOffMs,
restartStrategy);
+
+ // After advance time it's a new round, so new exception will be a new
attempt.
+ clock.advanceTime(1, TimeUnit.MILLISECONDS);
+ currentBackOffMs = maxBackoffMS;
+ checkMultipleExceptionsAreMerged(clock, currentBackOffMs,
restartStrategy);
+
+ // After advance time it's a new round, and it reaches the maxAttempts.
+ clock.advanceTime(1, TimeUnit.MILLISECONDS);
+ restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.canRestart()).isFalse();
+ }
+
+ @Test
+ void testMergingExceptionsWorksWithResetting() {
+ ManualClock clock = new ManualClock();
+ long initialBackoffMS = 2L;
+ double backoffMultiplier = 2.0d;
+ final long maxBackoffMS = 6L;
+ final long resetBackoffThresholdMS = 80L;
+
+ final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+ new ExponentialDelayRestartBackoffTimeStrategy(
+ clock,
+ initialBackoffMS,
+ maxBackoffMS,
+ backoffMultiplier,
+ resetBackoffThresholdMS,
+ 0.d,
+ 3);
+
+ // Test the merging logic works well after a series of resetting.
+ for (int i = 0; i < 10; i++) {
+ // All exceptions merged into one attempt if the time is same.
+ long currentBackOffMs = initialBackoffMS;
+ checkMultipleExceptionsAreMerged(clock, currentBackOffMs,
restartStrategy);
+
+ // After advance time it's a new round, so new exception will be a
new attempt.
+ clock.advanceTime(1, TimeUnit.MILLISECONDS);
+ currentBackOffMs *= backoffMultiplier;
+ checkMultipleExceptionsAreMerged(clock, currentBackOffMs,
restartStrategy);
+
+ // After advance time it's a new round, so new exception will be a
new attempt.
+ clock.advanceTime(1, TimeUnit.MILLISECONDS);
+ currentBackOffMs = maxBackoffMS;
+ checkMultipleExceptionsAreMerged(clock, currentBackOffMs,
restartStrategy);
+
+ // After resetBackoffThresholdMS+1, the restartStrategy should be
reset.
+ clock.advanceTime(resetBackoffThresholdMS + 1,
TimeUnit.MILLISECONDS);
+ }
Review Comment:
The restartStrategy should already be reset once resetBackoffThresholdMS has elapsed, so the extra `+ 1` is not needed.
It would be better to change the comment and the `clock.advanceTime` call to:
```Java
// After resetBackoffThresholdMS, the restartStrategy should be reset.
clock.advanceTime(resetBackoffThresholdMS, TimeUnit.MILLISECONDS);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]