Repository: samza
Updated Branches:
  refs/heads/master 475b4654c -> 8bcfded46


SAMZA-1257: make sure a dataChange listener in LeaderElection is always 
initialized.

xinyuiscool, navina please review.

Author: Boris Shkolnik <[email protected]>

Reviewers: Navina Ramesh <[email protected]>

Closes #156 from sborya/SAMZA-1257


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8bcfded4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8bcfded4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8bcfded4

Branch: refs/heads/master
Commit: 8bcfded46a4617cec6d98bc1db79fa13978916bb
Parents: 475b465
Author: Boris Shkolnik <[email protected]>
Authored: Wed May 3 17:03:08 2017 -0700
Committer: nramesh <[email protected]>
Committed: Wed May 3 17:03:08 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/samza/zk/ZkLeaderElector.java    | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8bcfded4/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 644864a..4ffe3e4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -105,17 +105,17 @@ public class ZkLeaderElector implements LeaderElector {
     LOG.info("Index = " + index + " Not eligible to be a leader yet!");
     String predecessor = children.get(index - 1);
     if (!predecessor.equals(currentSubscription)) {
-      if (currentSubscription != null) {
-
-        // callback in case if the previous node gets deleted (when previous 
processor dies)
-        if (previousProcessorChangeListener == null)
-          previousProcessorChangeListener =  new 
PreviousProcessorChangeListener(leaderElectorListener);
 
+      if (currentSubscription != null) {
         LOG.debug(zLog("Unsubscribing data change for " + 
currentSubscription));
         zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + 
currentSubscription,
             previousProcessorChangeListener);
       }
       currentSubscription = predecessor;
+      // callback in case if the previous node gets deleted (when previous 
processor dies)
+      if (previousProcessorChangeListener == null)
+        previousProcessorChangeListener =  new 
PreviousProcessorChangeListener(leaderElectorListener);
+
       LOG.info(zLog("Subscribing data change for " + predecessor));
       zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + 
currentSubscription,
           previousProcessorChangeListener);

Reply via email to