Repository: aurora
Updated Branches:
  refs/heads/master 2da17009c -> eae686023
Simplify TaskHistoryPruner tie-in to Lifecycle.

This eliminates processing all futures to find the 1st failed one in favor of
directly signalling a Service failure when a unit of async work fails.

Testing Done:
Locally green: `./gradlew -P build`.

Bugs closed: AURORA-1582

Reviewed at https://reviews.apache.org/r/42639/

Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/eae68602
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/eae68602
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/eae68602

Branch: refs/heads/master
Commit: eae6860232a1ce10615023d0833c269bb5e85356
Parents: 2da1700
Author: John Sirois <[email protected]>
Authored: Fri Jan 22 14:50:54 2016 -0700
Committer: John Sirois <[email protected]>
Committed: Fri Jan 22 14:50:54 2016 -0700

----------------------------------------------------------------------
 src/main/java/org/apache/aurora/GuavaUtils.java | 18 +++++++
 .../scheduler/pruning/TaskHistoryPruner.java | 56 +++++++-------------
 .../aurora/LifecycleShutdownListenerTest.java | 17 +-----
 3 files changed, 40 insertions(+), 51 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/aurora/blob/eae68602/src/main/java/org/apache/aurora/GuavaUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/GuavaUtils.java b/src/main/java/org/apache/aurora/GuavaUtils.java
index 8c2ab57..7d569e0 100644
--- a/src/main/java/org/apache/aurora/GuavaUtils.java
+++ b/src/main/java/org/apache/aurora/GuavaUtils.java
@@ -20,6 +20,7 @@ import java.util.stream.Collector;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.AbstractService;
 import com.google.common.util.concurrent.Service;
 import
com.google.common.util.concurrent.Service.State; import com.google.common.util.concurrent.ServiceManager; @@ -56,6 +57,23 @@ public final class GuavaUtils { } /** + * A Service that does nothing; useful for building passive services driven by an external + * event loop. + */ + public static class PassiveService extends AbstractService { + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected void doStop() { + notifyStopped(); + } + } + + /** * Collector to create a Guava ImmutableSet. */ public static <T> Collector<T, ?, ImmutableSet<T>> toImmutableSet() { http://git-wip-us.apache.org/repos/asf/aurora/blob/eae68602/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java index 2d4c58e..5441630 100644 --- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java +++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java @@ -14,9 +14,6 @@ package org.apache.aurora.scheduler.pruning; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; import javax.inject.Inject; @@ -25,10 +22,9 @@ import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Queues; import com.google.common.eventbus.Subscribe; -import com.google.common.util.concurrent.AbstractScheduledService; +import org.apache.aurora.GuavaUtils.PassiveService; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.util.Clock; @@ -56,7 +52,7 @@ import static 
org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks * transitioning into one of the inactive states. */ -public class TaskHistoryPruner extends AbstractScheduledService implements EventSubscriber { +public class TaskHistoryPruner extends PassiveService implements EventSubscriber { private static final Logger LOG = LoggerFactory.getLogger(TaskHistoryPruner.class); private final DelayExecutor executor; @@ -64,7 +60,6 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event private final Clock clock; private final HistoryPrunnerSettings settings; private final Storage storage; - private final ConcurrentLinkedQueue<FutureTask<Void>> futureTasks; private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() { @Override @@ -103,7 +98,6 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event this.clock = requireNonNull(clock); this.settings = requireNonNull(settings); this.storage = requireNonNull(storage); - this.futureTasks = Queues.newConcurrentLinkedQueue(); } @VisibleForTesting @@ -133,22 +127,6 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event } } - @Override - protected void runOneIteration() throws Exception { - // Check if the prune attempts fail and propagate the exception. This will trigger - // service (and the scheduler) to shut down. 
- FutureTask<Void> future; - - while ((future = futureTasks.poll()) != null) { - future.get(); - } - } - - @Override - protected Scheduler scheduler() { - return AbstractScheduledService.Scheduler.newFixedDelaySchedule(0, 5, TimeUnit.SECONDS); - } - private void deleteTasks(final Set<String> taskIds) { LOG.info("Pruning inactive tasks " + taskIds); storage.write( @@ -160,6 +138,16 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES); } + private Runnable failureNotifyingRunnable(Runnable runnable) { + return () -> { + try { + runnable.run(); + } catch (Throwable t) { + notifyFailed(t); + } + }; + } + private void registerInactiveTask( final IJobKey jobKey, final String taskId, @@ -167,15 +155,14 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event LOG.debug("Prune task " + taskId + " in " + timeRemaining + " ms."); - FutureTask<Void> pruneSingleTask = new FutureTask<>(() -> { - LOG.info("Pruning expired inactive task " + taskId); - deleteTasks(ImmutableSet.of(taskId)); - }, null); - futureTasks.add(pruneSingleTask); + executor.execute( + failureNotifyingRunnable(() -> { + LOG.info("Pruning expired inactive task " + taskId); + deleteTasks(ImmutableSet.of(taskId)); + }), + Amount.of(timeRemaining, Time.MILLISECONDS)); - executor.execute(pruneSingleTask, Amount.of(timeRemaining, Time.MILLISECONDS)); - - FutureTask<Void> pruneRemainingTasksFromJob = new FutureTask<>(() -> { + executor.execute(failureNotifyingRunnable(() -> { Iterable<IScheduledTask> inactiveTasks = Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey)); int numInactiveTasks = Iterables.size(inactiveTasks); @@ -191,9 +178,6 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event deleteTasks(toPrune); } } - }, null); - futureTasks.add(pruneRemainingTasksFromJob); - - executor.execute(pruneRemainingTasksFromJob); + })); } } 
http://git-wip-us.apache.org/repos/asf/aurora/blob/eae68602/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java b/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java index 8d19c04..d91dc27 100644 --- a/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java +++ b/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java @@ -13,11 +13,10 @@ */ package org.apache.aurora; -import com.google.common.util.concurrent.AbstractService; -import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.ServiceManager; import org.apache.aurora.GuavaUtils.LifecycleShutdownListener; +import org.apache.aurora.GuavaUtils.PassiveService; import org.apache.aurora.common.application.Lifecycle; import org.apache.aurora.common.base.Command; import org.apache.aurora.common.testing.easymock.EasyMockTest; @@ -26,18 +25,6 @@ import org.junit.Test; public class LifecycleShutdownListenerTest extends EasyMockTest { - private static final Service NOOP_SERVICE = new AbstractService() { - @Override - protected void doStart() { - // Noop. - } - - @Override - protected void doStop() { - // Noop. - } - }; - private Command shutdown; private ServiceManager.Listener listener; @@ -53,6 +40,6 @@ public class LifecycleShutdownListenerTest extends EasyMockTest { control.replay(); - listener.failure(NOOP_SERVICE); + listener.failure(new PassiveService()); } }
