Repository: flink
Updated Branches:
  refs/heads/master 614cc58a1 -> 3a45a796e

[FLINK-7021] Add UnhandledErrorListener to ZooKeeperLeaderElectionService

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a45a796
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a45a796
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a45a796

Branch: refs/heads/master
Commit: 3a45a796e3511c039398ff26ef282d5fc48550de
Parents: 240542f
Author: Till <[email protected]>
Authored: Wed Oct 18 14:03:10 2017 +0200
Committer: Till <[email protected]>
Committed: Thu Oct 19 14:59:00 2017 +0200

----------------------------------------------------------------------
 .../leaderelection/ZooKeeperLeaderElectionService.java | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3a45a796/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index 21e2ca1..920af24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -19,8 +19,10 @@
 package org.apache.flink.runtime.leaderelection;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
@@ -45,7 +47,7 @@ import java.util.UUID;
  * ZooKeeper. The current leader's address as well as its leader session ID is published via
  * ZooKeeper as well.
  */
-public class ZooKeeperLeaderElectionService implements LeaderElectionService, LeaderLatchListener, NodeCacheListener {
+public class ZooKeeperLeaderElectionService implements LeaderElectionService, LeaderLatchListener, NodeCacheListener, UnhandledErrorListener {
 	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionService.class);
@@ -118,6 +120,8 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 		synchronized (lock) {
+			client.getUnhandledErrorListenable().addListener(this);
+
 			leaderContender = contender;
 			leaderLatch.addListener(this);
@@ -146,6 +150,8 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 		LOG.info("Stopping ZooKeeperLeaderElectionService {}.", this);
+		client.getUnhandledErrorListenable().removeListener(this);
+
 		client.getConnectionStateListenable().removeListener(listener);
 		Exception exception = null;
@@ -401,4 +407,9 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 				break;
 		}
 	}
+
+	@Override
+	public void unhandledError(String message, Throwable e) {
+		leaderContender.handleError(new FlinkException("Unhandled error in ZooKeeperLeaderElectionService: " + message, e));
+	}
 }
