Repository: samza Updated Branches: refs/heads/master 67b195363 -> fb7aa73f5
SAMZA-1251 - Remove DebounceTimer dependency from ZkLeaderElector & ZkController Addresses the following: * Makes LeaderElectionListener to be explicitly registered by the caller * Removes debouncetimer dependency from ZkLeaderElector implementation * [Bug] onBecomeLeader was scheduling a task in timer under "OnBecomeLeader", when it should actually be the same as "OnProcessorChange". Otherwise, it will not cancel when there is a new OnProcessorChange event. * [Transient Test Failure] `TestScheduleAfterDebounceTime` tests were relying on timing controlled by sleep. Fixed it by using latch Author: Navina Ramesh <[email protected]> Reviewers: Xinyu Liu <[email protected]>, Boris Shkolnik <[email protected]> Closes #153 from navina/SAMZA-1251 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fb7aa73f Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fb7aa73f Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fb7aa73f Branch: refs/heads/master Commit: fb7aa73f5a574a7a57013b49f22896e122668e70 Parents: 67b1953 Author: Navina Ramesh <[email protected]> Authored: Fri May 5 13:50:08 2017 -0700 Committer: nramesh <[email protected]> Committed: Fri May 5 13:50:08 2017 -0700 ---------------------------------------------------------------------- .../coordinator/BarrierForVersionUpgrade.java | 9 +- .../samza/coordinator/CoordinationUtils.java | 5 +- .../apache/samza/coordinator/LeaderElector.java | 11 +- .../samza/runtime/LocalApplicationRunner.java | 6 +- .../samza/zk/ScheduleAfterDebounceTime.java | 14 +- .../samza/zk/ZkBarrierForVersionUpgrade.java | 28 +-- .../java/org/apache/samza/zk/ZkController.java | 11 +- .../org/apache/samza/zk/ZkControllerImpl.java | 67 ++----- .../apache/samza/zk/ZkControllerListener.java | 9 +- .../samza/zk/ZkCoordinationServiceFactory.java | 4 +- .../apache/samza/zk/ZkCoordinationUtils.java | 6 +- .../org/apache/samza/zk/ZkJobCoordinator.java | 89 +++++---- 
.../samza/zk/ZkJobCoordinatorFactory.java | 4 +- .../org/apache/samza/zk/ZkLeaderElector.java | 57 +++--- .../samza/job/local/ThreadJobFactory.scala | 1 - .../runtime/TestLocalApplicationRunner.java | 10 +- .../samza/zk/TestScheduleAfterDebounceTime.java | 69 +++---- .../apache/samza/zk/TestZkLeaderElector.java | 189 ++++++------------- .../samza/container/TestSamzaContainer.scala | 2 +- 19 files changed, 254 insertions(+), 337 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java index 145d81c..664cef8 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java @@ -28,11 +28,12 @@ import java.util.List; */ public interface BarrierForVersionUpgrade { /** - * Barrier is usually started by the leader. - * @param version - for which the barrier is created - * @param participatns - list of participants that need to join for barrier to complete + * Barrier is usually started by the leader. Creates the Barrier paths in ZK + * + * @param version - String, representing the version of the JobModel for which the barrier is created + * @param participants - {@link List} of participants that need to join for barrier to complete */ - void start(String version, List<String> participatns); + void start(String version, List<String> participants); /** * Called by the processor. 
http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java index 39bda24..952aa51 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java @@ -18,7 +18,9 @@ */ package org.apache.samza.coordinator; -/** THIS API WILL CHANGE +import org.apache.samza.annotation.InterfaceStability; + +/** * * Coordination service provides synchronization primitives. * The actual implementation (for example ZK based) is left to each implementation class. @@ -27,6 +29,7 @@ package org.apache.samza.coordinator; * - Latch * - barrier for version upgrades */ +@InterfaceStability.Evolving public interface CoordinationUtils { /** http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java index c6c8bbb..c624f83 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java @@ -32,11 +32,16 @@ import org.apache.samza.annotation.InterfaceStability; @InterfaceStability.Evolving public interface LeaderElector { /** - * Async method that helps the caller participate in leader election. 
+ * Register a LeaderElectorListener * - * @param leaderElectorListener to be invoked if the caller is chosen as a leader through the leader election process + * @param listener {@link LeaderElectorListener} interfaces to be invoked upon completion of leader election participation */ - void tryBecomeLeader(LeaderElectorListener leaderElectorListener); + void setLeaderElectorListener(LeaderElectorListener listener); + + /** + * Async method that helps the caller participate in leader election. + **/ + void tryBecomeLeader(); /** * Method that allows a caller to resign from leadership role. Caller can resign from leadership due to various http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 9fed202..b1f0aba 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -35,6 +35,7 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.Latch; +import org.apache.samza.coordinator.LeaderElector; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.processor.StreamProcessor; @@ -48,7 +49,6 @@ import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * This class implements the {@link ApplicationRunner} that runs the applications in standalone environment */ @@ -213,10 +213,12 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { if (!intStreams.isEmpty()) { if 
(coordinationUtils != null) { Latch initLatch = coordinationUtils.getLatch(1, INIT_LATCH_ID); - coordinationUtils.getLeaderElector().tryBecomeLeader(() -> { + LeaderElector leaderElector = coordinationUtils.getLeaderElector(); + leaderElector.setLeaderElectorListener(() -> { getStreamManager().createStreams(intStreams); initLatch.countDown(); }); + leaderElector.tryBecomeLeader(); initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES); } else { // each application process will try creating the streams, which http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java index 289d900..21572f5 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java @@ -39,13 +39,9 @@ import org.slf4j.LoggerFactory; * ZK based standalone app. */ public class ScheduleAfterDebounceTime { - public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class); + public static final Logger LOGGER = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class); public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a task to complete - // Names of actions. - // When the same action is scheduled it needs to cancel the previous one. - // To accomplish that we keep the previous future in a map, keyed by the action name. - // Here we predefine some actions which are used in the ZK based standalone app. 
// Action name when the JobModel version changes public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange"; @@ -56,28 +52,28 @@ public class ScheduleAfterDebounceTime { public static final int DEBOUNCE_TIME_MS = 2000; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("zk-debounce-thread-%d").setDaemon(true).build()); + new ThreadFactoryBuilder().setNameFormat("debounce-thread-%d").setDaemon(true).build()); private final Map<String, ScheduledFuture> futureHandles = new HashMap<>(); synchronized public void scheduleAfterDebounceTime(String actionName, long debounceTimeMs, Runnable runnable) { // check if this action has been scheduled already ScheduledFuture sf = futureHandles.get(actionName); if (sf != null && !sf.isDone()) { - LOG.info("DEBOUNCE: cancel future for " + actionName); + LOGGER.info("cancel future for " + actionName); // attempt to cancel if (!sf.cancel(false)) { try { sf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (Exception e) { // we ignore the exception - LOG.warn("cancel for action " + actionName + " failed with ", e); + LOGGER.warn("cancel for action " + actionName + " failed with ", e); } } futureHandles.remove(actionName); } // schedule a new task sf = scheduledExecutorService.schedule(runnable, debounceTimeMs, TimeUnit.MILLISECONDS); - LOG.info("DEBOUNCE: scheduled " + actionName + " in " + debounceTimeMs); + LOGGER.info("DEBOUNCE: scheduled " + actionName + " in " + debounceTimeMs); futureHandles.put(actionName, sf); } http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java index 20de43c..c7bfc1d 100644 
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java @@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory; */ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { private final ZkUtils zkUtils; - private final ZkKeyBuilder keyBuilder; private final static String BARRIER_DONE = "done"; private final static String BARRIER_TIMED_OUT = "TIMED_OUT"; private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class); @@ -49,31 +48,21 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { private final ScheduleAfterDebounceTime debounceTimer; private final String barrierPrefix; - private String barrierPath; private String barrierDonePath; private String barrierProcessors; private static final String VERSION_UPGRADE_TIMEOUT_TIMER = "VersionUpgradeTimeout"; private final long barrierTimeoutMS; public ZkBarrierForVersionUpgrade(String barrierId, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, long barrierTimeoutMS) { + if (zkUtils == null) { + throw new RuntimeException("Cannot operate ZkBarrierForVersionUpgrade without ZkUtils."); + } this.zkUtils = zkUtils; - keyBuilder = zkUtils.getKeyBuilder(); - - barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix(barrierId); - + barrierPrefix = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(barrierId); this.debounceTimer = debounceTimer; this.barrierTimeoutMS = barrierTimeoutMS; } - /** - * set the barrier for the timer. 
If the timer is not achieved by the timeout - it will fail - * @param version for which the barrier is created - * @param timeout - time in ms to wait - */ - private void setTimer(final String version, final long timeout, final Stat currentStatOfBarrierDone) { - debounceTimer.scheduleAfterDebounceTime(VERSION_UPGRADE_TIMEOUT_TIMER, timeout, ()->timerOff(version, currentStatOfBarrierDone)); - } - protected long getBarrierTimeOutMs() { return barrierTimeoutMS; } @@ -84,7 +73,6 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_TIMED_OUT, currentStatOfBarrierDone.getVersion()); } catch (ZkBadVersionException e) { // Expected. failed to write, make sure the value is "DONE" - ///LOG.("Barrier timeout write failed"); String done = zkUtils.getZkClient().<String>readData(barrierDonePath); LOG.info("Barrier timeout expired, but done=" + done); if (!done.equals(BARRIER_DONE)) { @@ -94,7 +82,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { } private void setPaths(String version) { - barrierPath = String.format("%s/barrier_%s", barrierPrefix, version); + String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version); barrierDonePath = String.format("%s/barrier_done", barrierPath); barrierProcessors = String.format("%s/barrier_processors", barrierPath); @@ -103,7 +91,6 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { @Override public void start(String version, List<String> participants) { - setPaths(version); // subscribe for processor's list changes @@ -114,7 +101,10 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { Stat currentStatOfBarrierDone = new Stat(); zkUtils.getZkClient().readData(barrierDonePath, currentStatOfBarrierDone); - setTimer(version, getBarrierTimeOutMs(), currentStatOfBarrierDone); + debounceTimer.scheduleAfterDebounceTime( + VERSION_UPGRADE_TIMEOUT_TIMER, + 
getBarrierTimeOutMs(), + () -> timerOff(version, currentStatOfBarrierDone)); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkController.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java index 20c62cf..de2e473 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java @@ -22,11 +22,18 @@ package org.apache.samza.zk; /** * Api to the functionality provided by ZK + * + * Api for JC to ZK communication */ public interface ZkController { void register(); boolean isLeader(); - void notifyJobModelChange(String version); void stop(); - void listenToProcessorLiveness(); + + // Leader + /** + * Allows the {@link ZkJobCoordinator} to subscribe to changes to Zk nodes in the processors subtree + * Typically, the leader is interested in such notifications. 
+ */ + void subscribeToProcessorChange(); } http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java index b6e3aed..52bfef1 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java @@ -22,7 +22,7 @@ package org.apache.samza.zk; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.apache.samza.SamzaException; -import org.apache.samza.coordinator.LeaderElectorListener; +import org.apache.samza.coordinator.LeaderElector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,16 +35,14 @@ public class ZkControllerImpl implements ZkController { private final String processorIdStr; private final ZkUtils zkUtils; private final ZkControllerListener zkControllerListener; - private final ZkLeaderElector leaderElector; - private final ScheduleAfterDebounceTime debounceTimer; + private final LeaderElector zkLeaderElector; - public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, - ZkControllerListener zkControllerListener) { + public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils, + ZkControllerListener zkControllerListener, LeaderElector zkLeaderElector) { this.processorIdStr = processorIdStr; this.zkUtils = zkUtils; this.zkControllerListener = zkControllerListener; - this.leaderElector = new ZkLeaderElector(processorIdStr, zkUtils, debounceTimer); - this.debounceTimer = debounceTimer; + this.zkLeaderElector = zkLeaderElector; init(); } @@ -62,49 +60,32 @@ public class ZkControllerImpl implements ZkController { public void register() { // TODO - make a loop here with some number of 
attempts. // possibly split into two method - becomeLeader() and becomeParticipant() - leaderElector.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - listenToProcessorLiveness(); - - // inform the caller - zkControllerListener.onBecomeLeader(); - } - }); + zkLeaderElector.tryBecomeLeader(); // subscribe to JobModel version updates - zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(debounceTimer)); + zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler()); } @Override public boolean isLeader() { - return leaderElector.amILeader(); - } - - @Override - public void notifyJobModelChange(String version) { - zkControllerListener.onNewJobModelAvailable(version); + return zkLeaderElector.amILeader(); } @Override public void stop() { if (isLeader()) { - leaderElector.resignLeadership(); + zkLeaderElector.resignLeadership(); } zkUtils.close(); } @Override - public void listenToProcessorLiveness() { - zkUtils.subscribeToProcessorChange(new ZkProcessorChangeHandler(debounceTimer)); + public void subscribeToProcessorChange() { + zkUtils.subscribeToProcessorChange(new ProcessorChangeHandler()); } // Only by Leader - class ZkProcessorChangeHandler implements IZkChildListener { - private final ScheduleAfterDebounceTime debounceTimer; - public ZkProcessorChangeHandler(ScheduleAfterDebounceTime debounceTimer) { - this.debounceTimer = debounceTimer; - } + class ProcessorChangeHandler implements IZkChildListener { /** * Called when the children of the given path changed. 
* @@ -115,18 +96,13 @@ public class ZkControllerImpl implements ZkController { @Override public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception { LOG.info( - "ZkControllerImpl::ZkProcessorChangeHandler::handleChildChange - Path: " + parentPath + " Current Children: " + "ZkControllerImpl::ProcessorChangeHandler::handleChildChange - Path: " + parentPath + " Current Children: " + currentChildren); - debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, - ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> zkControllerListener.onProcessorChange(currentChildren)); + zkControllerListener.onProcessorChange(currentChildren); } } class ZkJobModelVersionChangeHandler implements IZkDataListener { - private final ScheduleAfterDebounceTime debounceTimer; - public ZkJobModelVersionChangeHandler(ScheduleAfterDebounceTime debounceTimer) { - this.debounceTimer = debounceTimer; - } /** * called when job model version gets updated * @param dataPath @@ -136,22 +112,13 @@ public class ZkControllerImpl implements ZkController { @Override public void handleDataChange(String dataPath, Object data) throws Exception { LOG.info("pid=" + processorIdStr + ". Got notification on version update change. 
path=" + dataPath + "; data=" - + (String) data); - - debounceTimer - .scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> notifyJobModelChange((String) data)); + + data); + zkControllerListener.onNewJobModelAvailable((String) data); } + @Override public void handleDataDeleted(String dataPath) throws Exception { throw new SamzaException("version update path has been deleted!"); } } - - public void shutdown() { - if (debounceTimer != null) - debounceTimer.stopScheduler(); - - if (zkUtils != null) - zkUtils.close(); - } } http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java index f7fedd7..af4d56c 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java @@ -21,12 +21,15 @@ package org.apache.samza.zk; import java.util.List; - /** - * callbacks to the caller of the ZkController + * Interface to listen for notifications from the {@link ZkController} */ public interface ZkControllerListener { - void onBecomeLeader(); + /** + * ZkController observes the ZkTree for changes to group membership of processors and notifies the listener + * + * @param processorIds List of current znodes that are in the processing group + */ void onProcessorChange(List<String> processorIds); void onNewJobModelAvailable(String version); // start job model update (stop current work) http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java index 6ddc3fe..07da147 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java @@ -24,15 +24,13 @@ import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.CoordinationServiceFactory; - public class ZkCoordinationServiceFactory implements CoordinationServiceFactory { // TODO - Why should this method be synchronized? synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) { ZkConfig zkConfig = new ZkConfig(config); ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs()); - ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime(); - return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, debounceTimer); + return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new ScheduleAfterDebounceTime()); } } http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index 8f0b6d2..5a6c88a 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -18,7 +18,6 @@ */ package org.apache.samza.zk; -import com.google.common.annotations.VisibleForTesting; import 
org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.BarrierForVersionUpgrade; import org.apache.samza.coordinator.CoordinationUtils; @@ -47,7 +46,7 @@ public class ZkCoordinationUtils implements CoordinationUtils { @Override public LeaderElector getLeaderElector() { - return new ZkLeaderElector(processorIdStr, zkUtils, debounceTimer); + return new ZkLeaderElector(processorIdStr, zkUtils); } @Override @@ -59,7 +58,8 @@ public class ZkCoordinationUtils implements CoordinationUtils { public BarrierForVersionUpgrade getBarrier(String barrierId) { return new ZkBarrierForVersionUpgrade(barrierId, zkUtils, debounceTimer, zkConfig.getZkBarrierTimeoutMs()); } - @VisibleForTesting + + // TODO - SAMZA-1128 CoordinationService should directly depende on ZkUtils and DebounceTimer public ZkUtils getZkUtils() { return zkUtils; } http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 08f779f..0ac9e8e 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -18,12 +18,6 @@ */ package org.apache.samza.zk; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.samza.SamzaException; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; @@ -33,6 +27,8 @@ import org.apache.samza.coordinator.BarrierForVersionUpgrade; import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.coordinator.LeaderElector; 
+import org.apache.samza.coordinator.LeaderElectorListener; import org.apache.samza.job.model.JobModel; import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.runtime.ProcessorIdGenerator; @@ -43,6 +39,13 @@ import org.apache.samza.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * JobCoordinator for stand alone processor managed via Zookeeper. */ @@ -61,15 +64,18 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { private JobCoordinatorListener coordinatorListener = null; private JobModel newJobModel; - public ZkJobCoordinator(Config config, ScheduleAfterDebounceTime debounceTimer) { - this.debounceTimer = debounceTimer; + public ZkJobCoordinator(Config config) { + this.debounceTimer = new ScheduleAfterDebounceTime(); this.config = config; this.processorId = createProcessorId(config); this.coordinationUtils = new ZkCoordinationServiceFactory() .getCoordinationService(new ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), config); this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils(); - this.zkController = new ZkControllerImpl(processorId, zkUtils, debounceTimer, this); - this.streamMetadataCache = getStreamMetadataCache(); + LeaderElector leaderElector = new ZkLeaderElector(this.processorId, zkUtils); + leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl()); + + this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector); + streamMetadataCache = getStreamMetadataCache(); } private StreamMetadataCache getStreamMetadataCache() { @@ -100,6 +106,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { if (coordinatorListener != null) { coordinatorListener.onJobModelExpired(); } + debounceTimer.stopScheduler(); 
zkController.stop(); if (coordinatorListener != null) { coordinatorListener.onCoordinatorStop(); @@ -123,19 +130,13 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { //////////////////////////////////////////////// LEADER stuff /////////////////////////// @Override - public void onBecomeLeader() { - log.info("ZkJobCoordinator::onBecomeLeader - I became the leader!"); - - List<String> emptyList = new ArrayList<>(); - - // actual actions to do are the same as onProcessorChange() + public void onProcessorChange(List<String> processors) { + log.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size()); debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, - ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> onProcessorChange(emptyList)); + ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> doOnProcessorChange(processors)); } - @Override - public void onProcessorChange(List<String> processors) { - log.info("ZkJobCoordinator::onProcessorChange - list of processors changed! 
List size=" + processors.size()); + public void doOnProcessorChange(List<String> processors) { // if list of processors is empty - it means we are called from 'onBecomeLeader' generateNewJobModel(processors); if (coordinatorListener != null) { @@ -145,26 +146,24 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { @Override public void onNewJobModelAvailable(final String version) { - log.info("pid=" + processorId + "new JobModel available"); - // stop current work - if (coordinatorListener != null) { - coordinatorListener.onJobModelExpired(); - } - log.info("pid=" + processorId + "new JobModel available.Container stopped."); - // get the new job model - newJobModel = zkUtils.getJobModel(version); + debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> + { + log.info("pid=" + processorId + "new JobModel available"); + // stop current work + if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); + } + log.info("pid=" + processorId + "new JobModel available.Container stopped."); + // get the new job model + newJobModel = zkUtils.getJobModel(version); - log.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel); + log.info("pid=" + processorId + ": new JobModel available. 
ver=" + version + "; jm = " + newJobModel); - // update ZK and wait for all the processors to get this new version - ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier( - JOB_MODEL_UPGRADE_BARRIER); - barrier.waitForBarrier(version, processorId, new Runnable() { - @Override - public void run() { - onNewJobModelConfirmed(version); - } - }); + // update ZK and wait for all the processors to get this new version + ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier( + JOB_MODEL_UPGRADE_BARRIER); + barrier.waitForBarrier(version, processorId, () -> onNewJobModelConfirmed(version)); + }); } @Override @@ -245,4 +244,18 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion); log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion); } + + class LeaderElectorListenerImpl implements LeaderElectorListener { + @Override + public void onBecomingLeader() { + log.info("ZkJobCoordinator::onBecomeLeader - I became the leader!"); + zkController.subscribeToProcessorChange(); + debounceTimer.scheduleAfterDebounceTime( + ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, + ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> { + // actual actions to do are the same as onProcessorChange() + doOnProcessorChange(new ArrayList<>()); + }); + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index e02e504..d2e0d14 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ 
-32,8 +32,6 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { */ @Override public JobCoordinator getJobCoordinator(Config config) { - ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime(); - - return new ZkJobCoordinator(config, debounceTimer); + return new ZkJobCoordinator(config); } } http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java index 4ffe3e4..8caa5c6 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java @@ -49,28 +49,34 @@ public class ZkLeaderElector implements LeaderElector { private final String processorIdStr; private final ZkKeyBuilder keyBuilder; private final String hostName; - private final ScheduleAfterDebounceTime debounceTimer; private AtomicBoolean isLeader = new AtomicBoolean(false); - private IZkDataListener previousProcessorChangeListener; + private final IZkDataListener previousProcessorChangeListener; + private LeaderElectorListener leaderElectorListener = null; private String currentSubscription = null; private final Random random = new Random(); - @VisibleForTesting - public void setPreviousProcessorChangeListener(IZkDataListener previousProcessorChangeListener) { - this.previousProcessorChangeListener = previousProcessorChangeListener; - } - - public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) { + public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils) { this.processorIdStr = processorIdStr; this.zkUtils = zkUtils; this.keyBuilder = zkUtils.getKeyBuilder(); this.hostName = getHostName(); - this.debounceTimer = (debounceTimer != null) ? 
debounceTimer : new ScheduleAfterDebounceTime(); + this.previousProcessorChangeListener = new PreviousProcessorChangeListener(); zkUtils.makeSurePersistentPathsExists(new String[]{keyBuilder.getProcessorsPath()}); } + @VisibleForTesting + public ZkLeaderElector(String processorIdStr, + ZkUtils zkUtils, + IZkDataListener previousProcessorChangeListener) { + this.processorIdStr = processorIdStr; + this.zkUtils = zkUtils; + this.keyBuilder = zkUtils.getKeyBuilder(); + this.hostName = getHostName(); + this.previousProcessorChangeListener = previousProcessorChangeListener; + } + // TODO: This should go away once we integrate with Zk based Job Coordinator private String getHostName() { try { @@ -81,8 +87,21 @@ public class ZkLeaderElector implements LeaderElector { } } + /** + * Register a LeaderElectorListener + * + * @param listener {@link LeaderElectorListener} interfaces to be invoked upon completion of leader election participation + */ + @Override + public void setLeaderElectorListener(LeaderElectorListener listener) { + this.leaderElectorListener = listener; + } + + /** + * Async method that helps the caller participate in leader election. 
+ **/ @Override - public void tryBecomeLeader(LeaderElectorListener leaderElectorListener) { + public void tryBecomeLeader() { String currentPath = zkUtils.registerProcessorAndGetId(new ProcessorData(hostName, processorIdStr)); List<String> children = zkUtils.getSortedActiveProcessorsZnodes(); @@ -97,7 +116,9 @@ public class ZkLeaderElector implements LeaderElector { if (index == 0) { isLeader.getAndSet(true); LOG.info(zLog("Eligible to become the leader!")); - debounceTimer.scheduleAfterDebounceTime("ON_BECOMING_LEADER", 1, () -> leaderElectorListener.onBecomingLeader()); // inform the caller + if (leaderElectorListener != null) { + leaderElectorListener.onBecomingLeader(); + } return; } @@ -105,17 +126,12 @@ public class ZkLeaderElector implements LeaderElector { LOG.info("Index = " + index + " Not eligible to be a leader yet!"); String predecessor = children.get(index - 1); if (!predecessor.equals(currentSubscription)) { - if (currentSubscription != null) { LOG.debug(zLog("Unsubscribing data change for " + currentSubscription)); zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, previousProcessorChangeListener); } currentSubscription = predecessor; - // callback in case if the previous node gets deleted (when previous processor dies) - if (previousProcessorChangeListener == null) - previousProcessorChangeListener = new PreviousProcessorChangeListener(leaderElectorListener); - LOG.info(zLog("Subscribing data change for " + predecessor)); zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, previousProcessorChangeListener); @@ -135,7 +151,7 @@ public class ZkLeaderElector implements LeaderElector { Thread.interrupted(); } LOG.info(zLog("Predecessor doesn't exist anymore. 
Trying to become leader again...")); - tryBecomeLeader(leaderElectorListener); + tryBecomeLeader(); } } @@ -155,11 +171,6 @@ public class ZkLeaderElector implements LeaderElector { // Only by non-leaders class PreviousProcessorChangeListener implements IZkDataListener { - private final LeaderElectorListener leaderElectorListener; - PreviousProcessorChangeListener(LeaderElectorListener leaderElectorListener) { - this.leaderElectorListener = leaderElectorListener; - } - @Override public void handleDataChange(String dataPath, Object data) throws Exception { LOG.debug("Data change on path: " + dataPath + " Data: " + data); @@ -169,7 +180,7 @@ public class ZkLeaderElector implements LeaderElector { public void handleDataDeleted(String dataPath) throws Exception { LOG.info( zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again...")); - tryBecomeLeader(leaderElectorListener); + tryBecomeLeader(); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index b8522b9..385a060 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -19,7 +19,6 @@ package org.apache.samza.job.local - import org.apache.samza.config.Config import org.apache.samza.config.JobConfig._ import org.apache.samza.config.ShellCommandConfig._ http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index 2d2bf16..9d15211 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -153,8 +153,15 @@ public class TestLocalApplicationRunner { CoordinationUtils coordinationUtils = mock(CoordinationUtils.class); LeaderElector leaderElector = new LeaderElector() { + private LeaderElectorListener leaderElectorListener; + @Override - public void tryBecomeLeader(LeaderElectorListener leaderElectorListener) { + public void setLeaderElectorListener(LeaderElectorListener listener) { + this.leaderElectorListener = listener; + } + + @Override + public void tryBecomeLeader() { leaderElectorListener.onBecomingLeader(); } @@ -166,6 +173,7 @@ public class TestLocalApplicationRunner { return false; } }; + Latch latch = new Latch() { boolean done = false; @Override http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java index 23a8cc1..cd396ad 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java @@ -20,79 +20,72 @@ package org.apache.samza.zk; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class TestScheduleAfterDebounceTime { - private static final long DEBOUNCE_TIME = 500; - int i = 0; - @Before - public void setup() { - - } + 
private static final long WAIT_TIME = 500; class TestObj { + private volatile int i = 0; public void inc() { i++; } public void setTo(int val) { i = val; } - public void doNothing() { - + public int get() { + return i; } } + @Test - public void testSchedule() { - ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime(); + public void testSchedule() throws InterruptedException { + ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(); + final CountDownLatch latch = new CountDownLatch(1); final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj(); - debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () -> { + scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, () -> { testObj.inc(); + latch.countDown(); }); // action is delayed - Assert.assertEquals(0, i); + Assert.assertEquals(0, testObj.get()); - TestZkUtils.sleepMs(DEBOUNCE_TIME + 10); + boolean result = latch.await(WAIT_TIME * 2, TimeUnit.MILLISECONDS); + Assert.assertTrue("Latch timed-out and task was not scheduled on time.", result); + Assert.assertEquals(1, testObj.get()); - // debounce time passed - Assert.assertEquals(1, i); + scheduledQueue.stopScheduler(); } @Test - public void testCancelAndSchedule() { - ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime(); + public void testCancelAndSchedule() throws InterruptedException { + ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(); + final CountDownLatch test1Latch = new CountDownLatch(1); final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj(); - debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () -> - { - testObj.inc(); - } - ); - Assert.assertEquals(0, i); + scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, testObj::inc); // next schedule should cancel the previous one with the same name - debounceTimer.scheduleAfterDebounceTime("TEST1", 2 * DEBOUNCE_TIME, () -> + 
scheduledQueue.scheduleAfterDebounceTime("TEST1", 2 * WAIT_TIME, () -> { - testObj.setTo(100); + testObj.inc(); + test1Latch.countDown(); } ); - TestZkUtils.sleepMs(DEBOUNCE_TIME + 10); - - // still should be the old value - Assert.assertEquals(0, i); - + final TestObj testObj2 = new TestScheduleAfterDebounceTime.TestObj(); // this schedule should not cancel the previous one, because it has different name - debounceTimer.scheduleAfterDebounceTime("TEST2", DEBOUNCE_TIME, () -> - { - testObj.doNothing(); - } - ); + scheduledQueue.scheduleAfterDebounceTime("TEST2", WAIT_TIME, testObj2::inc); - TestZkUtils.sleepMs(3 * DEBOUNCE_TIME + 10); + boolean result = test1Latch.await(4 * WAIT_TIME, TimeUnit.MILLISECONDS); + Assert.assertTrue("Latch timed-out. Scheduled tasks were not run correctly.", result); + Assert.assertEquals(1, testObj.get()); + Assert.assertEquals(1, testObj2.get()); - Assert.assertEquals(100, i); + scheduledQueue.stopScheduler(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index 5aaee2a..48dca9a 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -22,7 +22,6 @@ import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.samza.SamzaException; -import org.apache.samza.coordinator.LeaderElectorListener; import org.apache.samza.testUtils.EmbeddedZookeeper; import org.junit.After; import org.junit.AfterClass; @@ -63,7 +62,7 @@ public class TestZkLeaderElector { public void testSetup() { testZkConnectionString = "127.0.0.1:" + 
zkServer.getPort(); try { - testZkUtils = getZkUtilsWithNewClient("testProcessorId"); + testZkUtils = getZkUtilsWithNewClient(); } catch (Exception e) { Assert.fail("Client connection setup failed. Aborting tests.."); } @@ -109,13 +108,9 @@ public class TestZkLeaderElector { ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils, null); BooleanResult isLeader = new BooleanResult(); + leaderElector.setLeaderElectorListener(() -> isLeader.res = true); - leaderElector.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader.res = true; - } - }); + leaderElector.tryBecomeLeader(); Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader.res, 2, 100)); } @@ -131,13 +126,9 @@ public class TestZkLeaderElector { when(mockZkUtils.getKeyBuilder()).thenReturn(kb); ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils, null); - + leaderElector.setLeaderElectorListener(() -> { }); try { - leaderElector.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - } - }); + leaderElector.tryBecomeLeader(); Assert.fail("Was expecting leader election to fail!"); } catch (SamzaException e) { // No-op Expected @@ -155,38 +146,25 @@ public class TestZkLeaderElector { // Processor-1 - ZkUtils zkUtils1 = getZkUtilsWithNewClient("1"); + ZkUtils zkUtils1 = getZkUtilsWithNewClient(); ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null); + leaderElector1.setLeaderElectorListener(() -> isLeader1.res = true); // Processor-2 - ZkUtils zkUtils2 = getZkUtilsWithNewClient("2"); + ZkUtils zkUtils2 = getZkUtilsWithNewClient(); ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null); - + leaderElector2.setLeaderElectorListener(() -> isLeader2.res = true); // Processor-3 - ZkUtils zkUtils3 = getZkUtilsWithNewClient("3"); + ZkUtils zkUtils3 = getZkUtilsWithNewClient(); ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3, 
null); + leaderElector3.setLeaderElectorListener(() -> isLeader3.res = true); Assert.assertEquals(0, testZkUtils.getSortedActiveProcessorsZnodes().size()); - leaderElector1.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader1.res = true; - } - }); - leaderElector2.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader2.res = true; - } - }); - leaderElector3.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader3.res = true; - } - }); + leaderElector1.tryBecomeLeader(); + leaderElector2.tryBecomeLeader(); + leaderElector3.tryBecomeLeader(); Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100)); Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100)); @@ -222,11 +200,9 @@ public class TestZkLeaderElector { // Processor-1 - ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1"); + ZkUtils zkUtils1 = getZkUtilsWithNewClient(); zkUtils1.registerProcessorAndGetId(new ProcessorData("processor1", "1")); - ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null); - - leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() { + ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { @@ -238,13 +214,12 @@ public class TestZkLeaderElector { count.incrementAndGet(); } }); + leaderElector1.setLeaderElectorListener(() -> isLeader1.res = true); // Processor-2 - ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2"); + ZkUtils zkUtils2 = getZkUtilsWithNewClient(); final String path2 = zkUtils2.registerProcessorAndGetId(new ProcessorData("processor2", "2")); - ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null); - - leaderElector2.setPreviousProcessorChangeListener(new 
IZkDataListener() { + ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { @@ -270,13 +245,12 @@ public class TestZkLeaderElector { electionLatch.countDown(); } }); + leaderElector2.setLeaderElectorListener(() -> isLeader2.res = true); // Processor-3 - ZkUtils zkUtils3 = getZkUtilsWithNewClient("processor3"); + ZkUtils zkUtils3 = getZkUtilsWithNewClient(); zkUtils3.registerProcessorAndGetId(new ProcessorData("processor3", "3")); - ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null); - - leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() { + ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { @@ -288,26 +262,12 @@ public class TestZkLeaderElector { count.incrementAndGet(); } }); + leaderElector3.setLeaderElectorListener(() -> isLeader3.res = true); // Join Leader Election - leaderElector1.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader1.res = true; - } - }); - leaderElector2.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader2.res = true; - } - }); - leaderElector3.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader3.res = true; - } - }); + leaderElector1.tryBecomeLeader(); + leaderElector2.tryBecomeLeader(); + leaderElector3.tryBecomeLeader(); Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100)); Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100)); @@ -357,55 +317,39 @@ public class TestZkLeaderElector { BooleanResult isLeader3 = new BooleanResult(); // Processor-1 - ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1"); + 
ZkUtils zkUtils1 = getZkUtilsWithNewClient(); zkUtils1.registerProcessorAndGetId(new ProcessorData("processor1", "1")); - ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null); - - leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() { - @Override - public void handleDataChange(String dataPath, Object data) - throws Exception { - - } - - @Override - public void handleDataDeleted(String dataPath) - throws Exception { - count.incrementAndGet(); - } - }); + ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { } + @Override + public void handleDataDeleted(String dataPath) throws Exception { + count.incrementAndGet(); + } + }); + leaderElector1.setLeaderElectorListener(() -> isLeader1.res = true); // Processor-2 - ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2"); + ZkUtils zkUtils2 = getZkUtilsWithNewClient(); zkUtils2.registerProcessorAndGetId(new ProcessorData("processor2", "2")); - ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null); - - leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() { + ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, new IZkDataListener() { @Override - public void handleDataChange(String dataPath, Object data) - throws Exception { - - } + public void handleDataChange(String dataPath, Object data) throws Exception { } @Override - public void handleDataDeleted(String dataPath) - throws Exception { + public void handleDataDeleted(String dataPath) throws Exception { count.incrementAndGet(); } }); + leaderElector2.setLeaderElectorListener(() -> isLeader2.res = true); // Processor-3 - ZkUtils zkUtils3 = getZkUtilsWithNewClient("processor3"); + ZkUtils zkUtils3 = getZkUtilsWithNewClient(); final String path3 = zkUtils3.registerProcessorAndGetId(new ProcessorData("processor3", 
"3")); - ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null); - - leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() { + ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, new IZkDataListener() { @Override - public void handleDataChange(String dataPath, Object data) - throws Exception { - - } + public void handleDataChange(String dataPath, Object data) throws Exception { } @Override public void handleDataDeleted(String dataPath) @@ -427,26 +371,13 @@ public class TestZkLeaderElector { electionLatch.countDown(); } }); + leaderElector3.setLeaderElectorListener(() -> isLeader3.res = true); // Join Leader Election - leaderElector1.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader1.res = true; - } - }); - leaderElector2.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader2.res = true; - } - }); - leaderElector3.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader3.res = true; - } - }); + leaderElector1.tryBecomeLeader(); + leaderElector2.tryBecomeLeader(); + leaderElector3.tryBecomeLeader(); + Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100)); Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100)); Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100)); @@ -477,29 +408,21 @@ public class TestZkLeaderElector { BooleanResult isLeader2 = new BooleanResult(); // Processor-1 - ZkUtils zkUtils1 = getZkUtilsWithNewClient("1"); + ZkUtils zkUtils1 = getZkUtilsWithNewClient(); ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null); + leaderElector1.setLeaderElectorListener(() -> isLeader1.res = true); // Processor-2 - ZkUtils zkUtils2 = getZkUtilsWithNewClient("2"); + ZkUtils zkUtils2 = getZkUtilsWithNewClient(); ZkLeaderElector leaderElector2 = 
new ZkLeaderElector("2", zkUtils2, null); + leaderElector2.setLeaderElectorListener(() -> isLeader2.res = true); // Before Leader Election Assert.assertFalse(leaderElector1.amILeader()); Assert.assertFalse(leaderElector2.amILeader()); - leaderElector1.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader1.res = true; - } - }); - leaderElector2.tryBecomeLeader(new LeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader2.res = true; - } - }); + leaderElector1.tryBecomeLeader(); + leaderElector2.tryBecomeLeader(); // After Leader Election Assert.assertTrue(leaderElector1.amILeader()); @@ -509,7 +432,7 @@ public class TestZkLeaderElector { zkUtils2.close(); } - private ZkUtils getZkUtilsWithNewClient(String processorId) { + private ZkUtils getZkUtilsWithNewClient() { ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS); return new ZkUtils( KEY_BUILDER, http://git-wip-us.apache.org/repos/asf/samza/blob/fb7aa73f/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index e03498c..a3e70b8 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -498,7 +498,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { @volatile var onContainerFailedThrowable: Throwable = null val mockRunLoop = mock[RunLoop] - when(mockRunLoop.run).then(new Answer[Unit] { + when(mockRunLoop.run).thenAnswer(new Answer[Unit] { override def answer(invocation: InvocationOnMock): Unit = { Thread.sleep(100) }
