Repository: aurora
Updated Branches:
  refs/heads/master c5f94e05f -> 72bf8dbd2
Log and terminate scheduler on updater thread failure. Bugs closed: AURORA-1630 Reviewed at https://reviews.apache.org/r/44493/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/72bf8dbd Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/72bf8dbd Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/72bf8dbd Branch: refs/heads/master Commit: 72bf8dbd20221806806e964870f6f592fb6713f8 Parents: c5f94e0 Author: Maxim Khutornenko <[email protected]> Authored: Thu Mar 10 12:32:38 2016 -0800 Committer: Maxim Khutornenko <[email protected]> Committed: Thu Mar 10 12:32:38 2016 -0800 ---------------------------------------------------------------------- .../apache/aurora/scheduler/base/AsyncUtil.java | 31 ++++++++ .../scheduler/pruning/TaskHistoryPruner.java | 67 ++++++++-------- .../updater/JobUpdateControllerImpl.java | 82 ++++++++++++-------- .../aurora/scheduler/updater/UpdaterModule.java | 9 ++- .../aurora/scheduler/updater/JobUpdaterIT.java | 41 ++++++++++ 5 files changed, 159 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java b/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java index 80dc35e..474b6e0 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java +++ b/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.aurora.common.application.Lifecycle; import org.apache.aurora.common.stats.Stats; import org.slf4j.Logger; @@ -112,6 +113,36 @@ public final class AsyncUtil { }; } + /** + * Helper 
wrapper to call the provided {@link Lifecycle} on unhandled error. + * + * @param lifecycle {@link Lifecycle} instance. + * @param logger Logger instance. + * @param message message to log. + * @param runnable {@link Runnable} to wrap. + * @return A new {@link Runnable} logging an error and calling {@link Lifecycle#shutdown()}. + */ + public static Runnable shutdownOnError( + Lifecycle lifecycle, + Logger logger, + String message, + Runnable runnable) { + + requireNonNull(lifecycle); + requireNonNull(logger); + requireNonNull(message); + requireNonNull(runnable); + + return () -> { + try { + runnable.run(); + } catch (Throwable t) { + logger.error(message, t); + lifecycle.shutdown(); + } + }; + } + private static void evaluateResult(Runnable runnable, Throwable throwable, Logger logger) { // See java.util.concurrent.ThreadPoolExecutor#afterExecute(Runnable, Throwable) // for more details and an implementation example. http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java index 22753b4..f07746c 100644 --- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java +++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java @@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; +import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError; import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; @@ -52,6 +53,8 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; */ public class TaskHistoryPruner implements EventSubscriber { private static final 
Logger LOG = LoggerFactory.getLogger(TaskHistoryPruner.class); + private static final String FATAL_ERROR_FORMAT = + "Unexpected problem pruning task history for %s. Triggering shutdown"; private final DelayExecutor executor; private final StateManager stateManager; @@ -137,19 +140,6 @@ public class TaskHistoryPruner implements EventSubscriber { return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES); } - private Runnable shutdownOnError(String subject, Runnable runnable) { - return () -> { - try { - runnable.run(); - } catch (Throwable t) { - LOG.error( - "Unexpected problem pruning task history for " + subject + ". Triggering shutdown", - t); - lifecycle.shutdown(); - } - }; - } - private void registerInactiveTask( final IJobKey jobKey, final String taskId, @@ -158,28 +148,37 @@ public class TaskHistoryPruner implements EventSubscriber { LOG.debug("Prune task " + taskId + " in " + timeRemaining + " ms."); executor.execute( - shutdownOnError("task: " + taskId, () -> { - LOG.info("Pruning expired inactive task " + taskId); - deleteTasks(ImmutableSet.of(taskId)); - }), + shutdownOnError( + lifecycle, + LOG, + String.format(FATAL_ERROR_FORMAT, "task: " + taskId), + () -> { + LOG.info("Pruning expired inactive task " + taskId); + deleteTasks(ImmutableSet.of(taskId)); + }), Amount.of(timeRemaining, Time.MILLISECONDS)); - executor.execute(shutdownOnError("job: " + jobKey, () -> { - Iterable<IScheduledTask> inactiveTasks = - Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey)); - int numInactiveTasks = Iterables.size(inactiveTasks); - int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal; - if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) { - Set<String> toPrune = FluentIterable - .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks)) - .filter(safeToDelete) - .limit(tasksToPrune) - .transform(Tasks::id) - .toSet(); - if (!toPrune.isEmpty()) { - deleteTasks(toPrune); - } - } - })); + executor.execute( + shutdownOnError( 
+ lifecycle, + LOG, + String.format(FATAL_ERROR_FORMAT, "job: " + jobKey), + () -> { + Iterable<IScheduledTask> inactiveTasks = + Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey)); + int numInactiveTasks = Iterables.size(inactiveTasks); + int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal; + if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) { + Set<String> toPrune = FluentIterable + .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks)) + .filter(safeToDelete) + .limit(tasksToPrune) + .transform(Tasks::id) + .toSet(); + if (!toPrune.isEmpty()) { + deleteTasks(toPrune); + } + } + })); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java index 48d7e2a..364c5c7 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java @@ -31,6 +31,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.inject.Inject; +import org.apache.aurora.common.application.Lifecycle; import org.apache.aurora.common.collections.Pair; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; @@ -81,6 +82,7 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_FORWARD; import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK; import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD; import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE; +import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError; import static 
org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY; import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES; @@ -107,6 +109,8 @@ import static org.apache.aurora.scheduler.updater.SideEffect.InstanceUpdateStatu */ class JobUpdateControllerImpl implements JobUpdateController { private static final Logger LOG = LoggerFactory.getLogger(JobUpdateControllerImpl.class); + private static final String FATAL_ERROR_FORMAT = + "Unexpected problem running asynchronous updater for: %s. Triggering shutdown"; private final UpdateFactory updateFactory; private final LockManager lockManager; @@ -115,6 +119,7 @@ class JobUpdateControllerImpl implements JobUpdateController { private final StateManager stateManager; private final Clock clock; private final PulseHandler pulseHandler; + private final Lifecycle lifecycle; // Currently-active updaters. An active updater is one that is rolling forward or back. Paused // and completed updates are represented only in storage, not here. @@ -128,7 +133,8 @@ class JobUpdateControllerImpl implements JobUpdateController { Storage storage, ScheduledExecutorService executor, StateManager stateManager, - Clock clock) { + Clock clock, + Lifecycle lifecycle) { this.updateFactory = requireNonNull(updateFactory); this.lockManager = requireNonNull(lockManager); @@ -136,6 +142,7 @@ class JobUpdateControllerImpl implements JobUpdateController { this.executor = requireNonNull(executor); this.stateManager = requireNonNull(stateManager); this.clock = requireNonNull(clock); + this.lifecycle = requireNonNull(lifecycle); this.pulseHandler = new PulseHandler(clock); } @@ -289,15 +296,19 @@ class JobUpdateControllerImpl implements JobUpdateController { if (JobUpdateStateMachine.isAwaitingPulse(state.getStatus())) { // Attempt to unblock a job update previously blocked on expired pulse. 
- executor.execute(() -> { - try { - unscopedChangeUpdateStatus( - key, - status -> new JobUpdateEvent().setStatus(GET_UNBLOCKED_STATE.apply(status))); - } catch (UpdateStateException e) { - LOG.error("Error while processing job update pulse: " + e); - } - }); + executor.execute(shutdownOnError( + lifecycle, + LOG, + String.format(FATAL_ERROR_FORMAT, key), + () -> { + try { + unscopedChangeUpdateStatus( + key, + status -> new JobUpdateEvent().setStatus(GET_UNBLOCKED_STATE.apply(status))); + } catch (UpdateStateException e) { + LOG.error(String.format("Error processing job update pulse for %s: %s", key, e)); + } + })); } return JobUpdatePulseStatus.OK; @@ -694,29 +705,34 @@ class JobUpdateControllerImpl implements JobUpdateController { } private Runnable getDeferredEvaluator(final IInstanceKey instance, final IJobUpdateKey key) { - return () -> storage.write((NoResult.Quiet) storeProvider -> { - IJobUpdateSummary summary = - getOnlyMatch(storeProvider.getJobUpdateStore(), queryByUpdate(key)); - JobUpdateStatus status = summary.getState().getStatus(); - // Suppress this evaluation if the updater is not currently active. - if (JobUpdateStateMachine.isActive(status)) { - UpdateFactory.Update update = updates.get(instance.getJobKey()); - try { - evaluateUpdater( - storeProvider, - update, - summary, - ImmutableMap.of( - instance.getInstanceId(), - getActiveInstance( - storeProvider.getTaskStore(), - instance.getJobKey(), - instance.getInstanceId()))); - } catch (UpdateStateException e) { - throw Throwables.propagate(e); - } - } - }); + return shutdownOnError( + lifecycle, + LOG, + String.format(FATAL_ERROR_FORMAT, "Key: " + key + " Instance key: " + instance), + () -> storage.write((NoResult.Quiet) storeProvider -> { + IJobUpdateSummary summary = + getOnlyMatch(storeProvider.getJobUpdateStore(), queryByUpdate(key)); + JobUpdateStatus status = summary.getState().getStatus(); + // Suppress this evaluation if the updater is not currently active. 
+ if (JobUpdateStateMachine.isActive(status)) { + UpdateFactory.Update update = updates.get(instance.getJobKey()); + try { + evaluateUpdater( + storeProvider, + update, + summary, + ImmutableMap.of( + instance.getInstanceId(), + getActiveInstance( + storeProvider.getTaskStore(), + instance.getJobKey(), + instance.getInstanceId()))); + } catch (UpdateStateException e) { + LOG.error(String.format("Error running deferred evaluation for %s: %s", instance, e)); + Throwables.propagate(e); + } + } + })); } private static class PulseHandler { http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java index c0472d7..13cbdad 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java @@ -14,29 +14,30 @@ package org.apache.aurora.scheduler.updater; import java.util.Objects; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import javax.inject.Singleton; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.AbstractModule; import com.google.inject.PrivateModule; import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Binding module for scheduling logic and higher-level state management. 
*/ public class UpdaterModule extends AbstractModule { + private static final Logger LOG = LoggerFactory.getLogger(UpdaterModule.class); private final ScheduledExecutorService executor; public UpdaterModule() { - this(Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("updater-%d").build())); + this(AsyncUtil.singleThreadLoggingScheduledExecutor("updater-%d", LOG)); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java index b39e388..cc88915 100644 --- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java @@ -34,6 +34,8 @@ import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; +import org.apache.aurora.common.application.Lifecycle; +import org.apache.aurora.common.base.Command; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.Stats; @@ -149,6 +151,7 @@ public class JobUpdaterIT extends EasyMockTest { private LockManager lockManager; private StateManager stateManager; private JobUpdateEventSubscriber subscriber; + private Command shutdownCommand; private static ITaskConfig setExecutorData(ITaskConfig task, String executorData) { TaskConfig builder = task.newBuilder(); @@ -163,6 +166,7 @@ public class JobUpdaterIT extends EasyMockTest { ScheduledExecutorService executor = createMock(ScheduledExecutorService.class); clock = FakeScheduledExecutor.scheduleExecutor(executor); driver = createMock(Driver.class); + shutdownCommand = createMock(Command.class); eventBus = new EventBus(); 
Injector injector = Guice.createInjector( @@ -186,6 +190,7 @@ public class JobUpdaterIT extends EasyMockTest { bind(EventSink.class).toInstance(eventBus::post); bind(LockManager.class).to(LockManagerImpl.class); bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class); + bind(Lifecycle.class).toInstance(new Lifecycle(shutdownCommand)); } }); updater = injector.getInstance(JobUpdateController.class); @@ -668,6 +673,42 @@ public class JobUpdaterIT extends EasyMockTest { updater.pulse(IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "invalid")))); } + @Test(expected = IllegalStateException.class) + public void testShutdownOnFailedPulse() throws Exception { + // Missing kill expectation will trigger failure. + shutdownCommand.execute(); + expectLastCall().andAnswer(() -> { + storage.write((NoResult.Quiet) storeProvider -> releaseAllLocks()); + throw new IllegalStateException("Expected shutdown triggered."); + }); + + control.replay(); + + JobUpdate builder = makeJobUpdate( + // No-op - task is already matching the new config. + makeInstanceConfig(0, 0, NEW_CONFIG), + // Tasks needing update. + makeInstanceConfig(1, 2, OLD_CONFIG)).newBuilder(); + + builder.getInstructions().getSettings().setBlockIfNoPulsesAfterMs((int) PULSE_TIMEOUT_MS); + insertInitialTasks(IJobUpdate.build(builder)); + + changeState(JOB, 0, ASSIGNED, STARTING, RUNNING); + changeState(JOB, 1, ASSIGNED, STARTING, RUNNING); + changeState(JOB, 2, ASSIGNED, STARTING, RUNNING); + clock.advance(WATCH_TIMEOUT); + + ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder(); + updater.start(IJobUpdate.build(builder), AUDIT); + + // The update is blocked initially waiting for a pulse. + assertState(ROLL_FORWARD_AWAITING_PULSE, actions.build()); + + // Pulse arrives and update starts. 
+ assertEquals(JobUpdatePulseStatus.OK, updater.pulse(UPDATE_ID)); + changeState(JOB, 1, KILLED, ASSIGNED, STARTING, RUNNING); + } + @Test public void testSuccessfulBatchedUpdate() throws Exception { expectTaskKilled().times(3);
