[FLINK-7021] [core] Handle ZooKeeper leader retrieval error in TaskManager and throw RuntimeException

This closes #4214.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45aceb4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45aceb4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45aceb4d

Branch: refs/heads/master
Commit: 45aceb4d8d0e2f4b8af2fc04c0ff403e7fc001b6
Parents: 614cc58
Author: Scott Kidder <[email protected]>
Authored: Wed Jun 28 07:48:33 2017 -0700
Committer: Till <[email protected]>
Committed: Thu Oct 19 14:59:00 2017 +0200

----------------------------------------------------------------------
 .../leaderretrieval/ZooKeeperLeaderRetrievalService.java  | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/45aceb4d/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
index c60fe2c..d5e172b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.leaderretrieval;
 
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
@@ -41,7 +43,7 @@ import java.util.UUID;
  * been elected by the {@link org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService}.
  * The leader address as well as the current leader session ID is retrieved from ZooKeeper.
  */
-public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService, NodeCacheListener {
+public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService, NodeCacheListener, UnhandledErrorListener {
        private static final Logger LOG = LoggerFactory.getLogger(
                ZooKeeperLeaderRetrievalService.class);
 
@@ -97,6 +99,7 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
                synchronized (lock) {
                        leaderListener = listener;
 
+                       client.getUnhandledErrorListenable().addListener(this);
                        cache.getListenable().addListener(this);
                        cache.start();
 
@@ -196,4 +199,9 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
                                break;
                }
        }
+
+       @Override
+       public void unhandledError(String s, Throwable throwable) {
+               leaderListener.handleError(new FlinkException("Unhandled error in ZooKeeperLeaderRetrievalService:" + s, throwable));
+       }
 }

Reply via email to