Repository: flink
Updated Branches:
  refs/heads/master 614cc58a1 -> 3a45a796e


[FLINK-7021] Add UnhandledErrorListener to ZooKeeperLeaderElectionService


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a45a796
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a45a796
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a45a796

Branch: refs/heads/master
Commit: 3a45a796e3511c039398ff26ef282d5fc48550de
Parents: 240542f
Author: Till <[email protected]>
Authored: Wed Oct 18 14:03:10 2017 +0200
Committer: Till <[email protected]>
Committed: Thu Oct 19 14:59:00 2017 +0200

----------------------------------------------------------------------
 .../leaderelection/ZooKeeperLeaderElectionService.java | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3a45a796/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index 21e2ca1..920af24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -19,8 +19,10 @@
 package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
@@ -45,7 +47,7 @@ import java.util.UUID;
  * ZooKeeper. The current leader's address as well as its leader session ID is published via
  * ZooKeeper as well.
  */
-public class ZooKeeperLeaderElectionService implements LeaderElectionService, LeaderLatchListener, NodeCacheListener {
+public class ZooKeeperLeaderElectionService implements LeaderElectionService, LeaderLatchListener, NodeCacheListener, UnhandledErrorListener {
 
        private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionService.class);
 
@@ -118,6 +120,8 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 
                synchronized (lock) {
 
+                       client.getUnhandledErrorListenable().addListener(this);
+
                        leaderContender = contender;
 
                        leaderLatch.addListener(this);
@@ -146,6 +150,8 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 
                LOG.info("Stopping ZooKeeperLeaderElectionService {}.", this);
 
+               client.getUnhandledErrorListenable().removeListener(this);
+
                client.getConnectionStateListenable().removeListener(listener);
 
                Exception exception = null;
@@ -401,4 +407,9 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
                                break;
                }
        }
+
+       @Override
+       public void unhandledError(String message, Throwable e) {
+               leaderContender.handleError(new FlinkException("Unhandled error in ZooKeeperLeaderElectionService: " + message, e));
+       }
 }

Reply via email to