[FLINK-7021] [core] Handle Zookeeper leader retrieval error in TaskManager and throw RuntimeException
This closes #4214.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45aceb4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45aceb4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45aceb4d
Branch: refs/heads/master
Commit: 45aceb4d8d0e2f4b8af2fc04c0ff403e7fc001b6
Parents: 614cc58
Author: Scott Kidder <[email protected]>
Authored: Wed Jun 28 07:48:33 2017 -0700
Committer: Till <[email protected]>
Committed: Thu Oct 19 14:59:00 2017 +0200
----------------------------------------------------------------------
 .../leaderretrieval/ZooKeeperLeaderRetrievalService.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/45aceb4d/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
index c60fe2c..d5e172b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
@@ -18,9 +18,11 @@
 package org.apache.flink.runtime.leaderretrieval;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
@@ -41,7 +43,7 @@
import java.util.UUID;
  * been elected by the {@link org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService}.
  * The leader address as well as the current leader session ID is retrieved from ZooKeeper.
  */
-public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService, NodeCacheListener {
+public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService, NodeCacheListener, UnhandledErrorListener {
 	private static final Logger LOG = LoggerFactory.getLogger(
 		ZooKeeperLeaderRetrievalService.class);
@@ -97,6 +99,7 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
 		synchronized (lock) {
 			leaderListener = listener;
+			client.getUnhandledErrorListenable().addListener(this);
 			cache.getListenable().addListener(this);
 			cache.start();
@@ -196,4 +199,9 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
 				break;
 		}
 	}
+
+	@Override
+	public void unhandledError(String s, Throwable throwable) {
+		leaderListener.handleError(new FlinkException("Unhandled error in ZooKeeperLeaderRetrievalService:" + s, throwable));
+	}
 }
