KarmaGYZ commented on code in PR #23773:
URL: https://github.com/apache/flink/pull/23773#discussion_r1402826269
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriverFactory.java:
##########
@@ -46,11 +51,13 @@ public ZooKeeperLeaderRetrievalDriverFactory(
public ZooKeeperLeaderRetrievalDriver createLeaderRetrievalDriver(
LeaderRetrievalEventHandler leaderEventHandler, FatalErrorHandler
fatalErrorHandler)
throws Exception {
+ driverReferenceCounter.incrementAndGet();
Review Comment:
I think each retrievalPath should have a separate counter. Do I understand
it correctly?
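
To illustrate the per-path idea, here is a minimal sketch, not the PR's code. `PerPathDriverCounter`, `acquire`, and `release` are hypothetical names, and it assumes the retrieval path string is a suitable key. It keeps one count per path instead of a single shared `driverReferenceCounter`:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of Flink): counts live drivers per retrieval path.
final class PerPathDriverCounter {
    private final ConcurrentHashMap<String, Integer> counters = new ConcurrentHashMap<>();

    /** Registers one more driver for the path and returns the new count. */
    int acquire(String retrievalPath) {
        // merge() is atomic per key: inserts 1 or adds 1 to the existing count.
        return counters.merge(retrievalPath, 1, Integer::sum);
    }

    /** Releases one driver; returns true if it was the last one for the path. */
    boolean release(String retrievalPath) {
        // compute() is atomic per key; returning null removes the entry.
        Integer remaining = counters.compute(retrievalPath, (path, count) -> {
            if (count == null) {
                throw new IllegalStateException("No driver registered for " + path);
            }
            return count == 1 ? null : count - 1;
        });
        return remaining == null;
    }
}
```

With something like this, closing a driver for one path would not affect the decision to remove watchers on another path.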
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java:
##########
@@ -69,8 +68,40 @@ public class ZooKeeperLeaderRetrievalDriver implements
LeaderRetrievalDriver {
private final FatalErrorHandler fatalErrorHandler;
+ /**
+ * Each {@code ZooKeeperLeaderRetrievalDriver} has its own watcher
initialized. There is a bug
Review Comment:
Sorry, I'm not familiar with it. At least I can't find a public API to
retrieve the watch info. Maybe you can verify with [this
guide](https://kb.altinity.com/altinity-kb-setup-and-maintenance/altinity-kb-zookeeper/altinity-kb-how-to-check-the-list-of-watches/)
to see whether we can get the watches of TestServer and whether there is
any leakage.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java:
##########
@@ -126,17 +161,13 @@ public void close() throws Exception {
cache.close();
- try {
- if (client.getZookeeperClient().isConnected()
- &&
!connectionInformationPath.contains(RESOURCE_MANAGER_NODE)) {
- client.watchers()
- .removeAll()
- .ofType(Watcher.WatcherType.Any)
- .forPath(connectionInformationPath);
- }
- } catch (KeeperException.NoWatcherException e) {
- // Ignore the no watcher exception as it's just a safetynet to fix
watcher leak issue.
- // For more details, please refer to FLINK-33053.
+ if (client.getZookeeperClient().isConnected()
+ && watcherReferenceCounter.getAndIncrement() == 1) {
Review Comment:
Should this be `getAndDecrement`? As written, closing a driver increments
the counter.
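
For reference, a small sketch of the "last closer" check with a plain `AtomicInteger`. `LastCloserCheck` and `releaseIsLast` are hypothetical names used only for illustration. The point is that `getAndDecrement()` returns the value before the decrement, so comparing it to 1 identifies the final release, whereas `getAndIncrement()` would grow the count on every close:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration (not the PR's code) of a reference-counted release.
final class LastCloserCheck {
    /** Returns true only for the caller that releases the last reference. */
    static boolean releaseIsLast(AtomicInteger refCount) {
        // getAndDecrement() returns the previous value, so 1 means
        // this call dropped the count to 0.
        return refCount.getAndDecrement() == 1;
    }
}
```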