This is an automated email from the ASF dual-hosted git repository.
guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d5e151f7233 [FLINK-33053][zookeeper] Manually remove the leader
watcher after retriever closed to avoid the watcher leak at zookeeper server
side
d5e151f7233 is described below
commit d5e151f72336abcc13082fe4bb3e05fd5a785e86
Author: Yangze Guo <[email protected]>
AuthorDate: Thu Sep 14 14:50:54 2023 +0800
[FLINK-33053][zookeeper] Manually remove the leader watcher after retriever
closed to avoid the watcher leak at zookeeper server side
---
.../leaderretrieval/ZooKeeperLeaderRetrievalDriver.java | 16 ++++++++++++++++
.../org/apache/flink/runtime/util/ZooKeeperUtils.java | 3 ++-
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
index ec1908e3185..a97198617a1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
@@ -30,6 +30,8 @@ import
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cac
import
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache;
import
org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionState;
import
org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +40,7 @@ import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.UUID;
+import static
org.apache.flink.runtime.util.ZooKeeperUtils.RESOURCE_MANAGER_NODE;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -122,6 +125,19 @@ public class ZooKeeperLeaderRetrievalDriver implements
LeaderRetrievalDriver {
client.getConnectionStateListenable().removeListener(connectionStateListener);
cache.close();
+
+ try {
+ if (client.getZookeeperClient().isConnected()
+ &&
!connectionInformationPath.contains(RESOURCE_MANAGER_NODE)) {
+ client.watchers()
+ .removeAll()
+ .ofType(Watcher.WatcherType.Any)
+ .forPath(connectionInformationPath);
+ }
+ } catch (KeeperException.NoWatcherException e) {
+ // Ignore the no watcher exception as it's just a safety net to fix the watcher leak issue.
+ // For more details, please refer to FLINK-33053.
+ }
}
private void retrieveLeaderInformationFromZooKeeper() {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 3eff5ae64fb..38e7d92548b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -101,7 +101,8 @@ public class ZooKeeperUtils {
/** The prefix of the completed checkpoint file. */
public static final String HA_STORAGE_COMPLETED_CHECKPOINT =
"completedCheckpoint";
- private static final String RESOURCE_MANAGER_NODE = "resource_manager";
+ /** The prefix of the resource manager node. */
+ public static final String RESOURCE_MANAGER_NODE = "resource_manager";
private static final String DISPATCHER_NODE = "dispatcher";