This is an automated email from the ASF dual-hosted git repository. huweihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0acfc1a51e8802ebd0eb1ab5c00bbfab5032ebdf Author: Xiangyu Feng <[email protected]> AuthorDate: Fri Dec 1 23:59:36 2023 +0800 [FLINK-33702][core] Add the IncrementalDelayRetryStrategy implementation of RetryStrategy --- .../concurrent/IncrementalDelayRetryStrategy.java | 80 ++++++++++++++++++++ .../IncrementalDelayRetryStrategyTest.java | 85 ++++++++++++++++++++++ 2 files changed, 165 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategy.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategy.java new file mode 100644 index 00000000000..a412a307e3d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategy.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.concurrent; + +import org.apache.flink.util.Preconditions; + +import java.time.Duration; + +/** An implementation of {@link RetryStrategy} that retries at an incremental delay with a cap. */ +public class IncrementalDelayRetryStrategy implements RetryStrategy { + private final int remainingRetries; + private final Duration currentRetryDelay; + private final Duration increment; + private final Duration maxRetryDelay; + + /** + * @param remainingRetries number of times to retry + * @param currentRetryDelay the current delay between retries + * @param increment the delay increment between retries + * @param maxRetryDelay the max delay between retries + */ + public IncrementalDelayRetryStrategy( + int remainingRetries, + Duration currentRetryDelay, + Duration increment, + Duration maxRetryDelay) { + Preconditions.checkArgument( + remainingRetries >= 0, "The number of retries must be greater or equal to 0."); + this.remainingRetries = remainingRetries; + Preconditions.checkArgument( + currentRetryDelay.toMillis() >= 0, "The currentRetryDelay must be positive"); + this.currentRetryDelay = currentRetryDelay; + Preconditions.checkArgument( + increment.toMillis() >= 0, "The delay increment must be greater or equal to 0."); + this.increment = increment; + Preconditions.checkArgument( + maxRetryDelay.toMillis() >= 0, "The maxRetryDelay must be positive"); + this.maxRetryDelay = maxRetryDelay; + } + + @Override + public int getNumRemainingRetries() { + return remainingRetries; + } + + @Override + public Duration getRetryDelay() { + return currentRetryDelay; + } + + @Override + public RetryStrategy getNextRetryStrategy() { + int nextRemainingRetries = remainingRetries - 1; + Preconditions.checkState( + nextRemainingRetries >= 0, "The number of remaining retries must not be negative"); + long nextRetryDelayMillis = + Math.min(currentRetryDelay.plus(increment).toMillis(), maxRetryDelay.toMillis()); + return new IncrementalDelayRetryStrategy( + nextRemainingRetries, + Duration.ofMillis(nextRetryDelayMillis), + increment, + maxRetryDelay); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategyTest.java b/flink-core/src/test/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategyTest.java new file mode 100644 index 00000000000..7920d40a29d --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategyTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.concurrent; + +import org.apache.flink.util.TestLogger; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link IncrementalDelayRetryStrategy}. */ +public class IncrementalDelayRetryStrategyTest extends TestLogger { + + @Test + public void testGettersNotCapped() throws Exception { + RetryStrategy retryStrategy = + new IncrementalDelayRetryStrategy( + 10, Duration.ofMillis(5L), Duration.ofMillis(4L), Duration.ofMillis(20L)); + assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(10); + assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(5L)); + + RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy(); + assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(9); + assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(9L)); + } + + @Test + public void testGettersHitCapped() throws Exception { + RetryStrategy retryStrategy = + new IncrementalDelayRetryStrategy( + 5, Duration.ofMillis(15L), Duration.ofMillis(10L), Duration.ofMillis(20L)); + assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(5); + assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(15L)); + + RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy(); + assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(4); + assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L)); + } + + @Test + public void testGettersAtCap() throws Exception { + RetryStrategy retryStrategy = + new IncrementalDelayRetryStrategy( + 5, Duration.ofMillis(20L), Duration.ofMillis(5L), Duration.ofMillis(20L)); + assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(5); + assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L)); + + RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy(); + assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(4); + assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L)); + } + + /** Tests that getting a next RetryStrategy below zero remaining retries fails. */ + @Test + public void testRetryFailure() { + assertThatThrownBy( + () -> + new IncrementalDelayRetryStrategy( + 0, + Duration.ofMillis(20L), + Duration.ofMillis(5L), + Duration.ofMillis(20L)) + .getNextRetryStrategy()) + .isInstanceOf(IllegalStateException.class); + } +}
