Repository: samza
Updated Branches:
  refs/heads/master 475b4654c -> 8bcfded46
SAMZA-1257: make sure a dataChange listener in LeaderElection is always initialized.
xinyuiscool, navina please review.
Author: Boris Shkolnik <[email protected]>
Reviewers: Navina Ramesh <[email protected]>
Closes #156 from sborya/SAMZA-1257
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8bcfded4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8bcfded4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8bcfded4
Branch: refs/heads/master
Commit: 8bcfded46a4617cec6d98bc1db79fa13978916bb
Parents: 475b465
Author: Boris Shkolnik <[email protected]>
Authored: Wed May 3 17:03:08 2017 -0700
Committer: nramesh <[email protected]>
Committed: Wed May 3 17:03:08 2017 -0700
----------------------------------------------------------------------
 .../main/java/org/apache/samza/zk/ZkLeaderElector.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/8bcfded4/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 644864a..4ffe3e4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -105,17 +105,17 @@ public class ZkLeaderElector implements LeaderElector {
 LOG.info("Index = " + index + " Not eligible to be a leader yet!");
 String predecessor = children.get(index - 1);
 if (!predecessor.equals(currentSubscription)) {
-  if (currentSubscription != null) {
-
-    // callback in case if the previous node gets deleted (when previous processor dies)
-    if (previousProcessorChangeListener == null)
-      previousProcessorChangeListener = new PreviousProcessorChangeListener(leaderElectorListener);
+  if (currentSubscription != null) {
     LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
     zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, previousProcessorChangeListener);
   }
   currentSubscription = predecessor;
+  // callback in case if the previous node gets deleted (when previous processor dies)
+  if (previousProcessorChangeListener == null)
+    previousProcessorChangeListener = new PreviousProcessorChangeListener(leaderElectorListener);
+
   LOG.info(zLog("Subscribing data change for " + predecessor));
   zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, previousProcessorChangeListener);
