Repository: aurora Updated Branches: refs/heads/master 4b9c759cf -> a7b95d95b
Making preemptor asynchronous. Part 3(final) - background service. Bugs closed: AURORA-1158 Reviewed at https://reviews.apache.org/r/32352/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a7b95d95 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a7b95d95 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a7b95d95 Branch: refs/heads/master Commit: a7b95d95bf99b2698ec5ca89f504c825d10b1754 Parents: 4b9c759 Author: Maxim Khutornenko <[email protected]> Authored: Thu Apr 2 14:29:39 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Thu Apr 2 14:29:39 2015 -0700 ---------------------------------------------------------------------- .../aurora/benchmark/SchedulingBenchmarks.java | 10 +- .../aurora/scheduler/async/TaskScheduler.java | 52 ++--- .../async/preemptor/PendingTaskProcessor.java | 147 ++++++++++++ .../async/preemptor/PreemptionSlotCache.java | 2 +- .../async/preemptor/PreemptionSlotFinder.java | 2 + .../scheduler/async/preemptor/Preemptor.java | 90 +++++++- .../async/preemptor/PreemptorImpl.java | 226 ------------------- .../async/preemptor/PreemptorMetrics.java | 10 +- .../async/preemptor/PreemptorModule.java | 84 +++++-- .../scheduler/filter/AttributeAggregate.java | 26 +++ .../scheduler/async/TaskSchedulerImplTest.java | 32 +-- .../scheduler/async/TaskSchedulerTest.java | 19 +- .../preemptor/PendingTaskProcessorTest.java | 169 ++++++++++++++ .../async/preemptor/PreemptorImplTest.java | 159 +++---------- .../async/preemptor/PreemptorModuleTest.java | 12 +- 15 files changed, 592 insertions(+), 448 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java index 5309e81..75a67dc 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java @@ -79,6 +79,8 @@ public class SchedulingBenchmarks { */ @State(Scope.Thread) public abstract static class AbstractBase { + private static final Amount<Long, Time> NO_DELAY = Amount.of(0L, Time.MILLISECONDS); + private static final Amount<Long, Time> DELAY_FOREVER = Amount.of(30L, Time.DAYS); protected Storage storage; protected Preemptor preemptor; protected ScheduledThreadPoolExecutor executor; @@ -105,7 +107,7 @@ public class SchedulingBenchmarks { // TODO(maxim): Find a way to DRY it and reuse existing modules instead. Injector injector = Guice.createInjector( new StateModule(), - new PreemptorModule(true, Amount.of(0L, Time.MILLISECONDS), executor), + new PreemptorModule(true, NO_DELAY, NO_DELAY), new PrivateModule() { @Override protected void configure() { @@ -116,7 +118,7 @@ public class SchedulingBenchmarks { new OfferManager.OfferReturnDelay() { @Override public Amount<Long, Time> get() { - return Amount.of(30L, Time.DAYS); + return DELAY_FOREVER; } }); @@ -128,7 +130,7 @@ public class SchedulingBenchmarks { protected void configure() { bind(new TypeLiteral<Amount<Long, Time>>() { }) .annotatedWith(ReservationDuration.class) - .toInstance(Amount.of(30L, Time.DAYS)); + .toInstance(DELAY_FOREVER); bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class); bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class); bind(TaskIdGenerator.class).to(TaskIdGenerator.TaskIdGeneratorImpl.class); @@ -333,7 +335,7 @@ public class SchedulingBenchmarks { new AttributeAggregate(taskSupplier, storeProvider.getAttributeStore()); Optional<String> result = - preemptor.attemptPreemptionFor(assignedTask.getTaskId(), aggregate); + preemptor.attemptPreemptionFor(assignedTask, aggregate, storeProvider); while (executor.getActiveCount() > 0) { // Using a tight loop to wait for a search completion. This is executed on a benchmark http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java index 1b9d741..ebc520e 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java +++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java @@ -27,11 +27,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; import com.google.common.base.Ticker; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.eventbus.Subscribe; import com.twitter.common.inject.TimedInterceptor.Timed; @@ -56,9 +54,7 @@ import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.mesos.Protos.SlaveID; @@ -164,25 +160,6 @@ public interface TaskScheduler extends EventSubscriber { static final Optional<String> LAUNCH_FAILED_MSG = Optional.of("Unknown exception attempting to schedule task."); - @VisibleForTesting - static Query.Builder activeJobStateQuery(IJobKey jobKey) { - return Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES); - } - - private AttributeAggregate getJobState( - final StoreProvider storeProvider, - final IJobKey jobKey) { - - Supplier<ImmutableSet<IScheduledTask>> taskSupplier = Suppliers.memoize( - new Supplier<ImmutableSet<IScheduledTask>>() { - @Override - public ImmutableSet<IScheduledTask> get() { - return storeProvider.getTaskStore().fetchTasks(activeJobStateQuery(jobKey)); - } - }); - return new AttributeAggregate(taskSupplier, storeProvider.getAttributeStore()); - } - @Timed("task_schedule_attempt") @Override public boolean schedule(final String taskId) { @@ -206,15 +183,17 @@ public interface TaskScheduler extends EventSubscriber { @Timed("task_schedule_attempt_locked") protected boolean scheduleTask(MutableStoreProvider store, String taskId) { LOG.fine("Attempting to schedule task " + taskId); - final ITaskConfig task = Iterables.getOnlyElement( + IAssignedTask assignedTask = Iterables.getOnlyElement( Iterables.transform( store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)), - Tasks.SCHEDULED_TO_INFO), + Tasks.SCHEDULED_TO_ASSIGNED), null); - if (task == null) { + + if (assignedTask == null) { LOG.warning("Failed to look up task " + taskId + ", it may have been deleted."); } else { - AttributeAggregate aggregate = getJobState(store, task.getJob()); + ITaskConfig task = assignedTask.getTask(); + AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob()); try { boolean launched = offerManager.launchFirst( getAssignerFunction(store, new ResourceRequest(task, taskId, aggregate)), @@ -222,7 +201,10 @@ public interface TaskScheduler extends EventSubscriber { if (!launched) { // Task could not be scheduled. - maybePreemptFor(taskId, aggregate); + // TODO(maxim): Now that preemption slots are searched asynchronously, consider + // retrying a launch attempt within the current scheduling round IFF a reservation is + // available. + maybePreemptFor(assignedTask, aggregate, store); attemptsNoMatch.incrementAndGet(); return false; } @@ -247,13 +229,17 @@ public interface TaskScheduler extends EventSubscriber { return true; } - private void maybePreemptFor(String taskId, AttributeAggregate attributeAggregate) { - if (reservations.hasReservationForTask(taskId)) { + private void maybePreemptFor( + IAssignedTask task, + AttributeAggregate jobState, + MutableStoreProvider storeProvider) { + + if (reservations.hasReservationForTask(task.getTaskId())) { return; } - Optional<String> slaveId = preemptor.attemptPreemptionFor(taskId, attributeAggregate); + Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider); if (slaveId.isPresent()) { - this.reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), taskId); + reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), task.getTaskId()); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java new file mode 100644 index 0000000..67ad5d7 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java @@ -0,0 +1,147 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.async.preemptor; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import javax.inject.Inject; +import javax.inject.Qualifier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.Clock; + +import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED; + +/** + * Attempts to find preemption slots for all PENDING tasks eligible for preemption. + */ +class PendingTaskProcessor implements Runnable { + private final Storage storage; + private final PreemptionSlotFinder preemptionSlotFinder; + private final PreemptorMetrics metrics; + private final Amount<Long, Time> preemptionCandidacyDelay; + private final PreemptionSlotCache slotCache; + private final Clock clock; + + /** + * Binding annotation for the time interval after which a pending task becomes eligible to + * preempt other tasks. To avoid excessive churn, the preemptor requires that a task is PENDING + * for a duration (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible + * to preempt other tasks. + */ + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface PreemptionDelay { } + + @Inject + PendingTaskProcessor( + Storage storage, + PreemptionSlotFinder preemptionSlotFinder, + PreemptorMetrics metrics, + @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay, + PreemptionSlotCache slotCache, + Clock clock) { + + this.storage = requireNonNull(storage); + this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder); + this.metrics = requireNonNull(metrics); + this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay); + this.slotCache = requireNonNull(slotCache); + this.clock = requireNonNull(clock); + } + + @Override + public void run() { + metrics.recordTaskProcessorRun(); + storage.read(new Storage.Work.Quiet<Void>() { + @Override + public Void apply(StoreProvider storeProvider) { + Multimap<IJobKey, IAssignedTask> pendingTasks = fetchIdlePendingTasks(storeProvider); + + for (IJobKey job : pendingTasks.keySet()) { + AttributeAggregate jobState = AttributeAggregate.getJobActiveState(storeProvider, job); + + for (IAssignedTask pendingTask : pendingTasks.get(job)) { + ITaskConfig task = pendingTask.getTask(); + metrics.recordPreemptionAttemptFor(task); + + Optional<PreemptionSlot> slot = preemptionSlotFinder.findPreemptionSlotFor( + pendingTask, + jobState, + storeProvider); + + metrics.recordSlotSearchResult(slot, task); + + if (slot.isPresent()) { + slotCache.add(pendingTask.getTaskId(), slot.get()); + } + } + } + return null; + } + }); + } + + private Multimap<IJobKey, IAssignedTask> fetchIdlePendingTasks(StoreProvider store) { + return Multimaps.index( + FluentIterable + .from(store.getTaskStore().fetchTasks(Query.statusScoped(PENDING))) + .filter(Predicates.and(isIdleTask, Predicates.not(hasCachedSlot))) + .transform(SCHEDULED_TO_ASSIGNED), + Tasks.ASSIGNED_TO_JOB_KEY); + } + + private final Predicate<IScheduledTask> hasCachedSlot = new Predicate<IScheduledTask>() { + @Override + public boolean apply(IScheduledTask input) { + return slotCache.get(input.getAssignedTask().getTaskId()).isPresent(); + } + }; + + private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() { + @Override + public boolean apply(IScheduledTask task) { + return (clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp()) + >= preemptionCandidacyDelay.as(Time.MILLISECONDS); + } + }; +} http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java index 4ca36e5..b5a42a0 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java @@ -53,7 +53,7 @@ class PreemptionSlotCache { @VisibleForTesting static final String PREEMPTION_SLOT_CACHE_SIZE_STAT = "preemption_slot_cache_size"; - private final Cache<String, PreemptionSlotFinder.PreemptionSlot> slots; + private final Cache<String, PreemptionSlot> slots; @Inject PreemptionSlotCache( http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java index 84bcdc5..427c0de 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java @@ -202,6 +202,8 @@ public interface PreemptionSlotFinder { } }; + // TODO(maxim): This should take pre-computed mappings (e.g. slaveToOffers) to avoid + // unnecessary repeated work. @Override public Optional<PreemptionSlot> findPreemptionSlotFor( final IAssignedTask pendingTask, http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java index 84791a2..77617ec 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java @@ -13,22 +13,98 @@ */ package org.apache.aurora.scheduler.async.preemptor; +import javax.inject.Inject; + import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; + +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING; /** - * Preempts active tasks in favor of higher priority tasks. + * Attempts to preempt active tasks in favor of the provided PENDING task in case a preemption + * slot has been previously found. */ public interface Preemptor { - - // TODO(maxim): Move AttributeAggregate creation into implementing class. /** - * Attempts to preempt active tasks in favor of the input task. + * Preempts victim tasks in case a valid preemption slot exists. * - * @param taskId ID of the preempting task. - * @param attributeAggregate Attribute information for tasks in the job containing {@code task}. + * @param task Preempting task. + * @param jobState Current job state aggregate. + * @param storeProvider Store provider to use for task preemption. * @return ID of the slave where preemption occurred. */ - Optional<String> attemptPreemptionFor(String taskId, AttributeAggregate attributeAggregate); + Optional<String> attemptPreemptionFor( + IAssignedTask task, + AttributeAggregate jobState, + MutableStoreProvider storeProvider); + + class PreemptorImpl implements Preemptor { + private final StateManager stateManager; + private final PreemptionSlotFinder preemptionSlotFinder; + private final PreemptorMetrics metrics; + private final PreemptionSlotCache slotCache; + + @Inject + PreemptorImpl( + StateManager stateManager, + PreemptionSlotFinder preemptionSlotFinder, + PreemptorMetrics metrics, + PreemptionSlotCache slotCache) { + + this.stateManager = requireNonNull(stateManager); + this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder); + this.metrics = requireNonNull(metrics); + this.slotCache = requireNonNull(slotCache); + } + + @Override + public Optional<String> attemptPreemptionFor( + IAssignedTask pendingTask, + AttributeAggregate jobState, + MutableStoreProvider storeProvider) { + + final Optional<PreemptionSlot> preemptionSlot = slotCache.get(pendingTask.getTaskId()); + + // A preemption slot is available -> attempt to preempt tasks. + if (preemptionSlot.isPresent()) { + slotCache.remove(pendingTask.getTaskId()); + + // Validate a PreemptionSlot is still valid for the given task. + Optional<ImmutableSet<PreemptionVictim>> validatedVictims = + preemptionSlotFinder.validatePreemptionSlotFor( + pendingTask, + jobState, + preemptionSlot.get(), + storeProvider); + + metrics.recordSlotValidationResult(validatedVictims); + if (!validatedVictims.isPresent()) { + // Previously found victims are no longer valid -> let the next run find a new slot. + return Optional.absent(); + } + + for (PreemptionVictim toPreempt : validatedVictims.get()) { + metrics.recordTaskPreemption(toPreempt); + stateManager.changeState( + storeProvider, + toPreempt.getTaskId(), + Optional.<ScheduleStatus>absent(), + PREEMPTING, + Optional.of("Preempting in favor of " + pendingTask.getTaskId())); + } + return Optional.of(preemptionSlot.get().getSlaveId()); + } + + return Optional.absent(); + } + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java deleted file mode 100644 index 18a2e60..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java +++ /dev/null @@ -1,226 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.lang.annotation.Retention; -import java.lang.annotation.Target; -import java.util.concurrent.ScheduledExecutorService; - -import javax.inject.Inject; -import javax.inject.Qualifier; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.util.Clock; - -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; - -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING; -import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED; - -/** - * Coordinates preemption slot search for a PENDING tasks and triggers preemption if such - * slot is found. - */ -@VisibleForTesting -public class PreemptorImpl implements Preemptor { - - private final Storage storage; - private final StateManager stateManager; - private final PreemptionSlotFinder preemptionSlotFinder; - private final PreemptorMetrics metrics; - private final Amount<Long, Time> preemptionCandidacyDelay; - private final ScheduledExecutorService executor; - private final PreemptionSlotCache slotCache; - private final Clock clock; - - /** - * Binding annotation for the time interval after which a pending task becomes eligible to - * preempt other tasks. To avoid excessive churn, the preemptor requires that a task is PENDING - * for a duration (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible - * to preempt other tasks. - */ - @VisibleForTesting - @Qualifier - @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) - public @interface PreemptionDelay { } - - @VisibleForTesting - @Qualifier - @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) - public @interface PreemptionExecutor { } - - @Inject - PreemptorImpl( - Storage storage, - StateManager stateManager, - PreemptionSlotFinder preemptionSlotFinder, - PreemptorMetrics metrics, - @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay, - @PreemptionExecutor ScheduledExecutorService executor, - PreemptionSlotCache slotCache, - Clock clock) { - - this.storage = requireNonNull(storage); - this.stateManager = requireNonNull(stateManager); - this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder); - this.metrics = requireNonNull(metrics); - this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay); - this.executor = requireNonNull(executor); - this.slotCache = requireNonNull(slotCache); - this.clock = requireNonNull(clock); - } - - @Override - public synchronized Optional<String> attemptPreemptionFor( - final String taskId, - final AttributeAggregate attributeAggregate) { - - final Optional<PreemptionSlot> preemptionSlot = slotCache.get(taskId); - if (preemptionSlot.isPresent()) { - // A preemption slot is available -> attempt to preempt tasks. - slotCache.remove(taskId); - return preemptTasks(taskId, preemptionSlot.get(), attributeAggregate); - } else { - // TODO(maxim): There is a potential race between preemption requests and async search. - // The side-effect of the race is benign as it only wastes CPU time and is unlikely to happen - // often given our schedule penalty >> slot search time. However, we may want to re-evaluate - // this when moving preemptor into background mode. - searchForPreemptionSlot(taskId, attributeAggregate); - return Optional.absent(); - } - } - - private Optional<String> preemptTasks( - final String taskId, - final PreemptionSlot preemptionSlot, - final AttributeAggregate attributeAggregate) { - - return storage.write(new Storage.MutateWork.Quiet<Optional<String>>() { - @Override - public Optional<String> apply(Storage.MutableStoreProvider storeProvider) { - final Optional<IAssignedTask> pendingTask = fetchIdlePendingTask(taskId, storeProvider); - - // Task is no longer PENDING no need to preempt. - if (!pendingTask.isPresent()) { - return Optional.absent(); - } - - // Validate a PreemptionSlot is still valid for the given task. - Optional<ImmutableSet<PreemptionVictim>> validatedVictims = - preemptionSlotFinder.validatePreemptionSlotFor( - pendingTask.get(), - attributeAggregate, - preemptionSlot, - storeProvider); - - metrics.recordSlotValidationResult(validatedVictims); - if (!validatedVictims.isPresent()) { - // Previously found victims are no longer valid -> trigger a new search. - searchForPreemptionSlot(taskId, attributeAggregate); - return Optional.absent(); - } - - for (PreemptionVictim toPreempt : validatedVictims.get()) { - metrics.recordTaskPreemption(toPreempt); - stateManager.changeState( - storeProvider, - toPreempt.getTaskId(), - Optional.<ScheduleStatus>absent(), - PREEMPTING, - Optional.of("Preempting in favor of " + taskId)); - } - return Optional.of(preemptionSlot.getSlaveId()); - } - }); - } - - private void searchForPreemptionSlot( - final String taskId, - final AttributeAggregate attributeAggregate) { - - executor.execute(new Runnable() { - @Override - public void run() { - Optional<PreemptionSlot> slot = storage.read( - new Storage.Work.Quiet<Optional<PreemptionSlot>>() { - @Override - public Optional<PreemptionSlot> apply(StoreProvider storeProvider) { - Optional<IAssignedTask> pendingTask = fetchIdlePendingTask(taskId, storeProvider); - - // Task is no longer PENDING no need to search for preemption slot. - if (!pendingTask.isPresent()) { - return Optional.absent(); - } - - ITaskConfig task = pendingTask.get().getTask(); - metrics.recordPreemptionAttemptFor(task); - - Optional<PreemptionSlot> result = preemptionSlotFinder.findPreemptionSlotFor( - pendingTask.get(), - attributeAggregate, - storeProvider); - - metrics.recordSlotSearchResult(result, task); - return result; - } - }); - - if (slot.isPresent()) { - slotCache.add(taskId, slot.get()); - } - } - }); - } - - private Optional<IAssignedTask> fetchIdlePendingTask(String taskId, Storage.StoreProvider store) { - Query.Builder query = Query.taskScoped(taskId).byStatus(PENDING); - Iterable<IAssignedTask> result = FluentIterable - .from(store.getTaskStore().fetchTasks(query)) - .filter(isIdleTask) - .transform(SCHEDULED_TO_ASSIGNED); - return Optional.fromNullable(Iterables.getOnlyElement(result, null)); - } - - private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() { - @Override - public boolean apply(IScheduledTask task) { - return (clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp()) - >= preemptionCandidacyDelay.as(Time.MILLISECONDS); - } - }; -} http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java index 782e751..dc7eb44 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java @@ -34,6 +34,9 @@ public class PreemptorMetrics { @VisibleForTesting static final String MISSING_ATTRIBUTES_NAME = "preemptor_missing_attributes"; + @VisibleForTesting + static final String PENDING_PROCESSOR_RUN_NAME = "preemptor_task_processor_runs"; + private volatile boolean exported = false; private final CachedCounters counters; @@ -68,7 +71,8 @@ public class PreemptorMetrics { slotSearchStatName(false, true), slotValidationStatName(true), slotValidationStatName(false), - MISSING_ATTRIBUTES_NAME); + MISSING_ATTRIBUTES_NAME, + PENDING_PROCESSOR_RUN_NAME); for (String stat : allStats) { counters.get(stat); } @@ -120,4 +124,8 @@ public class PreemptorMetrics { void recordMissingAttributes() { increment(MISSING_ATTRIBUTES_NAME); } + + void recordTaskProcessorRun() { + increment(PENDING_PROCESSOR_RUN_NAME); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java index 7034a07..1092c05 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java @@ -13,13 +13,14 @@ */ package org.apache.aurora.scheduler.async.preemptor; -import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Logger; +import javax.inject.Inject; import javax.inject.Singleton; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import com.google.common.util.concurrent.AbstractScheduledService; import com.google.inject.AbstractModule; import com.google.inject.PrivateModule; import com.google.inject.TypeLiteral; @@ -28,14 +29,14 @@ import com.twitter.common.args.CmdLine; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; -import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlotFinderImpl; +import org.apache.aurora.scheduler.SchedulerServicesModule; import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.scheduler.base.AsyncUtil.singleThreadLoggingScheduledExecutor; - public class PreemptorModule extends AbstractModule { private static final Logger LOG = Logger.getLogger(PreemptorModule.class.getName()); @@ -47,33 +48,35 @@ public class PreemptorModule extends AbstractModule { @CmdLine(name = "preemption_delay", help = "Time interval after which a pending task becomes eligible to preempt other tasks") private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY = - Arg.create(Amount.of(10L, Time.MINUTES)); + Arg.create(Amount.of(3L, Time.MINUTES)); @CmdLine(name = "preemption_slot_hold_time", help = "Time to hold a preemption slot found before it is discarded.") private static final Arg<Amount<Long, Time>> PREEMPTION_SLOT_HOLD_TIME = - Arg.create(Amount.of(3L, Time.MINUTES)); + Arg.create(Amount.of(5L, Time.MINUTES)); + + @CmdLine(name = "preemption_slot_search_interval", + help = "Time interval between pending task preemption slot searches.") + private static final Arg<Amount<Long, Time>> PREEMPTION_SLOT_SEARCH_INTERVAL = + Arg.create(Amount.of(1L, Time.MINUTES)); private final boolean enablePreemptor; private final Amount<Long, Time> preemptionDelay; - private final ScheduledExecutorService executor; + private final Amount<Long, Time> slotSearchInterval; @VisibleForTesting public PreemptorModule( boolean enablePreemptor, Amount<Long, Time> preemptionDelay, - ScheduledExecutorService executor) { + Amount<Long, Time> slotSearchInterval) { this.enablePreemptor = enablePreemptor; this.preemptionDelay = requireNonNull(preemptionDelay); - this.executor = requireNonNull(executor); + this.slotSearchInterval = requireNonNull(slotSearchInterval); } public PreemptorModule() { - this( - ENABLE_PREEMPTOR.get(), - PREEMPTION_DELAY.get(), - singleThreadLoggingScheduledExecutor("PreemptorProcessor-%d", LOG)); + this(ENABLE_PREEMPTOR.get(), PREEMPTION_DELAY.get(), PREEMPTION_SLOT_SEARCH_INTERVAL.get()); } @Override @@ -83,29 +86,36 @@ public class PreemptorModule extends AbstractModule { protected void configure() { if (enablePreemptor) { LOG.info("Preemptor Enabled."); - bind(ScheduledExecutorService.class) - .annotatedWith(PreemptorImpl.PreemptionExecutor.class) - .toInstance(executor); bind(PreemptorMetrics.class).in(Singleton.class); - bind(PreemptionSlotFinder.class).to(PreemptionSlotFinderImpl.class); - bind(PreemptionSlotFinderImpl.class).in(Singleton.class); - bind(Preemptor.class).to(PreemptorImpl.class); - bind(PreemptorImpl.class).in(Singleton.class); + bind(PreemptionSlotFinder.class) + .to(PreemptionSlotFinder.PreemptionSlotFinderImpl.class); + bind(PreemptionSlotFinder.PreemptionSlotFinderImpl.class).in(Singleton.class); + bind(Preemptor.class).to(Preemptor.PreemptorImpl.class); + bind(Preemptor.PreemptorImpl.class).in(Singleton.class); bind(new TypeLiteral<Amount<Long, Time>>() { }) - .annotatedWith(PreemptorImpl.PreemptionDelay.class) + .annotatedWith(PendingTaskProcessor.PreemptionDelay.class) .toInstance(preemptionDelay); bind(new TypeLiteral<Amount<Long, Time>>() { }) .annotatedWith(PreemptionSlotCache.PreemptionSlotHoldDuration.class) .toInstance(PREEMPTION_SLOT_HOLD_TIME.get()); bind(PreemptionSlotCache.class).in(Singleton.class); + bind(PendingTaskProcessor.class).in(Singleton.class); bind(ClusterState.class).to(ClusterStateImpl.class); bind(ClusterStateImpl.class).in(Singleton.class); expose(ClusterStateImpl.class); + + bind(PreemptorService.class).in(Singleton.class); + bind(AbstractScheduledService.Scheduler.class).toInstance( + AbstractScheduledService.Scheduler.newFixedRateSchedule( + 0L, + slotSearchInterval.getValue(), + slotSearchInterval.getUnit().getTimeUnit())); + + expose(PreemptorService.class); } else { bind(Preemptor.class).toInstance(NULL_PREEMPTOR); LOG.warning("Preemptor Disabled."); } - expose(Preemptor.class); } }); @@ -114,13 +124,39 @@ public class PreemptorModule extends AbstractModule { // and private modules due to multiple injectors. We accept the added complexity here to keep // the other bindings private. PubsubEventModule.bindSubscriber(binder(), ClusterStateImpl.class); + if (enablePreemptor) { + SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()) + .to(PreemptorService.class); + } + } + + static class PreemptorService extends AbstractScheduledService { + private final PendingTaskProcessor slotFinder; + private final Scheduler schedule; + + @Inject + PreemptorService(PendingTaskProcessor slotFinder, Scheduler schedule) { + this.slotFinder = requireNonNull(slotFinder); + this.schedule = requireNonNull(schedule); + } + + @Override + protected void runOneIteration() { + slotFinder.run(); + } + + @Override + protected Scheduler scheduler() { + return schedule; + } } private static final Preemptor NULL_PREEMPTOR = new Preemptor() { @Override public Optional<String> attemptPreemptionFor( - String taskId, - AttributeAggregate attributeAggregate) { + IAssignedTask task, + AttributeAggregate jobState, + Storage.MutableStoreProvider storeProvider) { return Optional.absent(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java index da7b662..ed82ae9 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java @@ -26,8 +26,12 @@ import com.google.common.collect.Iterables; import com.google.common.util.concurrent.AtomicLongMap; import com.twitter.common.collections.Pair; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.entities.IAttribute; +import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import static java.util.Objects.requireNonNull; @@ -51,6 +55,28 @@ public class AttributeAggregate { private final Supplier<Map<Pair<String, String>, Long>> aggregate; /** + * Initializes an {@link AttributeAggregate} instance from data store. + * + * @param storeProvider Store provider to get data from. + * @param jobKey Job key. + * @return An {@link AttributeAggregate} instance. + */ + public static AttributeAggregate getJobActiveState( + final StoreProvider storeProvider, + final IJobKey jobKey) { + + Supplier<ImmutableSet<IScheduledTask>> taskSupplier = Suppliers.memoize( + new Supplier<ImmutableSet<IScheduledTask>>() { + @Override + public ImmutableSet<IScheduledTask> get() { + return storeProvider.getTaskStore().fetchTasks( + Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES)); + } + }); + return new AttributeAggregate(taskSupplier, storeProvider.getAttributeStore()); + } + + /** * Creates a new attribute aggregate, which will be computed from the provided external state. * * @param activeTaskSupplier Supplier of active tasks within the aggregated scope. http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java index 29fe156..c5643d9 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java @@ -36,7 +36,6 @@ import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl; import org.apache.aurora.scheduler.async.preemptor.Preemptor; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskGroupKey; @@ -82,6 +81,8 @@ public class TaskSchedulerImplTest extends EasyMockTest { Offers.makeOffer("OFFER_A", "HOST_A"), IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))); + private static final String SLAVE_ID = OFFER.getOffer().getSlaveId().getValue(); + private static final TaskGroupKey GROUP_A = TaskGroupKey.from(TASK_A.getAssignedTask().getTask()); private static final TaskGroupKey GROUP_B = TaskGroupKey.from(TASK_B.getAssignedTask().getTask()); @@ -157,13 +158,12 @@ public class TaskSchedulerImplTest extends EasyMockTest { expectActiveJobFetch(TASK_A); expectLaunchAttempt(false); // Reserve "a" with offerA - expect(preemptor.attemptPreemptionFor("a", emptyJob)) - .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue())); + expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); expectTaskStillPendingQuery(TASK_B); expectActiveJobFetch(TASK_B); AssignmentCapture firstAssignment = expectLaunchAttempt(false); - expect(preemptor.attemptPreemptionFor("b", emptyJob)).andReturn(Optional.<String>absent()); + expectPreemptorCall(TASK_B, Optional.<String>absent()); expectTaskStillPendingQuery(TASK_B); expectActiveJobFetch(TASK_B); @@ -192,8 +192,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { expectActiveJobFetch(TASK_A); expectLaunchAttempt(false); // Reserve "a" with offerA - expect(preemptor.attemptPreemptionFor("a", emptyJob)) - .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue())); + expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); @@ -228,8 +227,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { expectActiveJobFetch(TASK_A); expectLaunchAttempt(false); // Reserve "a" with offerA - expect(preemptor.attemptPreemptionFor("a", emptyJob)) - .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue())); + expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); @@ -253,8 +251,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { expectLaunchAttempt(false); // Reserve "a" with offerA - expect(preemptor.attemptPreemptionFor("a", emptyJob)) - .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue())); + expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); expectTaskStillPendingQuery(TASK_B); expectActiveJobFetch(TASK_B); @@ -278,8 +275,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { expectActiveJobFetch(TASK_B); expectLaunchAttempt(false); // Reserve "b" with offer1 - expect(preemptor.attemptPreemptionFor("b", emptyJob)) - .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue())); + expectPreemptorCall(TASK_B, Optional.of(SLAVE_ID)); expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); @@ -347,6 +343,13 @@ public class TaskSchedulerImplTest extends EasyMockTest { public Capture<TaskGroupKey> groupKey = createCapture(); } + private void expectPreemptorCall(IScheduledTask task, Optional<String> result) { + expect(preemptor.attemptPreemptionFor( + task.getAssignedTask(), + emptyJob, + storageUtil.mutableStoreProvider)).andReturn(result); + } + private AssignmentCapture expectLaunchAttempt(boolean taskLaunched) throws OfferManager.LaunchException { @@ -366,9 +369,10 @@ public class TaskSchedulerImplTest extends EasyMockTest { assertEquals(groupKey, capture.groupKey.getValue()); } - private void expectActiveJobFetch(IScheduledTask taskInJob) { + private void expectActiveJobFetch(IScheduledTask task) { storageUtil.expectTaskFetch( - TaskSchedulerImpl.activeJobStateQuery(Tasks.SCHEDULED_TO_JOB_KEY.apply(taskInJob)), + Query.jobScoped(Tasks.SCHEDULED_TO_JOB_KEY.apply(task)) + .byStatus(Tasks.SLAVE_ASSIGNED_STATES), ImmutableSet.<IScheduledTask>of()); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java index f5c2128..88c0163 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java @@ -238,7 +238,7 @@ public class TaskSchedulerTest extends EasyMockTest { public void testNoOffers() { Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - expect(preemptor.attemptPreemptionFor("a", emptyJob)).andReturn(Optional.<String>absent()); + expectPreemptorCall(makeTask("a")); replayAndCreateScheduler(); @@ -316,7 +316,7 @@ public class TaskSchedulerTest extends EasyMockTest { Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure()); - expect(preemptor.attemptPreemptionFor("a", emptyJob)).andReturn(Optional.<String>absent()); + expectPreemptorCall(task); Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.success(mesosTask)); @@ -324,7 +324,7 @@ public class TaskSchedulerTest extends EasyMockTest { Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - expect(preemptor.attemptPreemptionFor("b", emptyJob)).andReturn(Optional.<String>absent()); + expectPreemptorCall(makeTask("b")); replayAndCreateScheduler(); @@ -404,10 +404,10 @@ public class TaskSchedulerTest extends EasyMockTest { expectAnyMaintenanceCalls(); expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure()); Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - expect(preemptor.attemptPreemptionFor("a", emptyJob)).andReturn(Optional.<String>absent()); + expectPreemptorCall(task); driver.declineOffer(OFFER_A.getOffer().getId()); expectTaskGroupBackoff(10, 20); - expect(preemptor.attemptPreemptionFor("a", emptyJob)).andReturn(Optional.<String>absent()); + expectPreemptorCall(task); replayAndCreateScheduler(); @@ -604,7 +604,7 @@ public class TaskSchedulerTest extends EasyMockTest { Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure()); expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20); - expect(preemptor.attemptPreemptionFor("a", emptyJob)).andReturn(Optional.<String>absent()); + expectPreemptorCall(task); replayAndCreateScheduler(); @@ -651,4 +651,11 @@ public class TaskSchedulerTest extends EasyMockTest { .setAttributes(ImmutableSet.<Attribute>of()) .setMode(mode))); } + + private void expectPreemptorCall(IScheduledTask task) { + expect(preemptor.attemptPreemptionFor( + eq(task.getAssignedTask()), + eq(emptyJob), + EasyMock.<MutableStoreProvider>anyObject())).andReturn(Optional.<String>absent()); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java new file mode 100644 index 0000000..bcd1b4e --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java @@ -0,0 +1,169 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.async.preemptor; + +import java.util.Arrays; + +import com.google.common.base.Optional; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableSet; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.util.testing.FakeClock; + +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.JobKey; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.gen.TaskEvent; +import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.stats.CachedCounters; +import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.easymock.IExpectationSetters; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.PENDING_PROCESSOR_RUN_NAME; +import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.attemptsStatName; +import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotSearchStatName; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; + +public class PendingTaskProcessorTest extends EasyMockTest { + private static final String TASK_ID_A = "task_a"; + private static final String TASK_ID_B = "task_b"; + private static final ScheduledTask TASK_A = makeTask(TASK_ID_A); + private static final ScheduledTask TASK_B = makeTask(TASK_ID_B); + private static final PreemptionSlot SLOT_A = createPreemptionSlot(TASK_A); + private static final PreemptionSlot SLOT_B = createPreemptionSlot(TASK_B); + private static final String SLAVE_ID = "slave_id"; + private static final IJobKey JOB_KEY = IJobKey.build(TASK_A.getAssignedTask().getTask().getJob()); + + private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS); + + private static final Optional<PreemptionSlot> EMPTY_SLOT = Optional.absent(); + + private StorageTestUtil storageUtil; + private FakeStatsProvider statsProvider; + private PreemptionSlotFinder preemptionSlotFinder; + private PendingTaskProcessor slotFinder; + private AttributeAggregate attrAggregate; + private PreemptionSlotCache slotCache; + private FakeClock clock; + + @Before + public void setUp() { + storageUtil = new StorageTestUtil(this); + storageUtil.expectOperations(); + preemptionSlotFinder = createMock(PreemptionSlotFinder.class); + slotCache = createMock(PreemptionSlotCache.class); + statsProvider = new FakeStatsProvider(); + clock = new FakeClock(); + attrAggregate = new AttributeAggregate( + Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()), + createMock(AttributeStore.class)); + + slotFinder = new PendingTaskProcessor( + storageUtil.storage, + preemptionSlotFinder, + new PreemptorMetrics(new CachedCounters(statsProvider)), + PREEMPTION_DELAY, + slotCache, + clock); + } + @Test + public void testSearchSlotSuccessful() throws Exception { + expect(slotCache.get(TASK_ID_A)).andReturn(EMPTY_SLOT); + expect(slotCache.get(TASK_ID_B)).andReturn(EMPTY_SLOT); + expectGetPendingTasks(TASK_A, TASK_B); + expectAttributeAggegateFetchTasks(); + expectSlotSearch(TASK_A, Optional.of(SLOT_A)); + expectSlotSearch(TASK_B, Optional.of(SLOT_B)); + slotCache.add(TASK_ID_A, SLOT_A); + slotCache.add(TASK_ID_B, SLOT_B); + + control.replay(); + + clock.advance(PREEMPTION_DELAY); + + slotFinder.run(); + assertEquals(1L, statsProvider.getLongValue(PENDING_PROCESSOR_RUN_NAME)); + assertEquals(2L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true))); + assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true))); + } + + @Test + public void testSearchSlotFailed() throws Exception { + expect(slotCache.get(TASK_ID_A)).andReturn(EMPTY_SLOT); + expectGetPendingTasks(TASK_A); + expectAttributeAggegateFetchTasks(); + expectSlotSearch(TASK_A, EMPTY_SLOT); + + control.replay(); + + clock.advance(PREEMPTION_DELAY); + + slotFinder.run(); + assertEquals(1L, statsProvider.getLongValue(PENDING_PROCESSOR_RUN_NAME)); + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true))); + assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, true))); + } + + private void expectSlotSearch(ScheduledTask task, Optional<PreemptionSlot> slot) { + expect(preemptionSlotFinder.findPreemptionSlotFor( + IAssignedTask.build(task.getAssignedTask()), + attrAggregate, + storageUtil.storeProvider)).andReturn(slot); + } + + private static PreemptionSlot createPreemptionSlot(ScheduledTask task) { + IAssignedTask assigned = IAssignedTask.build(task.getAssignedTask()); + return new PreemptionSlot(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID); + } + + private static ScheduledTask makeTask(String taskId) { + ScheduledTask task = new ScheduledTask() + .setAssignedTask(new AssignedTask() + .setTaskId(taskId) + .setTask(new TaskConfig() + .setPriority(1) + .setProduction(true) + .setJob(new JobKey("role", "env", "name")))); + task.addToTaskEvents(new TaskEvent(0, PENDING)); + return task; + } + + private IExpectationSetters<?> expectAttributeAggegateFetchTasks() { + return storageUtil.expectTaskFetch( + Query.jobScoped(JOB_KEY).byStatus(Tasks.SLAVE_ASSIGNED_STATES)); + } + + private void expectGetPendingTasks(ScheduledTask... returnedTasks) { + storageUtil.expectTaskFetch( + Query.statusScoped(PENDING), + IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks))); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java index d17c4fb..281f4e0 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java @@ -13,16 +13,10 @@ */ package org.apache.aurora.scheduler.async.preemptor; -import java.util.Arrays; -import java.util.concurrent.ScheduledExecutorService; - import com.google.common.base.Optional; import com.google.common.base.Suppliers; -import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableSet; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; import com.twitter.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.AssignedTask; @@ -32,26 +26,24 @@ import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskEvent; import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; -import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.stats.CachedCounters; import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.attemptsStatName; -import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotSearchStatName; import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotValidationStatName; import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.successStatName; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; @@ -59,171 +51,93 @@ import static org.junit.Assert.assertEquals; public class PreemptorImplTest extends EasyMockTest { private static final String TASK_ID = "task_a"; private static final String SLAVE_ID = "slave_id"; - - private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS); + private static final IScheduledTask TASK = IScheduledTask.build(makeTask()); + private static final PreemptionSlot SLOT = createPreemptionSlot(TASK); private static final Optional<PreemptionSlot> EMPTY_SLOT = Optional.absent(); private static final Optional<String> EMPTY_RESULT = Optional.absent(); - private StorageTestUtil storageUtil; private StateManager stateManager; private FakeStatsProvider statsProvider; private PreemptionSlotFinder preemptionSlotFinder; private PreemptorImpl preemptor; private AttributeAggregate attrAggregate; private PreemptionSlotCache slotCache; - private FakeScheduledExecutor clock; + private Storage.MutableStoreProvider storeProvider; @Before public void setUp() { - storageUtil = new StorageTestUtil(this); - storageUtil.expectOperations(); + storeProvider = createMock(Storage.MutableStoreProvider.class); stateManager = createMock(StateManager.class); preemptionSlotFinder = createMock(PreemptionSlotFinder.class); slotCache = createMock(PreemptionSlotCache.class); statsProvider = new FakeStatsProvider(); - ScheduledExecutorService executor = createMock(ScheduledExecutorService.class); - clock = FakeScheduledExecutor.scheduleExecutor(executor); attrAggregate = new AttributeAggregate( Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()), createMock(AttributeStore.class)); preemptor = new PreemptorImpl( - storageUtil.storage, stateManager, preemptionSlotFinder, new PreemptorMetrics(new CachedCounters(statsProvider)), - PREEMPTION_DELAY, - executor, - slotCache, - clock); - } - - @Test - public void testSearchSlotSuccessful() throws Exception { - ScheduledTask task = makeTask(); - PreemptionSlot slot = createPreemptionSlot(task); - - expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT); - expectGetPendingTasks(task); - expectSlotSearch(task, Optional.of(slot)); - slotCache.add(TASK_ID, slot); - - control.replay(); - - clock.advance(PREEMPTION_DELAY); - - assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(true, true))); - assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true))); - } - - @Test - public void testSearchSlotFailed() throws Exception { - ScheduledTask task = makeTask(); - - expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT); - expectGetPendingTasks(task); - expectSlotSearch(task, EMPTY_SLOT); - - control.replay(); - - clock.advance(PREEMPTION_DELAY); - - assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true))); - assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, true))); - } - - @Test - public void testSearchSlotTaskNoLongerPending() throws Exception { - expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT); - storageUtil.expectTaskFetch(Query.statusScoped(PENDING).byId(TASK_ID)); - - control.replay(); - - assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); + slotCache); } @Test public void testPreemptTasksSuccessful() throws Exception { - ScheduledTask task = makeTask(); - PreemptionSlot slot = createPreemptionSlot(task); - - expect(slotCache.get(TASK_ID)).andReturn(Optional.of(slot)); + expect(slotCache.get(TASK_ID)).andReturn(Optional.of(SLOT)); slotCache.remove(TASK_ID); - expectGetPendingTasks(task); - expectSlotValidation(task, slot, Optional.of(ImmutableSet.of( - PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask()))))); + expectSlotValidation(Optional.of(ImmutableSet.of( + PreemptionVictim.fromTask(TASK.getAssignedTask())))); - expectPreempted(task); + expectPreempted(TASK); control.replay(); - clock.advance(PREEMPTION_DELAY); - - assertEquals(Optional.of(SLAVE_ID), preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); + assertEquals(Optional.of(SLAVE_ID), callPreemptor()); assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true))); assertEquals(1L, statsProvider.getLongValue(successStatName(true))); } @Test public void testPreemptTasksValidationFailed() throws Exception { - ScheduledTask task = makeTask(); - PreemptionSlot slot = createPreemptionSlot(task); - - expect(slotCache.get(TASK_ID)).andReturn(Optional.of(slot)); + expect(slotCache.get(TASK_ID)).andReturn(Optional.of(SLOT)); slotCache.remove(TASK_ID); - expectGetPendingTasks(task); - storageUtil.expectTaskFetch(Query.statusScoped(PENDING).byId(TASK_ID)); - expectSlotValidation(task, slot, Optional.<ImmutableSet<PreemptionVictim>>absent()); + expectSlotValidation(Optional.<ImmutableSet<PreemptionVictim>>absent()); control.replay(); - clock.advance(PREEMPTION_DELAY); - - assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); + assertEquals(EMPTY_RESULT, callPreemptor()); assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(false))); assertEquals(0L, statsProvider.getLongValue(successStatName(true))); } @Test - public void testPreemptTaskNoLongerPending() throws Exception { - ScheduledTask task = makeTask(); - PreemptionSlot slot = createPreemptionSlot(task); - expect(slotCache.get(TASK_ID)).andReturn(Optional.of(slot)); - slotCache.remove(TASK_ID); - storageUtil.expectTaskFetch(Query.statusScoped(PENDING).byId(TASK_ID)); + public void testNoCachedSlot() throws Exception { + expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT); control.replay(); - assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); + assertEquals(EMPTY_RESULT, callPreemptor()); + assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); } - private void expectSlotSearch(ScheduledTask task, Optional<PreemptionSlot> slot) { - expect(preemptionSlotFinder.findPreemptionSlotFor( - IAssignedTask.build(task.getAssignedTask()), - attrAggregate, - storageUtil.storeProvider)).andReturn(slot); + private Optional<String> callPreemptor() { + return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), attrAggregate, storeProvider); } - private void expectSlotValidation( - ScheduledTask task, - PreemptionSlot slot, - Optional<ImmutableSet<PreemptionVictim>> victims) { - + private void expectSlotValidation(Optional<ImmutableSet<PreemptionVictim>> victims) { expect(preemptionSlotFinder.validatePreemptionSlotFor( - IAssignedTask.build(task.getAssignedTask()), - attrAggregate, - slot, - storageUtil.mutableStoreProvider)).andReturn(victims); + eq(TASK.getAssignedTask()), + eq(attrAggregate), + eq(SLOT), + anyObject(Storage.MutableStoreProvider.class))).andReturn(victims); } - private void expectPreempted(ScheduledTask preempted) throws Exception { + private void expectPreempted(IScheduledTask preempted) throws Exception { expect(stateManager.changeState( - eq(storageUtil.mutableStoreProvider), + anyObject(Storage.MutableStoreProvider.class), eq(Tasks.id(preempted)), eq(Optional.<ScheduleStatus>absent()), eq(ScheduleStatus.PREEMPTING), @@ -231,8 +145,8 @@ public class PreemptorImplTest extends EasyMockTest { .andReturn(true); } - private static PreemptionSlot createPreemptionSlot(ScheduledTask task) { - IAssignedTask assigned = IAssignedTask.build(task.getAssignedTask()); + private static PreemptionSlot createPreemptionSlot(IScheduledTask task) { + IAssignedTask assigned = task.getAssignedTask(); return new PreemptionSlot(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID); } @@ -247,13 +161,4 @@ public class PreemptorImplTest extends EasyMockTest { task.addToTaskEvents(new TaskEvent(0, PENDING)); return task; } - - private void expectGetPendingTasks(ScheduledTask... returnedTasks) { - Iterable<String> taskIds = FluentIterable.from(Arrays.asList(returnedTasks)) - .transform(IScheduledTask.FROM_BUILDER) - .transform(Tasks.SCHEDULED_TO_ID); - storageUtil.expectTaskFetch( - Query.statusScoped(PENDING).byId(taskIds), - IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks))); - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java index 0e2e958..7e2d1c5 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java @@ -13,8 +13,6 @@ */ package org.apache.aurora.scheduler.async.preemptor; -import java.util.concurrent.ScheduledExecutorService; - import com.google.common.base.Optional; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; @@ -30,12 +28,14 @@ import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; import com.twitter.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.state.TaskAssigner; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.junit.Before; @@ -78,7 +78,7 @@ public class PreemptorModuleTest extends EasyMockTest { Injector injector = createInjector(new PreemptorModule( false, Amount.of(0L, Time.SECONDS), - createMock(ScheduledExecutorService.class))); + Amount.of(0L, Time.SECONDS))); Supplier<ImmutableSet<IScheduledTask>> taskSupplier = createMock(new EasyMockTest.Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { }); @@ -91,7 +91,9 @@ public class PreemptorModuleTest extends EasyMockTest { injector.getBindings(); assertEquals( Optional.<String>absent(), - injector.getInstance(Preemptor.class) - .attemptPreemptionFor("a", new AttributeAggregate(taskSupplier, attributeStore))); + injector.getInstance(Preemptor.class).attemptPreemptionFor( + IAssignedTask.build(new AssignedTask()), + new AttributeAggregate(taskSupplier, attributeStore), + storageUtil.mutableStoreProvider)); } }
