KarmaGYZ commented on code in PR #23773:
URL: https://github.com/apache/flink/pull/23773#discussion_r1402826269


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriverFactory.java:
##########
@@ -46,11 +51,13 @@ public ZooKeeperLeaderRetrievalDriverFactory(
     public ZooKeeperLeaderRetrievalDriver createLeaderRetrievalDriver(
             LeaderRetrievalEventHandler leaderEventHandler, FatalErrorHandler fatalErrorHandler)
             throws Exception {
+        driverReferenceCounter.incrementAndGet();

Review Comment:
   I think each retrievalPath should have a separate counter. Do I understand 
it correctly?
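
   To illustrate what I mean, here is a minimal sketch of per-path counting. 
`PathReferenceCounter` and its methods are hypothetical names for this 
example only; they are not part of this PR or of Flink. It assumes the 
retrieval path string is a suitable map key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical helper (not Flink API): counts live drivers per retrieval path. */
final class PathReferenceCounter {

    private final ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<>();

    /** Registers one more driver for the given path and returns the new count. */
    int acquire(String retrievalPath) {
        return counts.merge(retrievalPath, 1, Integer::sum);
    }

    /**
     * Unregisters one driver for the given path.
     *
     * @return true if this call released the last driver for that path, i.e. the
     *     caller is the one that may clean up the watchers for it
     */
    boolean release(String retrievalPath) {
        boolean[] last = {false};
        // computeIfPresent runs atomically per key on ConcurrentHashMap.
        counts.computeIfPresent(
                retrievalPath,
                (path, count) -> {
                    if (count == 1) {
                        last[0] = true;
                        return null; // drop the entry once nobody uses the path
                    }
                    return count - 1;
                });
        return last[0];
    }
}
```

   With something like this, closing a driver for one path would not be 
affected by drivers that are still open on a different path.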



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java:
##########
@@ -69,8 +68,40 @@ public class ZooKeeperLeaderRetrievalDriver implements LeaderRetrievalDriver {
 
     private final FatalErrorHandler fatalErrorHandler;
 
+    /**
+     * Each {@code ZooKeeperLeaderRetrievalDriver} has its own watcher initialized. There is a bug

Review Comment:
   Sorry, I'm not familiar with it. At least, I can't find a public API to 
retrieve the watch info. Maybe you can verify with [this 
guide](https://kb.altinity.com/altinity-kb-setup-and-maintenance/altinity-kb-zookeeper/altinity-kb-how-to-check-the-list-of-watches/)
 to see whether we can get the watches of the TestServer and whether there is 
any leakage.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java:
##########
@@ -126,17 +161,13 @@ public void close() throws Exception {
 
         cache.close();
 
-        try {
-            if (client.getZookeeperClient().isConnected()
-                    && !connectionInformationPath.contains(RESOURCE_MANAGER_NODE)) {
-                client.watchers()
-                        .removeAll()
-                        .ofType(Watcher.WatcherType.Any)
-                        .forPath(connectionInformationPath);
-            }
-        } catch (KeeperException.NoWatcherException e) {
-            // Ignore the no watcher exception as it's just a safetynet to fix watcher leak issue.
-            // For more details, please refer to FLINK-33053.
+        if (client.getZookeeperClient().isConnected()
+                && watcherReferenceCounter.getAndIncrement() == 1) {

Review Comment:
   Should this be `getAndDecrement`? `close()` releases a reference, so the 
counter should go down here.
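
   To show the difference, here is a minimal sketch. `CounterDemo` and both 
method names are made up for this example and are not code from the PR; the 
only real API used is `java.util.concurrent.atomic.AtomicInteger`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: compares the two AtomicInteger calls in a release check. */
final class CounterDemo {

    /** Returns true only for the call that releases the last reference. */
    static boolean releaseIsLast(AtomicInteger counter) {
        // getAndDecrement returns the value *before* decrementing,
        // so 1 means this caller held the last reference.
        return counter.getAndDecrement() == 1;
    }

    /** Same check with getAndIncrement: the counter grows on every close. */
    static boolean releaseWithIncrement(AtomicInteger counter) {
        return counter.getAndIncrement() == 1;
    }
}
```

   With two open drivers, `releaseIsLast` returns false and then true, leaving 
the counter at 0. `releaseWithIncrement` returns false both times and leaves 
the counter at 4, so the cleanup branch is never taken.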



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.