This is an automated email from the ASF dual-hosted git repository.

huweihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0acfc1a51e8802ebd0eb1ab5c00bbfab5032ebdf
Author: Xiangyu Feng <[email protected]>
AuthorDate: Fri Dec 1 23:59:36 2023 +0800

    [FLINK-33702][core] Add the IncrementalDelayRetryStrategy implementation of 
RetryStrategy
---
 .../concurrent/IncrementalDelayRetryStrategy.java  | 80 ++++++++++++++++++++
 .../IncrementalDelayRetryStrategyTest.java         | 85 ++++++++++++++++++++++
 2 files changed, 165 insertions(+)

diff --git 
a/flink-core/src/main/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategy.java
 
b/flink-core/src/main/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategy.java
new file mode 100644
index 00000000000..a412a307e3d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategy.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
+
+/** An implementation of {@link RetryStrategy} that retries at an incremental 
delay with a cap. */
+public class IncrementalDelayRetryStrategy implements RetryStrategy {
+    private final int remainingRetries;
+    private final Duration currentRetryDelay;
+    private final Duration increment;
+    private final Duration maxRetryDelay;
+
+    /**
+     * @param remainingRetries number of times to retry
+     * @param currentRetryDelay the current delay between retries
+     * @param increment the delay increment between retries
+     * @param maxRetryDelay the max delay between retries
+     */
+    public IncrementalDelayRetryStrategy(
+            int remainingRetries,
+            Duration currentRetryDelay,
+            Duration increment,
+            Duration maxRetryDelay) {
+        Preconditions.checkArgument(
+                remainingRetries >= 0, "The number of retries must be greater 
or equal to 0.");
+        this.remainingRetries = remainingRetries;
+        Preconditions.checkArgument(
+                currentRetryDelay.toMillis() >= 0, "The currentRetryDelay must 
be positive");
+        this.currentRetryDelay = currentRetryDelay;
+        Preconditions.checkArgument(
+                increment.toMillis() >= 0, "The delay increment must be 
greater or equal to 0.");
+        this.increment = increment;
+        Preconditions.checkArgument(
+                maxRetryDelay.toMillis() >= 0, "The maxRetryDelay must be 
positive");
+        this.maxRetryDelay = maxRetryDelay;
+    }
+
+    @Override
+    public int getNumRemainingRetries() {
+        return remainingRetries;
+    }
+
+    @Override
+    public Duration getRetryDelay() {
+        return currentRetryDelay;
+    }
+
+    @Override
+    public RetryStrategy getNextRetryStrategy() {
+        int nextRemainingRetries = remainingRetries - 1;
+        Preconditions.checkState(
+                nextRemainingRetries >= 0, "The number of remaining retries 
must not be negative");
+        long nextRetryDelayMillis =
+                Math.min(currentRetryDelay.plus(increment).toMillis(), 
maxRetryDelay.toMillis());
+        return new IncrementalDelayRetryStrategy(
+                nextRemainingRetries,
+                Duration.ofMillis(nextRetryDelayMillis),
+                increment,
+                maxRetryDelay);
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategyTest.java
 
b/flink-core/src/test/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategyTest.java
new file mode 100644
index 00000000000..7920d40a29d
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategyTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link IncrementalDelayRetryStrategy}. */
+public class IncrementalDelayRetryStrategyTest extends TestLogger {
+
+    @Test
+    public void testGettersNotCapped() throws Exception {
+        RetryStrategy retryStrategy =
+                new IncrementalDelayRetryStrategy(
+                        10, Duration.ofMillis(5L), Duration.ofMillis(4L), 
Duration.ofMillis(20L));
+        assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(10);
+        
assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(5L));
+
+        RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy();
+        assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(9);
+        
assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(9L));
+    }
+
+    @Test
+    public void testGettersHitCapped() throws Exception {
+        RetryStrategy retryStrategy =
+                new IncrementalDelayRetryStrategy(
+                        5, Duration.ofMillis(15L), Duration.ofMillis(10L), 
Duration.ofMillis(20L));
+        assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(5);
+        
assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(15L));
+
+        RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy();
+        assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(4);
+        
assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L));
+    }
+
+    @Test
+    public void testGettersAtCap() throws Exception {
+        RetryStrategy retryStrategy =
+                new IncrementalDelayRetryStrategy(
+                        5, Duration.ofMillis(20L), Duration.ofMillis(5L), 
Duration.ofMillis(20L));
+        assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(5);
+        
assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L));
+
+        RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy();
+        assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(4);
+        
assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L));
+    }
+
+    /** Tests that getting a next RetryStrategy below zero remaining retries 
fails. */
+    @Test
+    public void testRetryFailure() {
+        assertThatThrownBy(
+                        () ->
+                                new IncrementalDelayRetryStrategy(
+                                                0,
+                                                Duration.ofMillis(20L),
+                                                Duration.ofMillis(5L),
+                                                Duration.ofMillis(20L))
+                                        .getNextRetryStrategy())
+                .isInstanceOf(IllegalStateException.class);
+    }
+}

Reply via email to