Repository: samza
Updated Branches:
  refs/heads/master 67b195363 -> fb7aa73f5


SAMZA-1251 - Remove DebounceTimer dependency from ZkLeaderElector & ZkController

Addresses the following:
* Requires the LeaderElectorListener to be explicitly registered by the caller
* Removes debouncetimer dependency from ZkLeaderElector implementation
* [Bug] onBecomeLeader was scheduling its task in the timer under the action 
name "OnBecomeLeader" when it should use the same action name as 
"OnProcessorChange". Otherwise, the pending task is not cancelled when a new 
OnProcessorChange event arrives.
* [Transient Test Failure] `TestScheduleAfterDebounceTime` tests relied on 
timing controlled by sleep. Fixed by synchronizing with a latch instead.

Author: Navina Ramesh <[email protected]>

Reviewers: Xinyu Liu <[email protected]>, Boris Shkolnik <[email protected]>

Closes #153 from navina/SAMZA-1251


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fb7aa73f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fb7aa73f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fb7aa73f

Branch: refs/heads/master
Commit: fb7aa73f5a574a7a57013b49f22896e122668e70
Parents: 67b1953
Author: Navina Ramesh <[email protected]>
Authored: Fri May 5 13:50:08 2017 -0700
Committer: nramesh <[email protected]>
Committed: Fri May 5 13:50:08 2017 -0700

----------------------------------------------------------------------
 .../coordinator/BarrierForVersionUpgrade.java   |   9 +-
 .../samza/coordinator/CoordinationUtils.java    |   5 +-
 .../apache/samza/coordinator/LeaderElector.java |  11 +-
 .../samza/runtime/LocalApplicationRunner.java   |   6 +-
 .../samza/zk/ScheduleAfterDebounceTime.java     |  14 +-
 .../samza/zk/ZkBarrierForVersionUpgrade.java    |  28 +--
 .../java/org/apache/samza/zk/ZkController.java  |  11 +-
 .../org/apache/samza/zk/ZkControllerImpl.java   |  67 ++-----
 .../apache/samza/zk/ZkControllerListener.java   |   9 +-
 .../samza/zk/ZkCoordinationServiceFactory.java  |   4 +-
 .../apache/samza/zk/ZkCoordinationUtils.java    |   6 +-
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  89 +++++----
 .../samza/zk/ZkJobCoordinatorFactory.java       |   4 +-
 .../org/apache/samza/zk/ZkLeaderElector.java    |  57 +++---
 .../samza/job/local/ThreadJobFactory.scala      |   1 -
 .../runtime/TestLocalApplicationRunner.java     |  10 +-
 .../samza/zk/TestScheduleAfterDebounceTime.java |  69 +++----
 .../apache/samza/zk/TestZkLeaderElector.java    | 189 ++++++-------------
 .../samza/container/TestSamzaContainer.scala    |   2 +-
 19 files changed, 254 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java
