Repository: aurora Updated Branches: refs/heads/master 39815a32d -> 9e77a6f37
`TaskHistoryPruner` controls Lifecycle directly. This was the original idea in https://reviews.apache.org/r/42332. Mixing the active scheduler `Service` lifecycle with the `EventBus` lifecycle proves tricky - prune events are fired before scheduler active services are started. Instead of queueing up prune events to wait for service start or re-engineering service / event bus interaction, this change returns to the original behavior, manipulating the `Lifecycle` directly. Also kills a confusing, unused EventSink discovered during analysis of all pub-sub event sourcing that might interact with the `TaskHistoryPruner`. Testing Done: Locally green: ``` ./gradlew -Pq build ./src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh ``` It's the latter - e2e (krb part) - that was the only automated testing revealing the problem previously. Bugs closed: AURORA-1593 Reviewed at https://reviews.apache.org/r/42801/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/9e77a6f3 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/9e77a6f3 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/9e77a6f3 Branch: refs/heads/master Commit: 9e77a6f371dd7c7f527452339b9030079a8644c1 Parents: 39815a3 Author: John Sirois <[email protected]> Authored: Tue Jan 26 11:29:48 2016 -0700 Committer: John Sirois <[email protected]> Committed: Tue Jan 26 11:29:48 2016 -0700 ---------------------------------------------------------------------- src/main/java/org/apache/aurora/GuavaUtils.java | 18 --------------- .../aurora/scheduler/SchedulerLifecycle.java | 5 ---- .../aurora/scheduler/pruning/PruningModule.java | 1 - .../scheduler/pruning/TaskHistoryPruner.java | 24 +++++++++++--------- .../aurora/LifecycleShutdownListenerTest.java | 17 ++++++++++++-- .../scheduler/SchedulerLifecycleTest.java | 4 ---- .../pruning/TaskHistoryPrunerTest.java | 24 +++++++------------- 7 files changed, 36 insertions(+), 57 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/9e77a6f3/src/main/java/org/apache/aurora/GuavaUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/GuavaUtils.java b/src/main/java/org/apache/aurora/GuavaUtils.java index 7d569e0..8c2ab57 100644 --- a/src/main/java/org/apache/aurora/GuavaUtils.java +++ b/src/main/java/org/apache/aurora/GuavaUtils.java @@ -20,7 +20,6 @@ import java.util.stream.Collector; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.Service.State; import com.google.common.util.concurrent.ServiceManager; @@ -57,23 +56,6 @@ public final class GuavaUtils { } /** - * A Service that does nothing; useful for building passive services driven by an external - * event loop. - */ - public static class PassiveService extends AbstractService { - - @Override - protected void doStart() { - notifyStarted(); - } - - @Override - protected void doStop() { - notifyStopped(); - } - } - - /** * Collector to create a Guava ImmutableSet. 
*/ public static <T> Collector<T, ?, ImmutableSet<T>> toImmutableSet() { http://git-wip-us.apache.org/repos/asf/aurora/blob/9e77a6f3/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java index 5ba5e73..b15540c 100644 --- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java +++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java @@ -51,7 +51,6 @@ import org.apache.aurora.common.util.StateMachine.Transition; import org.apache.aurora.common.zookeeper.Group.JoinException; import org.apache.aurora.common.zookeeper.ServerSet; import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl; -import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.mesos.Driver; @@ -118,7 +117,6 @@ public class SchedulerLifecycle implements EventSubscriber { Driver driver, LeadingOptions leadingOptions, ScheduledExecutorService executorService, - EventSink eventSink, ShutdownRegistry shutdownRegistry, StatsProvider statsProvider, @SchedulerActive ServiceManagerIface schedulerActiveServiceManager) { @@ -128,7 +126,6 @@ public class SchedulerLifecycle implements EventSubscriber { lifecycle, driver, new DefaultDelayedActions(leadingOptions, executorService), - eventSink, shutdownRegistry, statsProvider, schedulerActiveServiceManager); @@ -189,7 +186,6 @@ public class SchedulerLifecycle implements EventSubscriber { final Lifecycle lifecycle, final Driver driver, final DelayedActions delayedActions, - final EventSink eventSink, final ShutdownRegistry shutdownRegistry, StatsProvider statsProvider, final ServiceManagerIface schedulerActiveServiceManager) { @@ 
-198,7 +194,6 @@ public class SchedulerLifecycle implements EventSubscriber { requireNonNull(lifecycle); requireNonNull(driver); requireNonNull(delayedActions); - requireNonNull(eventSink); requireNonNull(shutdownRegistry); statsProvider.makeGauge( http://git-wip-us.apache.org/repos/asf/aurora/blob/9e77a6f3/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java index efdfbda..735199a 100644 --- a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java +++ b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java @@ -83,7 +83,6 @@ public class PruningModule extends AbstractModule { expose(TaskHistoryPruner.class); } }); - SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskHistoryPruner.class); PubsubEventModule.bindSubscriber(binder(), TaskHistoryPruner.class); install(new PrivateModule() { http://git-wip-us.apache.org/repos/asf/aurora/blob/9e77a6f3/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java index 5441630..22753b4 100644 --- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java +++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java @@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.eventbus.Subscribe; -import org.apache.aurora.GuavaUtils.PassiveService; +import org.apache.aurora.common.application.Lifecycle; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; 
import org.apache.aurora.common.util.Clock; @@ -43,8 +43,6 @@ import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; -import static com.google.common.base.Preconditions.checkState; - import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; @@ -52,7 +50,7 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks * transitioning into one of the inactive states. */ -public class TaskHistoryPruner extends PassiveService implements EventSubscriber { +public class TaskHistoryPruner implements EventSubscriber { private static final Logger LOG = LoggerFactory.getLogger(TaskHistoryPruner.class); private final DelayExecutor executor; @@ -60,6 +58,7 @@ public class TaskHistoryPruner extends PassiveService implements EventSubscriber private final Clock clock; private final HistoryPrunnerSettings settings; private final Storage storage; + private final Lifecycle lifecycle; private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() { @Override @@ -91,13 +90,15 @@ public class TaskHistoryPruner extends PassiveService implements EventSubscriber StateManager stateManager, Clock clock, HistoryPrunnerSettings settings, - Storage storage) { + Storage storage, + Lifecycle lifecycle) { this.executor = requireNonNull(executor); this.stateManager = requireNonNull(stateManager); this.clock = requireNonNull(clock); this.settings = requireNonNull(settings); this.storage = requireNonNull(storage); + this.lifecycle = requireNonNull(lifecycle); } @VisibleForTesting @@ -114,8 +115,6 @@ public class TaskHistoryPruner extends PassiveService implements EventSubscriber */ @Subscribe public void recordStateChange(TaskStateChange change) { - checkState(isRunning()); - if (Tasks.isTerminated(change.getNewState())) { long timeoutBasis 
= change.isTransition() ? clock.nowMillis() @@ -138,12 +137,15 @@ public class TaskHistoryPruner extends PassiveService implements EventSubscriber return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES); } - private Runnable failureNotifyingRunnable(Runnable runnable) { + private Runnable shutdownOnError(String subject, Runnable runnable) { return () -> { try { runnable.run(); } catch (Throwable t) { - notifyFailed(t); + LOG.error( + "Unexpected problem pruning task history for " + subject + ". Triggering shutdown", + t); + lifecycle.shutdown(); } }; } @@ -156,13 +158,13 @@ public class TaskHistoryPruner extends PassiveService implements EventSubscriber LOG.debug("Prune task " + taskId + " in " + timeRemaining + " ms."); executor.execute( - failureNotifyingRunnable(() -> { + shutdownOnError("task: " + taskId, () -> { LOG.info("Pruning expired inactive task " + taskId); deleteTasks(ImmutableSet.of(taskId)); }), Amount.of(timeRemaining, Time.MILLISECONDS)); - executor.execute(failureNotifyingRunnable(() -> { + executor.execute(shutdownOnError("job: " + jobKey, () -> { Iterable<IScheduledTask> inactiveTasks = Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey)); int numInactiveTasks = Iterables.size(inactiveTasks); http://git-wip-us.apache.org/repos/asf/aurora/blob/9e77a6f3/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java b/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java index d91dc27..8d19c04 100644 --- a/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java +++ b/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java @@ -13,10 +13,11 @@ */ package org.apache.aurora; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.ServiceManager; import 
org.apache.aurora.GuavaUtils.LifecycleShutdownListener; -import org.apache.aurora.GuavaUtils.PassiveService; import org.apache.aurora.common.application.Lifecycle; import org.apache.aurora.common.base.Command; import org.apache.aurora.common.testing.easymock.EasyMockTest; @@ -25,6 +26,18 @@ import org.junit.Test; public class LifecycleShutdownListenerTest extends EasyMockTest { + private static final Service NOOP_SERVICE = new AbstractService() { + @Override + protected void doStart() { + // Noop. + } + + @Override + protected void doStop() { + // Noop. + } + }; + private Command shutdown; private ServiceManager.Listener listener; @@ -40,6 +53,6 @@ public class LifecycleShutdownListenerTest extends EasyMockTest { control.replay(); - listener.failure(new PassiveService()); + listener.failure(NOOP_SERVICE); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/9e77a6f3/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java index bab4567..e225ae5 100644 --- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java @@ -24,7 +24,6 @@ import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl; import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener; import org.apache.aurora.scheduler.SchedulerLifecycle.DelayedActions; -import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered; import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.storage.Storage.StorageException; @@ -49,7 +48,6 @@ public class SchedulerLifecycleTest extends EasyMockTest { private Driver 
driver; private LeaderControl leaderControl; private DelayedActions delayedActions; - private EventSink eventSink; private FakeStatsProvider statsProvider; private ServiceManagerIface serviceManager; @@ -62,7 +60,6 @@ public class SchedulerLifecycleTest extends EasyMockTest { driver = createMock(Driver.class); leaderControl = createMock(LeaderControl.class); delayedActions = createMock(DelayedActions.class); - eventSink = createMock(EventSink.class); statsProvider = new FakeStatsProvider(); serviceManager = createMock(ServiceManagerIface.class); } @@ -84,7 +81,6 @@ public class SchedulerLifecycleTest extends EasyMockTest { new Lifecycle(shutdownRegistry), driver, delayedActions, - eventSink, shutdownRegistry, statsProvider, serviceManager); http://git-wip-us.apache.org/repos/asf/aurora/blob/9e77a6f3/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java index ffeee1b..99c27e8 100644 --- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java @@ -18,8 +18,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.io.Closer; -import com.google.common.util.concurrent.Service; +import org.apache.aurora.common.application.Lifecycle; +import org.apache.aurora.common.base.Command; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.testing.easymock.EasyMockTest; @@ -49,9 +50,6 @@ import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.gen.ScheduleStatus.STARTING; import static 
org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expectLastCall; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; public class TaskHistoryPrunerTest extends EasyMockTest { private static final String SLAVE_HOST = "HOST_A"; @@ -67,6 +65,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest { private StorageTestUtil storageUtil; private TaskHistoryPruner pruner; private Closer closer; + private Command shutdownCommand; @Before public void setUp() { @@ -75,15 +74,15 @@ public class TaskHistoryPrunerTest extends EasyMockTest { stateManager = createMock(StateManager.class); storageUtil = new StorageTestUtil(this); storageUtil.expectOperations(); + shutdownCommand = createMock(Command.class); pruner = new TaskHistoryPruner( executor, stateManager, clock, new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY), - storageUtil.storage); + storageUtil.storage, + new Lifecycle(shutdownCommand)); closer = Closer.create(); - - pruner.startAsync().awaitRunning(); } @After @@ -242,20 +241,13 @@ public class TaskHistoryPrunerTest extends EasyMockTest { expectDeleteTasks("a"); expectLastCall().andThrow(new RuntimeException("oops")); + shutdownCommand.execute(); + control.replay(); changeState(running, killed); clock.advance(ONE_HOUR); delayedDelete.getValue().run(); - // awaitTerminated throws an IllegalStateException if the service fails - try { - pruner.awaitTerminated(); - fail(); - } catch (IllegalStateException e) { - assertEquals(Service.State.FAILED, pruner.state()); - } - - assertNotNull(pruner.failureCause()); } private void expectDeleteTasks(String... tasks) {
