SAMZA-1365: Calling zkClient.close from zkWatch impl blocks indefinitely. Author: Shanthoosh Venkataraman <[email protected]>
Reviewers: Navina Ramesh <[email protected]> Closes #253 from shanthoosh/SAMZA-1365 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0a4ecb23 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0a4ecb23 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0a4ecb23 Branch: refs/heads/0.14.0 Commit: 0a4ecb232a4fec0d083ecc065a0ad8d4518600ae Parents: fde5564 Author: Shanthoosh Venkataraman <[email protected]> Authored: Thu Aug 3 14:32:09 2017 -0700 Committer: navina <[email protected]> Committed: Thu Aug 3 14:32:09 2017 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/samza/zk/ZkJobCoordinator.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/0a4ecb23/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 2204240..9f64b3a 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -333,6 +333,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { /// listener to handle session expiration class ZkSessionStateChangedListener implements IZkStateListener { + private static final String ZK_SESSION_ERROR = "ZK_SESSION_ERROR"; + @Override public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { @@ -367,7 +369,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { throws Exception { // this means we cannot connect to zookeeper LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error); - stop(); + debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop()); } }