index 145d81c..664cef8 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java
@@ -28,11 +28,12 @@ import java.util.List;
  */
 public interface BarrierForVersionUpgrade {
   /**
-   * Barrier is usually started by the leader.
-   * @param version - for which the barrier is created
-   * @param participatns - list of participants that need to join for barrier 
to complete
+   * Barrier is usually started by the leader. Creates the Barrier paths in ZK
+   *
+   * @param version - String, representing the version of the JobModel for 
which the barrier is created
+   * @param participants - {@link List} of participants that need to join for 
barrier to complete
    */
-  void start(String version, List<String> participatns);
+  void start(String version, List<String> participants);
 
   /**
    * Called by the processor.

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java 
b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
index 39bda24..952aa51 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
@@ -18,7 +18,9 @@
  */
 package org.apache.samza.coordinator;
 
-/**  THIS API WILL CHANGE
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
  *
  * Coordination service provides synchronization primitives.
  * The actual implementation (for example ZK based) is left to each 
implementation class.
@@ -27,6 +29,7 @@ package org.apache.samza.coordinator;
  *   - Latch
  *   - barrier for version upgrades
  */
+@InterfaceStability.Evolving
 public interface CoordinationUtils {
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java 
b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java
index c6c8bbb..c624f83 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java
@@ -32,11 +32,16 @@ import org.apache.samza.annotation.InterfaceStability;
 @InterfaceStability.Evolving
 public interface LeaderElector {
   /**
-   * Async method that helps the caller participate in leader election.
+   * Register a LeaderElectorListener
    *
-   * @param leaderElectorListener to be invoked if the caller is chosen as a 
leader through the leader election process
+   * @param listener {@link LeaderElectorListener} interfaces to be invoked 
upon completion of leader election participation
    */
-  void tryBecomeLeader(LeaderElectorListener leaderElectorListener);
+  void setLeaderElectorListener(LeaderElectorListener listener);
+
+  /**
+   * Async method that helps the caller participate in leader election.
+   **/
+  void tryBecomeLeader();
 
   /**
    * Method that allows a caller to resign from leadership role. Caller can 
resign from leadership due to various

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 9fed202..b1f0aba 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -35,6 +35,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.Latch;
+import org.apache.samza.coordinator.LeaderElector;
 import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.processor.StreamProcessor;
@@ -48,7 +49,6 @@ import org.apache.samza.zk.ZkJobCoordinatorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * This class implements the {@link ApplicationRunner} that runs the 
applications in standalone environment
  */
@@ -213,10 +213,12 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
     if (!intStreams.isEmpty()) {
       if (coordinationUtils != null) {
         Latch initLatch = coordinationUtils.getLatch(1, INIT_LATCH_ID);
-        coordinationUtils.getLeaderElector().tryBecomeLeader(() -> {
+        LeaderElector leaderElector = coordinationUtils.getLeaderElector();
+        leaderElector.setLeaderElectorListener(() -> {
             getStreamManager().createStreams(intStreams);
             initLatch.countDown();
           });
+        leaderElector.tryBecomeLeader();
         initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
       } else {
         // each application process will try creating the streams, which

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java 
b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index 289d900..21572f5 100644
--- 
a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -39,13 +39,9 @@ import org.slf4j.LoggerFactory;
  * ZK based standalone app.
  */
 public class ScheduleAfterDebounceTime {
-  public static final Logger LOG = 
LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
   public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a 
task to complete
 
-  // Names of actions.
-  // When the same action is scheduled it needs to cancel the previous one.
-  // To accomplish that we keep the previous future in a map, keyed by the 
action name.
-
   // Here we predefine some actions which are used in the ZK based standalone 
app.
   // Action name when the JobModel version changes
   public static final String JOB_MODEL_VERSION_CHANGE = 
"JobModelVersionChange";
@@ -56,28 +52,28 @@ public class ScheduleAfterDebounceTime {
   public static final int DEBOUNCE_TIME_MS = 2000;
 
   private final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(
-      new 
ThreadFactoryBuilder().setNameFormat("zk-debounce-thread-%d").setDaemon(true).build());
+      new 
ThreadFactoryBuilder().setNameFormat("debounce-thread-%d").setDaemon(true).build());
   private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
 
   synchronized public void scheduleAfterDebounceTime(String actionName, long 
debounceTimeMs, Runnable runnable) {
     // check if this action has been scheduled already
     ScheduledFuture sf = futureHandles.get(actionName);
     if (sf != null && !sf.isDone()) {
-      LOG.info("DEBOUNCE: cancel future for " + actionName);
+      LOGGER.info("cancel future for " + actionName);
       // attempt to cancel
       if (!sf.cancel(false)) {
         try {
           sf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
         } catch (Exception e) {
           // we ignore the exception
-          LOG.warn("cancel for action " + actionName + " failed with ", e);
+          LOGGER.warn("cancel for action " + actionName + " failed with ", e);
         }
       }
       futureHandles.remove(actionName);
     }
     // schedule a new task
     sf = scheduledExecutorService.schedule(runnable, debounceTimeMs, 
TimeUnit.MILLISECONDS);
-    LOG.info("DEBOUNCE: scheduled " + actionName + " in " + debounceTimeMs);
+    LOGGER.info("DEBOUNCE: scheduled " + actionName + " in " + debounceTimeMs);
     futureHandles.put(actionName, sf);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index 20de43c..c7bfc1d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
  */
 public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
   private final ZkUtils zkUtils;
-  private final ZkKeyBuilder keyBuilder;
   private final static String BARRIER_DONE = "done";
   private final static String BARRIER_TIMED_OUT = "TIMED_OUT";
   private final static Logger LOG = 
LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
@@ -49,31 +48,21 @@ public class ZkBarrierForVersionUpgrade implements 
BarrierForVersionUpgrade {
   private final ScheduleAfterDebounceTime debounceTimer;
 
   private final String barrierPrefix;
-  private String barrierPath;
   private String barrierDonePath;
   private String barrierProcessors;
   private static final String VERSION_UPGRADE_TIMEOUT_TIMER = 
"VersionUpgradeTimeout";
   private final long barrierTimeoutMS;
 
   public ZkBarrierForVersionUpgrade(String barrierId, ZkUtils zkUtils, 
ScheduleAfterDebounceTime debounceTimer, long barrierTimeoutMS) {
+    if (zkUtils == null) {
+      throw new RuntimeException("Cannot operate ZkBarrierForVersionUpgrade 
without ZkUtils.");
+    }
     this.zkUtils = zkUtils;
-    keyBuilder = zkUtils.getKeyBuilder();
-
-    barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix(barrierId);
-
+    barrierPrefix = 
zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(barrierId);
     this.debounceTimer = debounceTimer;
     this.barrierTimeoutMS = barrierTimeoutMS;
   }
 
-  /**
-   * set the barrier for the timer. If the timer is not achieved by the 
timeout - it will fail
-   * @param version for which the barrier is created
-   * @param timeout - time in ms to wait
-   */
-  private void setTimer(final String version, final long timeout, final Stat 
currentStatOfBarrierDone) {
-    debounceTimer.scheduleAfterDebounceTime(VERSION_UPGRADE_TIMEOUT_TIMER, 
timeout, ()->timerOff(version, currentStatOfBarrierDone));
-  }
-
   protected long getBarrierTimeOutMs() {
     return barrierTimeoutMS;
   }
@@ -84,7 +73,6 @@ public class ZkBarrierForVersionUpgrade implements 
BarrierForVersionUpgrade {
       zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_TIMED_OUT, 
currentStatOfBarrierDone.getVersion());
     } catch (ZkBadVersionException e) {
       // Expected. failed to write, make sure the value is "DONE"
-      ///LOG.("Barrier timeout write failed");
       String done = zkUtils.getZkClient().<String>readData(barrierDonePath);
       LOG.info("Barrier timeout expired, but done=" + done);
       if (!done.equals(BARRIER_DONE)) {
@@ -94,7 +82,7 @@ public class ZkBarrierForVersionUpgrade implements 
BarrierForVersionUpgrade {
   }
 
   private void setPaths(String version) {
-    barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+    String barrierPath = String.format("%s/barrier_%s", barrierPrefix, 
version);
     barrierDonePath = String.format("%s/barrier_done", barrierPath);
     barrierProcessors = String.format("%s/barrier_processors", barrierPath);
 
@@ -103,7 +91,6 @@ public class ZkBarrierForVersionUpgrade implements 
BarrierForVersionUpgrade {
 
   @Override
   public void start(String version, List<String> participants) {
-
     setPaths(version);
 
     // subscribe for processor's list changes
@@ -114,7 +101,10 @@ public class ZkBarrierForVersionUpgrade implements 
BarrierForVersionUpgrade {
     Stat currentStatOfBarrierDone = new Stat();
     zkUtils.getZkClient().readData(barrierDonePath, currentStatOfBarrierDone);
 
-    setTimer(version, getBarrierTimeOutMs(), currentStatOfBarrierDone);
+    debounceTimer.scheduleAfterDebounceTime(
+        VERSION_UPGRADE_TIMEOUT_TIMER,
+        getBarrierTimeOutMs(),
+        () -> timerOff(version, currentStatOfBarrierDone));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
index 20c62cf..de2e473 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
@@ -22,11 +22,18 @@ package org.apache.samza.zk;
 
 /**
  * Api to the functionality provided by ZK
+ *
+ * Api for JC to ZK communication
  */
 public interface ZkController {
   void register();
   boolean isLeader();
-  void notifyJobModelChange(String version);
   void stop();
-  void listenToProcessorLiveness();
+
+  // Leader
+  /**
+   * Allows the {@link ZkJobCoordinator} to subscribe to changes to Zk nodes 
in the processors subtree
+   * Typically, the leader is interested in such notifications.
+   */
+  void subscribeToProcessorChange();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index b6e3aed..52bfef1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -22,7 +22,7 @@ package org.apache.samza.zk;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.LeaderElectorListener;
+import org.apache.samza.coordinator.LeaderElector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,16 +35,14 @@ public class ZkControllerImpl implements ZkController {
   private final String processorIdStr;
   private final ZkUtils zkUtils;
   private final ZkControllerListener zkControllerListener;
-  private final ZkLeaderElector leaderElector;
-  private final ScheduleAfterDebounceTime debounceTimer;
+  private final LeaderElector zkLeaderElector;
 
-  public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils, 
ScheduleAfterDebounceTime debounceTimer,
-      ZkControllerListener zkControllerListener) {
+  public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils,
+      ZkControllerListener zkControllerListener, LeaderElector 
zkLeaderElector) {
     this.processorIdStr = processorIdStr;
     this.zkUtils = zkUtils;
     this.zkControllerListener = zkControllerListener;
-    this.leaderElector = new ZkLeaderElector(processorIdStr, zkUtils, 
debounceTimer);
-    this.debounceTimer = debounceTimer;
+    this.zkLeaderElector = zkLeaderElector;
 
     init();
   }
@@ -62,49 +60,32 @@ public class ZkControllerImpl implements ZkController {
   public void register() {
     // TODO - make a loop here with some number of attempts.
     // possibly split into two method - becomeLeader() and becomeParticipant()
-    leaderElector.tryBecomeLeader(new LeaderElectorListener() {
-      @Override
-      public void onBecomingLeader() {
-        listenToProcessorLiveness();
-
-        // inform the caller
-        zkControllerListener.onBecomeLeader();
-      }
-    });
+    zkLeaderElector.tryBecomeLeader();
 
     // subscribe to JobModel version updates
-    zkUtils.subscribeToJobModelVersionChange(new 
ZkJobModelVersionChangeHandler(debounceTimer));
+    zkUtils.subscribeToJobModelVersionChange(new 
ZkJobModelVersionChangeHandler());
   }
 
   @Override
   public boolean isLeader() {
-    return leaderElector.amILeader();
-  }
-
-  @Override
-  public void notifyJobModelChange(String version) {
-    zkControllerListener.onNewJobModelAvailable(version);
+    return zkLeaderElector.amILeader();
   }
 
   @Override
   public void stop() {
     if (isLeader()) {
-      leaderElector.resignLeadership();
+      zkLeaderElector.resignLeadership();
     }
     zkUtils.close();
   }
 
   @Override
-  public void listenToProcessorLiveness() {
-    zkUtils.subscribeToProcessorChange(new 
ZkProcessorChangeHandler(debounceTimer));
+  public void subscribeToProcessorChange() {
+    zkUtils.subscribeToProcessorChange(new ProcessorChangeHandler());
   }
 
   // Only by Leader
-  class ZkProcessorChangeHandler  implements IZkChildListener {
-    private final ScheduleAfterDebounceTime debounceTimer;
-    public ZkProcessorChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
-      this.debounceTimer = debounceTimer;
-    }
+  class ProcessorChangeHandler implements IZkChildListener {
     /**
      * Called when the children of the given path changed.
      *
@@ -115,18 +96,13 @@ public class ZkControllerImpl implements ZkController {
     @Override
     public void handleChildChange(String parentPath, List<String> 
currentChildren) throws Exception {
       LOG.info(
-          "ZkControllerImpl::ZkProcessorChangeHandler::handleChildChange - 
Path: " + parentPath + "  Current Children: "
+          "ZkControllerImpl::ProcessorChangeHandler::handleChildChange - Path: 
" + parentPath + "  Current Children: "
               + currentChildren);
-      
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
-          ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> 
zkControllerListener.onProcessorChange(currentChildren));
+      zkControllerListener.onProcessorChange(currentChildren);
     }
   }
 
   class ZkJobModelVersionChangeHandler implements IZkDataListener {
-    private final ScheduleAfterDebounceTime debounceTimer;
-    public ZkJobModelVersionChangeHandler(ScheduleAfterDebounceTime 
debounceTimer) {
-      this.debounceTimer = debounceTimer;
-    }
     /**
      * called when job model version gets updated
      * @param dataPath
@@ -136,22 +112,13 @@ public class ZkControllerImpl implements ZkController {
     @Override
     public void handleDataChange(String dataPath, Object data) throws 
Exception {
       LOG.info("pid=" + processorIdStr + ". Got notification on version update 
change. path=" + dataPath + "; data="
-          + (String) data);
-
-      debounceTimer
-          
.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 
0, () -> notifyJobModelChange((String) data));
+          + data);
+      zkControllerListener.onNewJobModelAvailable((String) data);
     }
+
     @Override
     public void handleDataDeleted(String dataPath) throws Exception {
       throw new SamzaException("version update path has been deleted!");
     }
   }
-
-  public void shutdown() {
-    if (debounceTimer != null)
-      debounceTimer.stopScheduler();
-
-    if (zkUtils != null)
-      zkUtils.close();
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
index f7fedd7..af4d56c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
@@ -21,12 +21,15 @@ package org.apache.samza.zk;
 
 import java.util.List;
 
-
 /**
- * callbacks to the caller of the ZkController
+ * Interface to listen for notifications from the {@link ZkController}
  */
 public interface ZkControllerListener {
-  void onBecomeLeader();
+  /**
+   * ZkController observes the ZkTree for changes to group membership of 
processors and notifies the listener
+   *
+   * @param processorIds List of current znodes that are in the processing 
group
+   */
   void onProcessorChange(List<String> processorIds);
 
   void onNewJobModelAvailable(String version); // start job model update (stop 
current work)

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
 
b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
index 6ddc3fe..07da147 100644
--- 
a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -24,15 +24,13 @@ import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.CoordinationServiceFactory;
 
-
 public class ZkCoordinationServiceFactory implements 
CoordinationServiceFactory {
   // TODO - Why should this method be synchronized?
   synchronized public CoordinationUtils getCoordinationService(String groupId, 
String participantId, Config config) {
     ZkConfig zkConfig = new ZkConfig(config);
     ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), 
zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
     ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, 
zkConfig.getZkConnectionTimeoutMs());
-    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
-    return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, 
debounceTimer);
+    return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new 
ScheduleAfterDebounceTime());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
index 8f0b6d2..5a6c88a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.zk;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.BarrierForVersionUpgrade;
 import org.apache.samza.coordinator.CoordinationUtils;
@@ -47,7 +46,7 @@ public class ZkCoordinationUtils implements CoordinationUtils 
{
 
   @Override
   public LeaderElector getLeaderElector() {
-    return new ZkLeaderElector(processorIdStr, zkUtils, debounceTimer);
+    return new ZkLeaderElector(processorIdStr, zkUtils);
   }
 
   @Override
@@ -59,7 +58,8 @@ public class ZkCoordinationUtils implements CoordinationUtils 
{
   public BarrierForVersionUpgrade getBarrier(String barrierId) {
     return new ZkBarrierForVersionUpgrade(barrierId, zkUtils, debounceTimer, 
zkConfig.getZkBarrierTimeoutMs());
   }
-  @VisibleForTesting
+
+  // TODO - SAMZA-1128 CoordinationService should directly depende on ZkUtils 
and DebounceTimer
   public ZkUtils getZkUtils() {
     return zkUtils;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 08f779f..0ac9e8e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -18,12 +18,6 @@
  */
 package org.apache.samza.zk;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
@@ -33,6 +27,8 @@ import org.apache.samza.coordinator.BarrierForVersionUpgrade;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.LeaderElector;
+import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.runtime.ProcessorIdGenerator;
@@ -43,6 +39,13 @@ import org.apache.samza.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * JobCoordinator for stand alone processor managed via Zookeeper.
  */
@@ -61,15 +64,18 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
 
-  public ZkJobCoordinator(Config config, ScheduleAfterDebounceTime 
debounceTimer) {
-    this.debounceTimer = debounceTimer;
+  public ZkJobCoordinator(Config config) {
+    this.debounceTimer = new ScheduleAfterDebounceTime();
     this.config = config;
     this.processorId = createProcessorId(config);
     this.coordinationUtils = new ZkCoordinationServiceFactory()
         .getCoordinationService(new 
ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), 
config);
     this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils();
-    this.zkController = new ZkControllerImpl(processorId, zkUtils, 
debounceTimer, this);
-    this.streamMetadataCache = getStreamMetadataCache();
+    LeaderElector leaderElector = new ZkLeaderElector(this.processorId, 
zkUtils);
+    leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
+
+    this.zkController = new ZkControllerImpl(processorId, zkUtils, this, 
leaderElector);
+    streamMetadataCache = getStreamMetadataCache();
   }
 
   private StreamMetadataCache getStreamMetadataCache() {
@@ -100,6 +106,7 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     if (coordinatorListener != null) {
       coordinatorListener.onJobModelExpired();
     }
+    debounceTimer.stopScheduler();
     zkController.stop();
     if (coordinatorListener != null) {
       coordinatorListener.onCoordinatorStop();
@@ -123,19 +130,13 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
 
   //////////////////////////////////////////////// LEADER stuff 
///////////////////////////
   @Override
-  public void onBecomeLeader() {
-    log.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
-
-    List<String> emptyList = new ArrayList<>();
-
-    // actual actions to do are the same as onProcessorChange()
+  public void onProcessorChange(List<String> processors) {
+    log.info("ZkJobCoordinator::onProcessorChange - list of processors 
changed! List size=" + processors.size());
     
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
-        ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> 
onProcessorChange(emptyList));
+        ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> 
doOnProcessorChange(processors));
   }
 
-  @Override
-  public void onProcessorChange(List<String> processors) {
-    log.info("ZkJobCoordinator::onProcessorChange - list of processors 
changed! List size=" + processors.size());
+  public void doOnProcessorChange(List<String> processors) {
     // if list of processors is empty - it means we are called from 
'onBecomeLeader'
     generateNewJobModel(processors);
     if (coordinatorListener != null) {
@@ -145,26 +146,24 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
 
   @Override
   public void onNewJobModelAvailable(final String version) {
-    log.info("pid=" + processorId + "new JobModel available");
-    // stop current work
-    if (coordinatorListener != null) {
-      coordinatorListener.onJobModelExpired();
-    }
-    log.info("pid=" + processorId + "new JobModel available.Container stopped.");
-    // get the new job model
-    newJobModel = zkUtils.getJobModel(version);
+    
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE,
 0, () ->
+      {
+        log.info("pid=" + processorId + "new JobModel available");
+        // stop current work
+        if (coordinatorListener != null) {
+          coordinatorListener.onJobModelExpired();
+        }
+        log.info("pid=" + processorId + "new JobModel available.Container stopped.");
+        // get the new job model
+        newJobModel = zkUtils.getJobModel(version);
 
-    log.info("pid=" + processorId + ": new JobModel available. ver=" + version 
+ "; jm = " + newJobModel);
+        log.info("pid=" + processorId + ": new JobModel available. ver=" + 
version + "; jm = " + newJobModel);
 
-    // update ZK and wait for all the processors to get this new version
-    ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) 
coordinationUtils.getBarrier(
-        JOB_MODEL_UPGRADE_BARRIER);
-    barrier.waitForBarrier(version, processorId, new Runnable() {
-      @Override
-      public void run() {
-        onNewJobModelConfirmed(version);
-      }
-    });
+        // update ZK and wait for all the processors to get this new version
+        ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) 
coordinationUtils.getBarrier(
+            JOB_MODEL_UPGRADE_BARRIER);
+        barrier.waitForBarrier(version, processorId, () -> 
onNewJobModelConfirmed(version));
+      });
   }
 
   @Override
@@ -245,4 +244,18 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
     log.info("pid=" + processorId + "published new JobModel ver=" + 
nextJMVersion);
   }
+
+  class LeaderElectorListenerImpl implements LeaderElectorListener {
+    @Override
+    public void onBecomingLeader() {
+      log.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
+      zkController.subscribeToProcessorChange();
+      debounceTimer.scheduleAfterDebounceTime(
+        ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
+        ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> {
+          // actual actions to do are the same as onProcessorChange()
+          doOnProcessorChange(new ArrayList<>());
+        });
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index e02e504..d2e0d14 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -32,8 +32,6 @@ public class ZkJobCoordinatorFactory implements 
JobCoordinatorFactory {
    */
   @Override
   public JobCoordinator getJobCoordinator(Config config) {
-    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
-
-    return new ZkJobCoordinator(config, debounceTimer);
+    return new ZkJobCoordinator(config);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 4ffe3e4..8caa5c6 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -49,28 +49,34 @@ public class ZkLeaderElector implements LeaderElector {
   private final String processorIdStr;
   private final ZkKeyBuilder keyBuilder;
   private final String hostName;
-  private final ScheduleAfterDebounceTime debounceTimer;
 
   private AtomicBoolean isLeader = new AtomicBoolean(false);
-  private IZkDataListener previousProcessorChangeListener;
+  private final IZkDataListener previousProcessorChangeListener;
+  private LeaderElectorListener leaderElectorListener = null;
   private String currentSubscription = null;
   private final Random random = new Random();
 
-  @VisibleForTesting
-  public void setPreviousProcessorChangeListener(IZkDataListener 
previousProcessorChangeListener) {
-    this.previousProcessorChangeListener = previousProcessorChangeListener;
-  }
-
-  public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils,  
ScheduleAfterDebounceTime debounceTimer) {
+  public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils) {
     this.processorIdStr = processorIdStr;
     this.zkUtils = zkUtils;
     this.keyBuilder = zkUtils.getKeyBuilder();
     this.hostName = getHostName();
-    this.debounceTimer = (debounceTimer != null) ? debounceTimer : new 
ScheduleAfterDebounceTime();
+    this.previousProcessorChangeListener = new 
PreviousProcessorChangeListener();
 
     zkUtils.makeSurePersistentPathsExists(new 
String[]{keyBuilder.getProcessorsPath()});
   }
 
+  @VisibleForTesting
+  public ZkLeaderElector(String processorIdStr,
+                         ZkUtils zkUtils,
+                         IZkDataListener previousProcessorChangeListener) {
+    this.processorIdStr = processorIdStr;
+    this.zkUtils = zkUtils;
+    this.keyBuilder = zkUtils.getKeyBuilder();
+    this.hostName = getHostName();
+    this.previousProcessorChangeListener = previousProcessorChangeListener;
+  }
+
   // TODO: This should go away once we integrate with Zk based Job Coordinator
   private String getHostName() {
     try {
@@ -81,8 +87,21 @@ public class ZkLeaderElector implements LeaderElector {
     }
   }
 
+  /**
+   * Register a LeaderElectorListener
+   *
+   * @param listener {@link LeaderElectorListener} interfaces to be invoked 
upon completion of leader election participation
+   */
+  @Override
+  public void setLeaderElectorListener(LeaderElectorListener listener) {
+    this.leaderElectorListener = listener;
+  }
+
+  /**
+   * Async method that helps the caller participate in leader election.
+   **/
   @Override
-  public void tryBecomeLeader(LeaderElectorListener leaderElectorListener) {
+  public void tryBecomeLeader() {
     String currentPath = zkUtils.registerProcessorAndGetId(new 
ProcessorData(hostName, processorIdStr));
 
     List<String> children = zkUtils.getSortedActiveProcessorsZnodes();
@@ -97,7 +116,9 @@ public class ZkLeaderElector implements LeaderElector {
     if (index == 0) {
       isLeader.getAndSet(true);
       LOG.info(zLog("Eligible to become the leader!"));
-      debounceTimer.scheduleAfterDebounceTime("ON_BECOMING_LEADER", 1, () -> 
leaderElectorListener.onBecomingLeader()); // inform the caller
+      if (leaderElectorListener != null) {
+        leaderElectorListener.onBecomingLeader();
+      }
       return;
     }
 
@@ -105,17 +126,12 @@ public class ZkLeaderElector implements LeaderElector {
     LOG.info("Index = " + index + " Not eligible to be a leader yet!");
     String predecessor = children.get(index - 1);
     if (!predecessor.equals(currentSubscription)) {
-
       if (currentSubscription != null) {
         LOG.debug(zLog("Unsubscribing data change for " + 
currentSubscription));
         zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + 
currentSubscription,
             previousProcessorChangeListener);
       }
       currentSubscription = predecessor;
-      // callback in case if the previous node gets deleted (when previous 
processor dies)
-      if (previousProcessorChangeListener == null)
-        previousProcessorChangeListener =  new 
PreviousProcessorChangeListener(leaderElectorListener);
-
       LOG.info(zLog("Subscribing data change for " + predecessor));
       zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + 
currentSubscription,
           previousProcessorChangeListener);
@@ -135,7 +151,7 @@ public class ZkLeaderElector implements LeaderElector {
         Thread.interrupted();
       }
       LOG.info(zLog("Predecessor doesn't exist anymore. Trying to become leader again..."));
-      tryBecomeLeader(leaderElectorListener);
+      tryBecomeLeader();
     }
   }
 
@@ -155,11 +171,6 @@ public class ZkLeaderElector implements LeaderElector {
 
   // Only by non-leaders
   class PreviousProcessorChangeListener implements IZkDataListener {
-    private final LeaderElectorListener leaderElectorListener;
-    PreviousProcessorChangeListener(LeaderElectorListener 
leaderElectorListener) {
-      this.leaderElectorListener = leaderElectorListener;
-    }
-
     @Override
     public void handleDataChange(String dataPath, Object data) throws 
Exception {
       LOG.debug("Data change on path: " + dataPath + " Data: " + data);
@@ -169,7 +180,7 @@ public class ZkLeaderElector implements LeaderElector {
     public void handleDataDeleted(String dataPath) throws Exception {
       LOG.info(
           zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."));
-      tryBecomeLeader(leaderElectorListener);
+      tryBecomeLeader();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index b8522b9..385a060 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,7 +19,6 @@
 
 package org.apache.samza.job.local
 
-
 import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 2d2bf16..9d15211 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -153,8 +153,15 @@ public class TestLocalApplicationRunner {
 
     CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
     LeaderElector leaderElector = new LeaderElector() {
+      private LeaderElectorListener leaderElectorListener;
+
       @Override
-      public void tryBecomeLeader(LeaderElectorListener leaderElectorListener) 
{
+      public void setLeaderElectorListener(LeaderElectorListener listener) {
+        this.leaderElectorListener = listener;
+      }
+
+      @Override
+      public void tryBecomeLeader() {
         leaderElectorListener.onBecomingLeader();
       }
 
@@ -166,6 +173,7 @@ public class TestLocalApplicationRunner {
         return false;
       }
     };
+
     Latch latch = new Latch() {
       boolean done = false;
       @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
 
b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
index 23a8cc1..cd396ad 100644
--- 
a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
+++ 
b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -20,79 +20,72 @@
 package org.apache.samza.zk;
 
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 public class TestScheduleAfterDebounceTime {
-  private static final long DEBOUNCE_TIME = 500;
-  int i = 0;
-  @Before
-  public void setup() {
-
-  }
+  private static final long WAIT_TIME = 500;
 
   class TestObj {
+    private volatile int i = 0;
     public void inc() {
       i++;
     }
     public void setTo(int val) {
       i = val;
     }
-    public void doNothing() {
-
+    public int get() {
+      return i;
     }
   }
+
   @Test
-  public void testSchedule() {
-    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+  public void testSchedule() throws InterruptedException {
+    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+    final CountDownLatch latch = new CountDownLatch(1);
 
     final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
-    debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () -> {
+    scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, () -> {
         testObj.inc();
+        latch.countDown();
       });
     // action is delayed
-    Assert.assertEquals(0, i);
+    Assert.assertEquals(0, testObj.get());
 
-    TestZkUtils.sleepMs(DEBOUNCE_TIME + 10);
+    boolean result = latch.await(WAIT_TIME * 2, TimeUnit.MILLISECONDS);
+    Assert.assertTrue("Latch timed-out and task was not scheduled on time.", result);
+    Assert.assertEquals(1, testObj.get());
 
-    // debounce time passed
-    Assert.assertEquals(1, i);
+    scheduledQueue.stopScheduler();
   }
 
   @Test
-  public void testCancelAndSchedule() {
-    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+  public void testCancelAndSchedule() throws InterruptedException {
+    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+    final CountDownLatch test1Latch = new CountDownLatch(1);
 
     final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
-    debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () ->
-      {
-        testObj.inc();
-      }
-    );
-    Assert.assertEquals(0, i);
+    scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, testObj::inc);
 
     // next schedule should cancel the previous one with the same name
-    debounceTimer.scheduleAfterDebounceTime("TEST1", 2 * DEBOUNCE_TIME, () ->
+    scheduledQueue.scheduleAfterDebounceTime("TEST1", 2 * WAIT_TIME, () ->
       {
-        testObj.setTo(100);
+        testObj.inc();
+        test1Latch.countDown();
       }
     );
 
-    TestZkUtils.sleepMs(DEBOUNCE_TIME + 10);
-
-    // still should be the old value
-    Assert.assertEquals(0, i);
-
+    final TestObj testObj2 = new TestScheduleAfterDebounceTime.TestObj();
     // this schedule should not cancel the previous one, because it has 
different name
-    debounceTimer.scheduleAfterDebounceTime("TEST2", DEBOUNCE_TIME, () ->
-      {
-        testObj.doNothing();
-      }
-    );
+    scheduledQueue.scheduleAfterDebounceTime("TEST2", WAIT_TIME, 
testObj2::inc);
 
-    TestZkUtils.sleepMs(3 * DEBOUNCE_TIME + 10);
+    boolean result = test1Latch.await(4 * WAIT_TIME, TimeUnit.MILLISECONDS);
+    Assert.assertTrue("Latch timed-out. Scheduled tasks were not run correctly.", result);
+    Assert.assertEquals(1, testObj.get());
+    Assert.assertEquals(1, testObj2.get());
 
-    Assert.assertEquals(100, i);
+    scheduledQueue.stopScheduler();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 5aaee2a..48dca9a 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -22,7 +22,6 @@ import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -63,7 +62,7 @@ public class TestZkLeaderElector {
   public void testSetup() {
     testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
     try {
-      testZkUtils = getZkUtilsWithNewClient("testProcessorId");
+      testZkUtils = getZkUtilsWithNewClient();
     } catch (Exception e) {
       Assert.fail("Client connection setup failed. Aborting tests..");
     }
@@ -109,13 +108,9 @@ public class TestZkLeaderElector {
 
     ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils, 
null);
     BooleanResult isLeader = new BooleanResult();
+    leaderElector.setLeaderElectorListener(() -> isLeader.res = true);
 
-    leaderElector.tryBecomeLeader(new LeaderElectorListener() {
-      @Override
-      public void onBecomingLeader() {
-        isLeader.res = true;
-      }
-    });
+    leaderElector.tryBecomeLeader();
     Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader.res, 2, 
100));
   }
 
@@ -131,13 +126,9 @@ public class TestZkLeaderElector {
     when(mockZkUtils.getKeyBuilder()).thenReturn(kb);
 
     ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, 
mockZkUtils, null);
-
+    leaderElector.setLeaderElectorListener(() -> { });
     try {
-      leaderElector.tryBecomeLeader(new LeaderElectorListener() {
-        @Override
-        public void onBecomingLeader() {
-        }
-      });
+      leaderElector.tryBecomeLeader();
       Assert.fail("Was expecting leader election to fail!");
     } catch (SamzaException e) {
       // No-op Expected
@@ -155,38 +146,25 @@ public class TestZkLeaderElector {
 
 
     // Processor-1
-    ZkUtils zkUtils1 = getZkUtilsWithNewClient("1");
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient();
     ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null);
+    leaderElector1.setLeaderElectorListener(() -> isLeader1.res = true);
 
     // Processor-2
-    ZkUtils zkUtils2 = getZkUtilsWithNewClient("2");
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient();
     ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null);
-
+    leaderElector2.setLeaderElectorListener(() -> isLeader2.res = true);
 
     // Processor-3
-    ZkUtils zkUtils3 = getZkUtilsWithNewClient("3");
+    ZkUtils zkUtils3 = getZkUtilsWithNewClient();
     ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3, null);
+    leaderElector3.setLeaderElectorListener(() -> isLeader3.res = true);
 
     Assert.assertEquals(0, 
testZkUtils.getSortedActiveProcessorsZnodes().size());
 
-    leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
-      @Override
-      public void onBecomingLeader() {
-        isLeader1.res = true;
-      }
-    });
-    leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
-      @Override
-      public void onBecomingLeader() {
-        isLeader2.res = true;
-      }
-    });
-    leaderElector3.tryBecomeLeader(new LeaderElectorListener() {
-      @Override
-      public void onBecomingLeader() {
-        isLeader3.res = true;
-      }
-    });
+    leaderElector1.tryBecomeLeader();
+    leaderElector2.tryBecomeLeader();
+    leaderElector3.tryBecomeLeader();
 
     Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 
100));
     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 
2, 100));
@@ -222,11 +200,9 @@ public class TestZkLeaderElector {
 
 
     // Processor-1
-    ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1");
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient();
     zkUtils1.registerProcessorAndGetId(new ProcessorData("processor1", "1"));
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", 
zkUtils1, null);
-
-    leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() {
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", 
zkUtils1, new IZkDataListener() {
       @Override
       public void handleDataChange(String dataPath, Object data)
           throws Exception {
@@ -238,13 +214,12 @@ public class TestZkLeaderElector {
         count.incrementAndGet();
       }
     });
+    leaderElector1.setLeaderElectorListener(() -> isLeader1.res = true);
 
     // Processor-2
-    ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2");
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient();
     final String path2 = zkUtils2.registerProcessorAndGetId(new 
ProcessorData("processor2", "2"));
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", 
zkUtils2, null);
-
-    leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() {
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", 
zkUtils2, new IZkDataListener() {
       @Override
       public void handleDataChange(String dataPath, Object data)
           throws Exception {
@@ -270,13 +245,12 @@ public class TestZkLeaderElector {
         electionLatch.countDown();
       }
     });
+    leaderElector2.setLeaderElectorListener(() -> isLeader2.res = true);
 
     // Processor-3
-    ZkUtils zkUtils3  = getZkUtilsWithNewClient("processor3");
+    ZkUtils zkUtils3  = getZkUtilsWithNewClient();
     zkUtils3.registerProcessorAndGetId(new ProcessorData("processor3", "3"));
-    ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", 
zkUtils3, null);
-
-    leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() {
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", 
zkUtils3, new IZkDataListener() {
       @Override
       public void handleDataChange(String dataPath, Object data)
           throws Exception {
@@ -288,26 +262,12 @@ public class TestZkLeaderElector {
         count.incrementAndGet();
       }
     });
+    leaderElector3.setLeaderElectorListener(() -> isLeader3.res = true);
 
     // Join Leader Election
-    leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
-      @Override
-      public void onBecomingLeader() {
-        isLeader1.res = true;
-      }
-    });
-    leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
-      @Override
-      public void onBecomingLeader() {
-        isLeader2.res = true;
-      }
-    });
-    leaderElector3.tryBecomeLeader(new LeaderElectorListener() {
-      @Override
-      public void onBecomingLeader() {
-        isLeader3.res = true;
-      }
-    });
+    leaderElector1.tryBecomeLeader();
+    leaderElector2.tryBecomeLeader();
+    leaderElector3.tryBecomeLeader();
 
     Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 
100));
     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 
2, 100));
@@ -357,55 +317,39 @@ public class TestZkLeaderElector {
     BooleanResult isLeader3 = new BooleanResult();
 
     // Processor-1
-    ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1");
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient();
     zkUtils1.registerProcessorAndGetId(new ProcessorData("processor1", "1"));
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", 
zkUtils1, null);
-
-    leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() {
-      @Override
-      public void handleDataChange(String dataPath, Object data)
-          throws Exception {
-
-      }
-
-      @Override
-      public void handleDataDeleted(String dataPath)
-          throws Exception {
-        count.incrementAndGet();
-      }
-    });
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", 
zkUtils1, new IZkDataListener() {
+        @Override
+        public void handleDataChange(String dataPath, Object data) throws 
Exception { }
 
+        @Override
+        public void handleDataDeleted(String dataPath) throws Exception {
+          count.incrementAndGet();
+        }
+      });
+    leaderElector1.setLeaderElectorListener(() -> isLeader1.res = true);
 
     // Processor-2
-    ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2");
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient();
     zkUtils2.registerProcessorAndGetId(new ProcessorData("processor2", "2"));
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", 
zkUtils2, null);
-
-    leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() {
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", 
zkUtils2, new IZkDataListener() {
       @Override
-      public void handleDataChange(String dataPath, Object data)
-          throws Exception {
-
-      }
+      public void handleDataChange(String dataPath, Object data) throws 
Exception { }
 
       @Override
-      public void handleDataDeleted(String dataPath)
-          throws Exception {
+      public void handleDataDeleted(String dataPath) throws Exception {
         count.incrementAndGet();
       }
     });
+    leaderElector2.setLeaderElectorListener(() -> isLeader2.res = true);
 
     // Processor-3
-    ZkUtils zkUtils3  = getZkUtilsWithNewClient("processor3");
+    ZkUtils zkUtils3  = getZkUtilsWithNewClient();
     final String path3 = zkUtils3.registerProcessorAndGetId(new 
ProcessorData("processor3", "3"));
-    ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", 
zkUtils3, null);
-
-    leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() {
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", 
zkUtils3, new IZkDataListener() {
       @Override
-      public void handleDataChange(String dataPath, Object data)
-          throws Exception {
-
-      }
+      public void handleDataChange(String dataPath, Object data) throws 
Exception { }
 
       @Override
       public void handleDataDeleted(String dataPath)
@@ -427,26 +371,13 @@ public class TestZkLeaderElector {
         electionLatch.countDown();
       }
     });
+    leaderElector3.setLeaderElectorListener(() -> isLeader3.res = true);
 
     // Join Leader Election
-    leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
-      @Override
-      public void onBecomingLeader() {
-        isLeader1.res = true;
-      }
-    });
-    leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
-      @Override
-      public void onBecomingLeader() {
-        isLeader2.res = true;
-      }
-    });
-    leaderElector3.tryBecomeLeader(new LeaderElectorListener() {
-      @Override
-      public void onBecomingLeader() {
-        isLeader3.res = true;
-      }
-    });
+    leaderElector1.tryBecomeLeader();
+    leaderElector2.tryBecomeLeader();
+    leaderElector3.tryBecomeLeader();
+
     Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 
100));
     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 
2, 100));
     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 
2, 100));
@@ -477,29 +408,21 @@ public class TestZkLeaderElector {
     BooleanResult isLeader2 = new BooleanResult();
     // Processor-1
 
-    ZkUtils zkUtils1 = getZkUtilsWithNewClient("1");
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient();
     ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null);
+    leaderElector1.setLeaderElectorListener(() -> isLeader1.res = true);
 
     // Processor-2
-    ZkUtils zkUtils2 = getZkUtilsWithNewClient("2");
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient();
     ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null);
+    leaderElector2.setLeaderElectorListener(() -> isLeader2.res = true);
 
     // Before Leader Election
     Assert.assertFalse(leaderElector1.amILeader());
     Assert.assertFalse(leaderElector2.amILeader());
 
-    leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
-      @Override
-      public void onBecomingLeader() {
-        isLeader1.res = true;
-      }
-    });
-    leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
-      @Override
-      public void onBecomingLeader() {
-        isLeader2.res = true;
-      }
-    });
+    leaderElector1.tryBecomeLeader();
+    leaderElector2.tryBecomeLeader();
 
     // After Leader Election
     Assert.assertTrue(leaderElector1.amILeader());
@@ -509,7 +432,7 @@ public class TestZkLeaderElector {
     zkUtils2.close();
   }
 
-  private ZkUtils getZkUtilsWithNewClient(String processorId) {
+  private ZkUtils getZkUtilsWithNewClient() {
     ZkConnection zkConnection = 
ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
     return new ZkUtils(
         KEY_BUILDER,

http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index e03498c..a3e70b8 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -498,7 +498,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
     @volatile var onContainerFailedThrowable: Throwable = null
 
     val mockRunLoop = mock[RunLoop]
-    when(mockRunLoop.run).then(new Answer[Unit] {
+    when(mockRunLoop.run).thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
         Thread.sleep(100)
       }

Reply via email to