This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push:
new 2a5ad332ca9 [FLINK-38499][runtime] Limit max sleep time in Curator for Zookeeper HA
2a5ad332ca9 is described below
commit 2a5ad332ca9928b6fcb98fbfe57ff9099a0f72d3
Author: Mingliang Liu <[email protected]>
AuthorDate: Sat Oct 11 00:41:29 2025 -0700
[FLINK-38499][runtime] Limit max sleep time in Curator for Zookeeper HA
This closes #27104
---
docs/content.zh/docs/deployment/ha/zookeeper_ha.md | 15 ++++++++++++++-
docs/content/docs/deployment/ha/zookeeper_ha.md | 13 +++++++++++++
.../generated/expert_high_availability_zk_section.html | 6 ++++++
.../generated/high_availability_configuration.html | 6 ++++++
.../flink/configuration/HighAvailabilityOptions.java | 9 +++++++++
.../org/apache/flink/runtime/util/ZooKeeperUtils.java | 12 ++++++++++--
.../leaderelection/ZooKeeperLeaderElectionTest.java | 4 ++--
.../org/apache/flink/runtime/util/ZooKeeperUtilsTest.java | 4 ++--
8 files changed, 62 insertions(+), 7 deletions(-)
diff --git a/docs/content.zh/docs/deployment/ha/zookeeper_ha.md b/docs/content.zh/docs/deployment/ha/zookeeper_ha.md
index edd7ab6a4a0..57f0c8bbe8e 100644
--- a/docs/content.zh/docs/deployment/ha/zookeeper_ha.md
+++ b/docs/content.zh/docs/deployment/ha/zookeeper_ha.md
@@ -98,7 +98,20 @@ zookeeper.sasl.login-context-name: Client
{{< top >}}
-## Advanced Configuration
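+## 高级配置 (Advanced Configuration)
+
+### ZooKeeper Client Retry Configuration
+
+When a ZooKeeper connection fails or is interrupted, Flink automatically retries the connection using a bounded exponential backoff strategy. The strategy progressively increases the wait time between retries (doubling each time) to avoid putting excessive load on the ZooKeeper cluster, while capping the maximum wait time so that recovery remains relatively fast.
+
+- **[high-availability.zookeeper.client.retry-wait]({{< ref "docs/deployment/config" >}}#high-availability-zookeeper-client-retry-wait)** (default: `5s`):
+ Initial wait time between consecutive retries. This value doubles with each retry (exponential backoff).
+
+- **[high-availability.zookeeper.client.max-retry-wait]({{< ref "docs/deployment/config" >}}#high-availability-zookeeper-client-max-retry-wait)** (default: `60s`):
+ Maximum wait time between retries. This caps the exponential backoff so that recovery does not become too slow during prolonged ZooKeeper outages.
+
+- **[high-availability.zookeeper.client.max-retry-attempts]({{< ref "docs/deployment/config" >}}#high-availability-zookeeper-client-max-retry-attempts)** (default: `3`):
+ Maximum number of connection retries before giving up. After this many failed attempts, the operation fails.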
### Tolerating Suspended ZooKeeper Connections
diff --git a/docs/content/docs/deployment/ha/zookeeper_ha.md b/docs/content/docs/deployment/ha/zookeeper_ha.md
index 6d33430cb6d..07de0edc629 100644
--- a/docs/content/docs/deployment/ha/zookeeper_ha.md
+++ b/docs/content/docs/deployment/ha/zookeeper_ha.md
@@ -107,6 +107,19 @@ You can also find further details on [how Flink sets up Kerberos-based security
## Advanced Configuration
+### ZooKeeper Client Retry Configuration
+
+When ZooKeeper connections fail or are interrupted, Flink automatically retries the connection using a bounded exponential backoff strategy. This strategy progressively increases the wait time between retries (doubling each time) to avoid overwhelming the ZooKeeper cluster, while capping the maximum wait time to ensure reasonably fast recovery.
+
+- **[high-availability.zookeeper.client.retry-wait]({{< ref "docs/deployment/config" >}}#high-availability-zookeeper-client-retry-wait)** (default: `5s`):
+ Initial wait time between consecutive retries. This value doubles with each retry (exponential backoff).
+
+- **[high-availability.zookeeper.client.max-retry-wait]({{< ref "docs/deployment/config" >}}#high-availability-zookeeper-client-max-retry-wait)** (default: `60s`):
+ Maximum wait time between retries. This caps the exponential backoff to ensure recovery doesn't become unreasonably slow during extended ZooKeeper outages.
+
+- **[high-availability.zookeeper.client.max-retry-attempts]({{< ref "docs/deployment/config" >}}#high-availability-zookeeper-client-max-retry-attempts)** (default: `3`):
+ Maximum number of connection retry attempts before giving up. After this many failed attempts, the operation will fail.
+
### Tolerating Suspended ZooKeeper Connections
Per default, Flink's ZooKeeper client treats suspended ZooKeeper connections as an error.
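The retry behaviour described in the guide above can be modelled in a few lines. The sketch below is a simplified, deterministic model that follows the documented "doubling" wording: the wait before retry `n` (counting from 0) is `retry-wait * 2^n`, capped at `max-retry-wait`. The class and method names are hypothetical and not part of Flink or Curator. To our understanding, Curator's `BoundedExponentialBackoffRetry` also randomizes the multiplier, so real sleeps vary around this schedule rather than matching it exactly.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a bounded exponential backoff schedule. */
public final class BackoffSchedule {

    /** Returns the wait (ms) before each retry: baseMs doubled per retry, capped at maxMs. */
    static List<Long> sleepSchedule(long baseMs, long maxMs, int maxRetries) {
        List<Long> sleeps = new ArrayList<>();
        long wait = baseMs;
        for (int retry = 0; retry < maxRetries; retry++) {
            sleeps.add(Math.min(wait, maxMs));
            // Stop doubling once the cap is reached, so the value stays bounded.
            if (wait < maxMs) {
                wait *= 2;
            }
        }
        return sleeps;
    }

    public static void main(String[] args) {
        // Documented defaults: retry-wait=5s, max-retry-wait=60s, max-retry-attempts=3.
        System.out.println(sleepSchedule(5_000, 60_000, 3)); // [5000, 10000, 20000]
        // With more attempts, the cap kicks in from the fifth retry on.
        System.out.println(sleepSchedule(5_000, 60_000, 6)); // [5000, 10000, 20000, 40000, 60000, 60000]
    }
}
```

Under this model the three default retries never reach the 60-second cap; the cap only becomes relevant when `max-retry-attempts` is raised to five or more.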
diff --git a/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html b/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html
index 5d65775b1e1..c7ce745452d 100644
--- a/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html
+++ b/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html
@@ -32,6 +32,12 @@
<td>Integer</td>
<td>Defines the number of connection retries before the client gives up.</td>
</tr>
+ <tr>
+ <td><h5>high-availability.zookeeper.client.max-retry-wait</h5></td>
+ <td style="word-wrap: break-word;">1 min</td>
+ <td>Duration</td>
+ <td>Defines the maximum retry wait time in milliseconds for each attempt. This caps the exponential backoff to prevent excessively long waits between retries.</td>
+ </tr>
<tr>
<td><h5>high-availability.zookeeper.client.retry-wait</h5></td>
<td style="word-wrap: break-word;">5 s</td>
diff --git a/docs/layouts/shortcodes/generated/high_availability_configuration.html b/docs/layouts/shortcodes/generated/high_availability_configuration.html
index 4177fb5e781..26a6b300907 100644
--- a/docs/layouts/shortcodes/generated/high_availability_configuration.html
+++ b/docs/layouts/shortcodes/generated/high_availability_configuration.html
@@ -68,6 +68,12 @@
<td>Integer</td>
<td>Defines the number of connection retries before the client gives up.</td>
</tr>
+ <tr>
+ <td><h5>high-availability.zookeeper.client.max-retry-wait</h5></td>
+ <td style="word-wrap: break-word;">1 min</td>
+ <td>Duration</td>
+ <td>Defines the maximum retry wait time in milliseconds for each attempt. This caps the exponential backoff to prevent excessively long waits between retries.</td>
+ </tr>
<tr>
<td><h5>high-availability.zookeeper.client.retry-wait</h5></td>
<td style="word-wrap: break-word;">5 s</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index c02d9b12bd6..30480de619e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -166,6 +166,15 @@ public class HighAvailabilityOptions {
.withDeprecatedKeys("recovery.zookeeper.client.retry-wait")
.withDescription("Defines the pause between consecutive retries.");
+ @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
+ public static final ConfigOption<Duration> ZOOKEEPER_MAX_RETRY_WAIT =
+ key("high-availability.zookeeper.client.max-retry-wait")
+ .durationType()
+ .defaultValue(Duration.ofMillis(60000))
+ .withDescription(
+ "Defines the maximum retry wait time in milliseconds for each attempt. "
+ + "This caps the exponential backoff to prevent excessively long waits between retries.");
+
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS =
key("high-availability.zookeeper.client.max-retry-attempts")
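The new option's default appears in three notations in this commit: `Duration.ofMillis(60000)` in `HighAvailabilityOptions`, `1 min` in the generated HTML tables, and `60s` in the Markdown guide. The snippet below, which uses only `java.time` rather than Flink's own duration parsing, confirms that these denote the same value. The class and constant names are hypothetical.

```java
import java.time.Duration;

public final class DefaultMaxRetryWait {

    // Hypothetical constant mirroring the default declared for ZOOKEEPER_MAX_RETRY_WAIT.
    static final Duration DEFAULT_MAX_RETRY_WAIT = Duration.ofMillis(60000);

    public static void main(String[] args) {
        System.out.println(DEFAULT_MAX_RETRY_WAIT.equals(Duration.ofSeconds(60))); // true
        System.out.println(DEFAULT_MAX_RETRY_WAIT.equals(Duration.ofMinutes(1))); // true
        System.out.println(DEFAULT_MAX_RETRY_WAIT); // PT1M (ISO-8601 form)
    }
}
```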
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index c4292aa6c16..a1ed8f78f0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -65,7 +65,7 @@ import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cac
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCacheSelector;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
-import org.apache.flink.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.shaded.curator5.org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooDefs;
@@ -210,6 +210,12 @@ public class ZooKeeperUtils {
int maxRetryAttempts =
configuration.get(HighAvailabilityOptions.ZOOKEEPER_MAX_RETRY_ATTEMPTS);
+ int maxRetryWait =
+ Math.toIntExact(
+ configuration
+ .get(HighAvailabilityOptions.ZOOKEEPER_MAX_RETRY_WAIT)
+ .toMillis());
+
String root =
configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT);
String namespace =
configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
@@ -252,7 +258,9 @@ public class ZooKeeperUtils {
.connectString(zkQuorum)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
- .retryPolicy(new ExponentialBackoffRetry(retryWait, maxRetryAttempts))
+ .retryPolicy(
+ new BoundedExponentialBackoffRetry(
+ retryWait, maxRetryWait, maxRetryAttempts))
// Curator prepends a '/' manually and throws an Exception if the
// namespace starts with a '/'.
.namespace(trimStartingSlash(rootWithNamespace))
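`ZooKeeperUtils` reads `max-retry-wait` as a `Duration` but passes it to `BoundedExponentialBackoffRetry`, whose sleep-time parameters are `int` milliseconds, so the change converts it with `Math.toIntExact(...toMillis())`. The standalone sketch below (class and method names are hypothetical) shows what that conversion does: ordinary values pass through unchanged, while a duration longer than `Integer.MAX_VALUE` milliseconds (just under 25 days) raises an `ArithmeticException` instead of silently wrapping around to a negative or truncated value.

```java
import java.time.Duration;

public final class RetryWaitConversion {

    /** Converts a configured Duration to int milliseconds, failing loudly on overflow. */
    static int toIntMillis(Duration d) {
        // Math.toIntExact throws ArithmeticException rather than truncating the long.
        return Math.toIntExact(d.toMillis());
    }

    public static void main(String[] args) {
        System.out.println(toIntMillis(Duration.ofSeconds(60))); // 60000
        try {
            // 30 days = 2,592,000,000 ms, which exceeds Integer.MAX_VALUE.
            toIntMillis(Duration.ofDays(30));
        } catch (ArithmeticException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast here surfaces a misconfigured `max-retry-wait` at client creation time rather than producing a nonsensical sleep bound later.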
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 333dffba724..62157dc66e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -43,7 +43,7 @@ import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.CreateB
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener;
-import org.apache.flink.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.shaded.curator5.org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.ACL;
@@ -558,7 +558,7 @@ class ZooKeeperLeaderElectionTest {
final CuratorFrameworkFactory.Builder curatorFrameworkBuilder =
CuratorFrameworkFactory.builder()
.connectString(zooKeeperResource.getCustomExtension().getConnectString())
- .retryPolicy(new ExponentialBackoffRetry(1, 0))
+ .retryPolicy(new BoundedExponentialBackoffRetry(1, 10, 0))
.aclProvider(
new ACLProvider() {
// trigger background exception
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTest.java
index 2b44e4cf357..4799e0a51c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.flink.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.shaded.curator5.org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.junit.jupiter.api.Test;
@@ -94,7 +94,7 @@ class ZooKeeperUtilsTest {
final CuratorFrameworkFactory.Builder curatorFrameworkBuilder =
CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
- .retryPolicy(new ExponentialBackoffRetry(1, 1))
+ .retryPolicy(new BoundedExponentialBackoffRetry(1, 10, 1))
.zookeeperFactory(
(s, i, watcher, b) -> {
throw new RuntimeException(errorMsg);