SAMZA-1406: Fix potential orphaned containers problem in standalone
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c45c7747 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c45c7747 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c45c7747 Branch: refs/heads/master Commit: c45c7747ae371eee11e2f41dd4e32a53b12c6c91 Parents: ebce13e Author: Shanthoosh Venkataraman <[email protected]> Authored: Thu Sep 14 21:17:36 2017 -0700 Committer: navina <[email protected]> Committed: Thu Sep 14 21:18:55 2017 -0700 ---------------------------------------------------------------------- .../samza/zk/ScheduleAfterDebounceTime.java | 172 +++++++++++++------ .../org/apache/samza/zk/ZkJobCoordinator.java | 26 ++- .../samza/zk/TestScheduleAfterDebounceTime.java | 74 +++++++- 3 files changed, 204 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c45c7747/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java index 6174063..3a7dca9 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java @@ -20,99 +20,157 @@ package org.apache.samza.zk; import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; - +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 
This class allows scheduling a Runnable actions after some debounce time. * When the same action is scheduled it needs to cancel the previous one. To accomplish that we keep the previous - * future in a map, keyed by the action name. Here we predefine some actions, which are used in the - * ZK based standalone app. + * future in a map, keyed by the action name. */ public class ScheduleAfterDebounceTime { - public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class); - public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a task to complete + private static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class); + private static final String DEBOUNCE_THREAD_NAME_FORMAT = "debounce-thread-%d"; - // Here we predefine some actions which are used in the ZK based standalone app. - // Action name when the JobModel version changes - public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange"; - - // Action name when the Processor membership changes - public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange"; + // timeout to wait for a task to complete. + private static final int TIMEOUT_MS = 1000 * 10; /** - * - * cleanup process is started after every new job model generation is complete. - * It deletes old versions of job model and the barrier. - * How many to delete (or to leave) is controlled by @see org.apache.samza.zk.ZkJobCoordinator#NUM_VERSIONS_TO_LEAVE. - **/ - public static final String ON_ZK_CLEANUP = "OnCleanUp"; + * {@link ScheduledTaskCallback} associated with the scheduler. OnError method of the + * callback will be invoked on first scheduled task failure. + */ + private Optional<ScheduledTaskCallback> scheduledTaskCallback; - private final ScheduledTaskFailureCallback scheduledTaskFailureCallback; + // Responsible for scheduling delayed actions. 
+ private final ScheduledExecutorService scheduledExecutorService; - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("debounce-thread-%d").setDaemon(true).build()); + /** + * A map from actionName to {@link ScheduledFuture} of task scheduled for execution. + */ private final Map<String, ScheduledFuture> futureHandles = new HashMap<>(); - // Ideally, this should be only used for testing. But ZkBarrierForVersionUpgrades uses it. This needs to be fixed. - // TODO: Timer shouldn't be passed around the components. It should be associated with the JC or the caller of - // coordinationUtils. public ScheduleAfterDebounceTime() { - this.scheduledTaskFailureCallback = null; + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(DEBOUNCE_THREAD_NAME_FORMAT).setDaemon(true).build(); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + } + + public void setScheduledTaskCallback(ScheduledTaskCallback scheduledTaskCallback) { + this.scheduledTaskCallback = Optional.ofNullable(scheduledTaskCallback); } - public ScheduleAfterDebounceTime(ScheduledTaskFailureCallback errorScheduledTaskFailureCallback) { - this.scheduledTaskFailureCallback = errorScheduledTaskFailureCallback; + /** + * Performs the following operations in sequential order. + * <ul> + * <li> Makes best effort to cancel any existing task in task queue associated with the action.</li> + * <li> Schedules the incoming action for later execution and records its future.</li> + * </ul> + * + * @param actionName the name of scheduleable action. + * @param delayInMillis the time from now to delay execution. + * @param runnable the action to execute. + */ + public synchronized void scheduleAfterDebounceTime(String actionName, long delayInMillis, Runnable runnable) { + // 1. Try to cancel any existing scheduled task associated with the action. 
+ tryCancelScheduledAction(actionName); + + // 2. Schedule the action. + ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(getScheduleableAction(actionName, runnable), delayInMillis, TimeUnit.MILLISECONDS); + + LOG.info("Scheduled action: {} to run after: {} milliseconds.", actionName, delayInMillis); + futureHandles.put(actionName, scheduledFuture); } - synchronized public void scheduleAfterDebounceTime(String actionName, long debounceTimeMs, Runnable runnable) { - // check if this action has been scheduled already - ScheduledFuture sf = futureHandles.get(actionName); - if (sf != null && !sf.isDone()) { - LOG.info("cancel future for " + actionName); - // attempt to cancel - if (!sf.cancel(false)) { + /** + * Stops the scheduler. After this invocation no further schedule calls will be accepted + * and all pending enqueued tasks will be cancelled. + */ + public synchronized void stopScheduler() { + scheduledExecutorService.shutdownNow(); + + // Clear the existing future handles. + futureHandles.clear(); + } + + /** + * Tries to cancel the task that belongs to {@code actionName} submitted to the queue. + * + * @param actionName the name of action to cancel. 
+ */ + private void tryCancelScheduledAction(String actionName) { + ScheduledFuture scheduledFuture = futureHandles.get(actionName); + if (scheduledFuture != null && !scheduledFuture.isDone()) { + LOG.info("Attempting to cancel the future of action: {}", actionName); + // Attempt to cancel + if (!scheduledFuture.cancel(false)) { try { - sf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + scheduledFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (Exception e) { // we ignore the exception - LOG.warn("cancel for action " + actionName + " failed with ", e); + LOG.warn("Cancelling the future of action: {} failed.", actionName, e); } } futureHandles.remove(actionName); } - // schedule a new task - sf = scheduledExecutorService.schedule(() -> { - try { - runnable.run(); - LOG.debug(actionName + " completed successfully."); - } catch (Throwable t) { - LOG.error(actionName + " threw an exception.", t); - if (scheduledTaskFailureCallback != null) { - scheduledTaskFailureCallback.onError(t); - } + } + + /** + * Decorate the executable action with exception handlers to facilitate cleanup on failures. + * + * @param actionName the name of the scheduleable action. + * @param runnable the action to execute. + * @return the executable action decorated with exception handlers. + */ + private Runnable getScheduleableAction(String actionName, Runnable runnable) { + return () -> { + try { + runnable.run(); + /* + * Expects all run() implementations <b>not to swallow the interrupts.</b> + * This thread is interrupted from an external source(mostly executor service) to die. 
+ */ + if (Thread.currentThread().isInterrupted()) { + LOG.warn("Action: {} is interrupted.", actionName); + doCleanUpOnTaskException(new InterruptedException()); + } else { + LOG.debug("Action: {} completed successfully.", actionName); } - }, - debounceTimeMs, - TimeUnit.MILLISECONDS); - LOG.info("scheduled " + actionName + " in " + debounceTimeMs); - futureHandles.put(actionName, sf); + } catch (Exception exception) { + LOG.error("Execution of action: {} failed.", actionName, exception); + doCleanUpOnTaskException(exception); + } + }; } - public void stopScheduler() { - // shutdown executor service - scheduledExecutorService.shutdown(); + /** + * Handler method invoked on an exception during a scheduled task execution; it performs + * the following operations in sequential order. + * <ul> + * <li> Stop the scheduler. If the task execution fails or a task is interrupted, scheduler will not accept/execute any new tasks.</li> + * <li> Invokes the onError handler method if taskCallback is defined.</li> + * </ul> + * + * @param exception the exception that occurred during task execution. + */ + private void doCleanUpOnTaskException(Exception exception) { + stopScheduler(); + + scheduledTaskCallback.ifPresent(callback -> callback.onError(exception)); } - interface ScheduledTaskFailureCallback { + /** + * A ScheduledTaskCallback::onError() is invoked on first occurrence of exception + * when executing a task. Provides a hook for handling failures + * in an asynchronous scheduled task execution.
+ */ + interface ScheduledTaskCallback { void onError(Throwable throwable); } } http://git-wip-us.apache.org/repos/asf/samza/blob/c45c7747/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 9f64b3a..2b8349c 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -61,6 +61,19 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { private static final int METADATA_CACHE_TTL_MS = 5000; private static final int NUM_VERSIONS_TO_LEAVE = 10; + // Action name when the JobModel version changes + private static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange"; + + // Action name when the Processor membership changes + private static final String ON_PROCESSOR_CHANGE = "OnProcessorChange"; + + /** + * Cleanup process is started after every new job model generation is complete. + * It deletes old versions of job model and the barrier. + * How many to delete (or to leave) is controlled by @see org.apache.samza.zk.ZkJobCoordinator#NUM_VERSIONS_TO_LEAVE. 
+ **/ + private static final String ON_ZK_CLEANUP = "OnCleanUp"; + private final ZkUtils zkUtils; private final String processorId; private final ZkController zkController; @@ -95,7 +108,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { new ZkBarrierListenerImpl()); this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs(); this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId); - debounceTimer = new ScheduleAfterDebounceTime(throwable -> { + debounceTimer = new ScheduleAfterDebounceTime(); + debounceTimer.setScheduledTaskCallback(throwable -> { LOG.error("Received exception from in JobCoordinator Processing!", throwable); stop(); }); @@ -157,7 +171,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { @Override public void onProcessorChange(List<String> processors) { LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size()); - debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs, + debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> doOnProcessorChange(processors)); } @@ -195,12 +209,12 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { LOG.info("pid=" + processorId + "Published new Job Model. 
Version = " + nextJMVersion); - debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE)); + debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE)); } @Override public void onNewJobModelAvailable(final String version) { - debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> + debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> { LOG.info("pid=" + processorId + "new JobModel available"); // get the new job model from ZK @@ -273,7 +287,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!"); metrics.isLeader.set(true); zkController.subscribeToProcessorChange(); - debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs, () -> + debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> { // actual actions to do are the same as onProcessorChange doOnProcessorChange(new ArrayList<>()); @@ -312,7 +326,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { LOG.warn("Barrier for version " + version + " timed out."); if (zkController.isLeader()) { LOG.info("Leader will schedule a new job model generation"); - debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs, () -> + debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> { // actual actions to do are the same as onProcessorChange doOnProcessorChange(new ArrayList<>()); http://git-wip-us.apache.org/repos/asf/samza/blob/c45c7747/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java index d3152be..a681767 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java @@ -19,15 +19,23 @@ package org.apache.samza.zk; -import org.junit.Assert; -import org.junit.Test; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.rules.Timeout; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestScheduleAfterDebounceTime { + private static final Logger LOG = LoggerFactory.getLogger(TestScheduleAfterDebounceTime.class); + private static final long WAIT_TIME = 500; + @Rule + public Timeout testTimeOutInSeconds = new Timeout(10, TimeUnit.SECONDS); + class TestObj { private volatile int i = 0; public void inc() { @@ -91,8 +99,10 @@ public class TestScheduleAfterDebounceTime { @Test public void testRunnableWithExceptionInvokesCallback() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(e -> { - Assert.assertEquals(RuntimeException.class, e.getClass()); + final Throwable[] taskCallbackException = new Exception[1]; + ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(); + scheduledQueue.setScheduledTaskCallback(throwable -> { + taskCallbackException[0] = throwable; latch.countDown(); }); @@ -107,6 +117,60 @@ public class TestScheduleAfterDebounceTime { boolean result = latch.await(5 * WAIT_TIME, TimeUnit.MILLISECONDS); Assert.assertTrue("Latch timed-out.", result); Assert.assertEquals(0, testObj.get()); + Assert.assertEquals(RuntimeException.class, taskCallbackException[0].getClass()); + scheduledQueue.stopScheduler(); 
+ } + + /** + * Validates if the interrupted exception triggered by ExecutorService is handled by ScheduleAfterDebounceTime. + */ + @Test + public void testStopSchedulerInvokesRegisteredCallback() throws InterruptedException { + final CountDownLatch hasTaskCallbackCompleted = new CountDownLatch(1); + final CountDownLatch hasThreadStarted = new CountDownLatch(1); + final CountDownLatch isSchedulerShutdownTriggered = new CountDownLatch(1); + + /** + * Declaring this as an array to record the value inside the lambda. + */ + final Throwable[] taskCallbackException = new Exception[1]; + + ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(); + scheduledQueue.setScheduledTaskCallback(throwable -> { + /** + * Assertion failures in callback doesn't fail the test. + * Record the received exception here and assert outside + * the callback. + */ + taskCallbackException[0] = throwable; + hasTaskCallbackCompleted.countDown(); + }); + + scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME , () -> { + hasThreadStarted.countDown(); + try { + LOG.debug("Waiting for the scheduler shutdown trigger."); + isSchedulerShutdownTriggered.await(); + } catch (InterruptedException e) { + /** + * Don't swallow the exception and restore the interrupt status. + * Expect the ScheduleDebounceTime to handle this interrupt + * and invoke ScheduledTaskCallback. + */ + Thread.currentThread().interrupt(); + } + }); + + // Wait for the task to run. + hasThreadStarted.await(); + + // Shutdown the scheduler and update relevant state. scheduledQueue.stopScheduler(); + isSchedulerShutdownTriggered.countDown(); + + hasTaskCallbackCompleted.await(); + + // Assert on exception thrown. + Assert.assertEquals(InterruptedException.class, taskCallbackException[0].getClass()); } }
