SAMZA-1368; make sure new job model will be generated in case of barrier timeout.

Author: Boris Shkolnik <bor...@apache.org>

Reviewers: Shanthoosh V <svenk...@linkedin.com>

Closes #247 from sborya/onBarrierTimeout1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1c113939
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1c113939
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1c113939

Branch: refs/heads/0.14.0
Commit: 1c1139399599c1cb31249e8d7a28291e2ad9d27e
Parents: 4eb5153
Author: Boris Shkolnik <bor...@apache.org>
Authored: Fri Jul 21 15:32:58 2017 -0700
Committer: Jagadish <jagad...@apache.org>
Committed: Fri Jul 21 15:32:58 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/samza/zk/ZkJobCoordinator.java  | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1c113939/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index dd08e3f..e973099 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -313,11 +313,17 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> onNewJobModelConfirmed(version));
       } else {
         if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
-          // no-op
-          // In our consensus model, if the Barrier is timed-out, then it means that one or more initial
-          // participants failed to join. That means, they should have de-registered from "processors" list
-          // and that would have triggered onProcessorChange action -> a new round of consensus.
-          LOG.info("Barrier for version " + version + " timed out.");
+          // no-op for non-leaders
+          // for leader: make sure we do not stop - so generate a new job model
+          LOG.warn("Barrier for version " + version + " timed out.");
+          if (zkController.isLeader()) {
+            LOG.info("Leader will schedule a new job model generation");
+            debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
+              {
+                // actual actions to do are the same as onProcessorChange
+                doOnProcessorChange(new ArrayList<>());
+              });
+          }
         }
       }
     }
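
For readers who want to see the new control flow in isolation, here is a minimal, self-contained sketch of the timeout branch. It is not the Samza implementation: `BarrierTimeoutSketch`, `BarrierState`, `RecordingScheduler`, the `"onProcessorChange"` label and the `jobModelGenerations` counter are all hypothetical stand-ins for `ZkJobCoordinator`, `ZkBarrierForVersionUpgrade.State`, the debounce timer, `ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE` and `doOnProcessorChange`. The stand-in scheduler runs the action immediately instead of waiting for the debounce time.

```java
import java.util.ArrayList;
import java.util.List;

public class BarrierTimeoutSketch {
    /** Hypothetical stand-in for ZkBarrierForVersionUpgrade.State. */
    enum BarrierState { DONE, TIMED_OUT }

    /** Hypothetical stand-in for the debounce timer; it records and runs actions immediately. */
    static class RecordingScheduler {
        final List<String> scheduled = new ArrayList<>();

        void schedule(String actionName, long delayMs, Runnable action) {
            scheduled.add(actionName + "@" + delayMs + "ms");
            action.run(); // a real debounce timer would wait delayMs first
        }
    }

    final boolean leader;
    final long debounceTimeMs;
    final RecordingScheduler scheduler;
    int jobModelGenerations = 0; // counts simulated job model generations

    BarrierTimeoutSketch(boolean leader, long debounceTimeMs, RecordingScheduler scheduler) {
        this.leader = leader;
        this.debounceTimeMs = debounceTimeMs;
        this.scheduler = scheduler;
    }

    /** Models only the TIMED_OUT branch touched by the diff; other states are ignored here. */
    void onBarrierStateChanged(BarrierState state) {
        if (state != BarrierState.TIMED_OUT) {
            return;
        }
        // Non-leaders do nothing; the leader schedules a new job model generation
        // so that the job does not stay stuck after a barrier timeout.
        if (leader) {
            scheduler.schedule("onProcessorChange", debounceTimeMs, () -> jobModelGenerations++);
        }
    }

    public static void main(String[] args) {
        RecordingScheduler leaderTimer = new RecordingScheduler();
        BarrierTimeoutSketch leader = new BarrierTimeoutSketch(true, 2000L, leaderTimer);
        leader.onBarrierStateChanged(BarrierState.TIMED_OUT);
        System.out.println("leader scheduled=" + leaderTimer.scheduled
            + " generations=" + leader.jobModelGenerations);

        RecordingScheduler followerTimer = new RecordingScheduler();
        BarrierTimeoutSketch follower = new BarrierTimeoutSketch(false, 2000L, followerTimer);
        follower.onBarrierStateChanged(BarrierState.TIMED_OUT);
        System.out.println("follower scheduled=" + followerTimer.scheduled
            + " generations=" + follower.jobModelGenerations);
    }
}
```

In this sketch only the leader reacts to the timeout, mirroring the `zkController.isLeader()` check in the patch; the 2000 ms debounce value is an arbitrary assumption.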
