Centralizing offer/task matching in TaskAssigner. Bugs closed: AURORA-1416
Reviewed at https://reviews.apache.org/r/37001/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/fb032506 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/fb032506 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/fb032506 Branch: refs/heads/master Commit: fb032506529894628d9e0f85a0ded095c938bf49 Parents: 1c0086f Author: Maxim Khutornenko <[email protected]> Authored: Mon Aug 3 11:46:58 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Mon Aug 3 11:46:58 2015 -0700 ---------------------------------------------------------------------- .../aurora/benchmark/SchedulingBenchmarks.java | 5 +- .../benchmark/fakes/FakeOfferManager.java | 20 +- .../aurora/scheduler/offers/OfferManager.java | 148 ++-- .../scheduler/scheduling/SchedulingModule.java | 2 +- .../scheduler/scheduling/TaskScheduler.java | 101 +-- .../aurora/scheduler/state/TaskAssigner.java | 220 +++--- .../scheduler/offers/OfferManagerImplTest.java | 164 +++-- .../scheduling/TaskSchedulerImplTest.java | 216 +++--- .../scheduler/scheduling/TaskSchedulerTest.java | 671 ------------------- .../scheduler/state/TaskAssignerImplTest.java | 122 +++- 10 files changed, 487 insertions(+), 1182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java index 5bc73d5..d75f090 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java @@ -38,6 +38,7 @@ import org.apache.aurora.benchmark.fakes.FakeRescheduleCalculator; import 
org.apache.aurora.benchmark.fakes.FakeStatsProvider; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.TaskIdGenerator; +import org.apache.aurora.scheduler.async.AsyncModule; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.filter.SchedulingFilter; @@ -117,7 +118,9 @@ public class SchedulingBenchmarks { new PrivateModule() { @Override protected void configure() { - bind(ScheduledExecutorService.class).toInstance(executor); + bind(ScheduledExecutorService.class) + .annotatedWith(AsyncModule.AsyncExecutor.class) + .toInstance(executor); bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class); bind(OfferManager.OfferManagerImpl.class).in(Singleton.class); bind(OfferManager.OfferReturnDelay.class).toInstance( http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java index f413301..fbd24ea 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java +++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java @@ -13,14 +13,12 @@ */ package org.apache.aurora.benchmark.fakes; -import com.google.common.base.Function; import com.google.common.base.Optional; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.offers.OfferManager; -import org.apache.aurora.scheduler.state.TaskAssigner; import org.apache.mesos.Protos; public class FakeOfferManager implements OfferManager { @@ -30,15 +28,23 @@ public class FakeOfferManager implements OfferManager { } @Override - public void 
cancelOffer(Protos.OfferID offer) { + public void cancelOffer(Protos.OfferID offerId) { // no-op } @Override - public boolean launchFirst( - Function<HostOffer, TaskAssigner.Assignment> acceptor, - TaskGroupKey groupKey) throws LaunchException { - return false; + public void launchTask(Protos.OfferID offerId, Protos.TaskInfo taskInfo) throws LaunchException { + // no-op + } + + @Override + public void banOffer(Protos.OfferID offerId, TaskGroupKey groupKey) { + // no-op + } + + @Override + public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) { + return null; } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java index 14bf265..4b8a55f 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java @@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; import java.util.logging.Logger; import javax.inject.Inject; @@ -29,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Supplier; +import com.google.common.collect.FluentIterable; import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -47,8 +47,8 @@ import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; 
import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.mesos.Protos; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.SlaveID; @@ -76,22 +76,27 @@ public interface OfferManager extends EventSubscriber { * Invalidates an offer. This indicates that the scheduler should not attempt to match any * tasks against the offer. * - * @param offer Canceled offer. + * @param offerId Canceled offer. */ - void cancelOffer(OfferID offer); + void cancelOffer(OfferID offerId); /** - * Launches the first task that satisfies the {@code acceptor} by returning a {@link Assignment}. + * Exclude an offer that results in a static mismatch from further attempts to match against all + * tasks from the same group. * - * @param acceptor Function that determines if an offer is accepted. - * @param groupKey Task group key. - * @return {@code true} if the task was launched, {@code false} if no offers satisfied the - * {@code acceptor}. - * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the - * task. + * @param offerId Offer ID to exclude for the given {@code groupKey}. + * @param groupKey Task group key to exclude. */ - boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey) - throws LaunchException; + void banOffer(OfferID offerId, TaskGroupKey groupKey); + + /** + * Launches the task matched against the offer. + * + * @param offerId Matched offer ID. + * @param task Matched task info. + * @throws LaunchException If there was an error launching the task. + */ + void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException; /** * Notifies the offer queue that a host's attributes have changed. 
@@ -108,6 +113,14 @@ public interface OfferManager extends EventSubscriber { Iterable<HostOffer> getOffers(); /** + * Gets all offers that are not statically banned for the given {@code groupKey}. + * + * @param groupKey Task group key to check offers for. + * @return A snapshot of all offers eligible for the given {@code groupKey}. + */ + Iterable<HostOffer> getOffers(TaskGroupKey groupKey); + + /** * Gets an offer for the given slave ID. * * @param slaveId Slave ID to get offer for. @@ -127,7 +140,8 @@ public interface OfferManager extends EventSubscriber { * Thrown when there was an unexpected failure trying to launch a task. */ class LaunchException extends Exception { - LaunchException(String msg) { + @VisibleForTesting + public LaunchException(String msg) { super(msg); } @@ -218,6 +232,11 @@ public interface OfferManager extends EventSubscriber { } @Override + public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) { + return hostOffers.getWeaklyConsistentOffers(groupKey); + } + + @Override public Optional<HostOffer> getOffer(SlaveID slaveId) { return hostOffers.get(slaveId); } @@ -268,7 +287,7 @@ public interface OfferManager extends EventSubscriber { private final Map<String, HostOffer> offersByHost = Maps.newHashMap(); // TODO(maxim): Expose via a debug endpoint. AURORA-1136. // Keep track of offer->groupKey mappings that will never be matched to avoid redundant - // scheduling attempts. See Assignment.Result for more details on static ban. + // scheduling attempts. See VetoGroup for more details on static ban. 
private final Multimap<OfferID, TaskGroupKey> staticallyBannedOffers = HashMultimap.create(); HostOffers() { @@ -304,7 +323,7 @@ public interface OfferManager extends EventSubscriber { if (offer != null) { // Remove and re-add a host's offer to re-sort based on its new hostStatus remove(offer.getOffer().getId()); - add(new HostOffer(offer.getOffer(), attributes)); + add(new HostOffer(offer.getOffer(), attributes)); } } @@ -312,27 +331,14 @@ public interface OfferManager extends EventSubscriber { return Iterables.unmodifiableIterable(offers); } - synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) { - boolean result = staticallyBannedOffers.containsEntry(offer.getOffer().getId(), groupKey); - if (LOG.isLoggable(Level.FINE)) { - LOG.fine(String.format( - "Host offer %s is statically banned for %s: %s", - offer, - groupKey, - result)); - } - return result; + synchronized Iterable<HostOffer> getWeaklyConsistentOffers(TaskGroupKey groupKey) { + return Iterables.unmodifiableIterable(FluentIterable.from(offers).filter( + e -> !staticallyBannedOffers.containsEntry(e.getOffer().getId(), groupKey))); } - synchronized void addStaticGroupBan(HostOffer offer, TaskGroupKey groupKey) { - OfferID offerId = offer.getOffer().getId(); + synchronized void addStaticGroupBan(OfferID offerId, TaskGroupKey groupKey) { if (offersById.containsKey(offerId)) { staticallyBannedOffers.put(offerId, groupKey); - - if (LOG.isLoggable(Level.FINE)) { - LOG.fine( - String.format("Adding static ban for offer: %s, groupKey: %s", offer, groupKey)); - } } } @@ -345,63 +351,31 @@ public interface OfferManager extends EventSubscriber { } } - @Timed("offer_queue_launch_first") @Override - public boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey) - throws LaunchException { - - // It's important that this method is not called concurrently - doing so would open up the - // possibility of a race between the same offers being accepted by different 
threads. - - for (HostOffer offer : hostOffers.getWeaklyConsistentOffers()) { - if (!hostOffers.isStaticallyBanned(offer, groupKey) - && acceptOffer(offer, acceptor, groupKey)) { - return true; - } - } - - return false; + public void banOffer(OfferID offerId, TaskGroupKey groupKey) { + hostOffers.addStaticGroupBan(offerId, groupKey); } - @Timed("offer_queue_accept_offer") - protected boolean acceptOffer( - HostOffer offer, - Function<HostOffer, Assignment> acceptor, - TaskGroupKey groupKey) throws LaunchException { - - Assignment assignment = acceptor.apply(offer); - switch (assignment.getResult()) { - - case SUCCESS: - // Guard against an offer being removed after we grabbed it from the iterator. - // If that happens, the offer will not exist in hostOffers, and we can immediately - // send it back to LOST for quick reschedule. - // Removing while iterating counts on the use of a weakly-consistent iterator being used, - // which is a feature of ConcurrentSkipListSet. - if (hostOffers.remove(offer.getOffer().getId())) { - try { - driver.launchTask(offer.getOffer().getId(), assignment.getTaskInfo().get()); - return true; - } catch (IllegalStateException e) { - // TODO(William Farner): Catch only the checked exception produced by Driver - // once it changes from throwing IllegalStateException when the driver is not yet - // registered. - throw new LaunchException("Failed to launch task.", e); - } - } else { - offerRaces.incrementAndGet(); - throw new LaunchException( - "Accepted offer no longer exists in offer queue, likely data race."); - } - - case FAILURE_STATIC_MISMATCH: - // Exclude an offer that results in a static mismatch from further attempts to match - // against all tasks from the same group. 
- hostOffers.addStaticGroupBan(offer, groupKey); - return false; - - default: - return false; + @Timed("offer_manager_launch_task") + @Override + public void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException { + // Guard against an offer being removed after we grabbed it from the iterator. + // If that happens, the offer will not exist in hostOffers, and we can immediately + // send it back to LOST for quick reschedule. + // Removing while iterating counts on the use of a weakly-consistent iterator being used, + // which is a feature of ConcurrentSkipListSet. + if (hostOffers.remove(offerId)) { + try { + driver.launchTask(offerId, task); + } catch (IllegalStateException e) { + // TODO(William Farner): Catch only the checked exception produced by Driver + // once it changes from throwing IllegalStateException when the driver is not yet + // registered. + throw new LaunchException("Failed to launch task.", e); + } + } else { + offerRaces.incrementAndGet(); + throw new LaunchException("Offer no longer exists in offer queue, likely data race."); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java index c7a1a46..b9dccc6 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java @@ -112,7 +112,7 @@ public class SchedulingModule extends AbstractModule { install(new PrivateModule() { @Override protected void configure() { - bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).in(Singleton.class); + bind(new TypeLiteral<BiCache<TaskGroupKey, String>>() { }).in(Singleton.class); 
bind(BiCache.BiCacheSettings.class).toInstance( new BiCache.BiCacheSettings(RESERVATION_DURATION.get(), "reservation_cache_size")); bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java index d4bd529..0f0bfca 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java @@ -23,26 +23,21 @@ import javax.inject.Inject; import javax.inject.Qualifier; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.Iterables; import com.google.common.eventbus.Subscribe; import com.twitter.common.inject.TimedInterceptor.Timed; import com.twitter.common.stats.Stats; -import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; -import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.preemptor.BiCache; import org.apache.aurora.scheduler.preemptor.Preemptor; -import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.state.TaskAssigner; -import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; import org.apache.aurora.scheduler.storage.Storage; import 
org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; @@ -56,7 +51,6 @@ import static java.lang.annotation.ElementType.PARAMETER; import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.gen.ScheduleStatus.LOST; import static org.apache.aurora.gen.ScheduleStatus.PENDING; /** @@ -91,11 +85,9 @@ public interface TaskScheduler extends EventSubscriber { private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName()); private final Storage storage; - private final StateManager stateManager; private final TaskAssigner assigner; - private final OfferManager offerManager; private final Preemptor preemptor; - private final BiCache<String, TaskGroupKey> reservations; + private final BiCache<TaskGroupKey, String> reservations; private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired"); private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed"); @@ -104,54 +96,16 @@ public interface TaskScheduler extends EventSubscriber { @Inject TaskSchedulerImpl( Storage storage, - StateManager stateManager, TaskAssigner assigner, - OfferManager offerManager, Preemptor preemptor, - BiCache<String, TaskGroupKey> reservations) { + BiCache<TaskGroupKey, String> reservations) { this.storage = requireNonNull(storage); - this.stateManager = requireNonNull(stateManager); this.assigner = requireNonNull(assigner); - this.offerManager = requireNonNull(offerManager); this.preemptor = requireNonNull(preemptor); this.reservations = requireNonNull(reservations); } - private Function<HostOffer, Assignment> getAssignerFunction( - final MutableStoreProvider storeProvider, - final ResourceRequest resourceRequest, - final String taskId) { - - // TODO(wfarner): Turn this into Predicate<Offer>, and in the caller, find the first match - // and perform the assignment at the very end. 
This will allow us to use optimistic locking - // at the top of the stack and avoid holding the write lock for too long. - return new Function<HostOffer, Assignment>() { - @Override - public Assignment apply(HostOffer offer) { - Optional<TaskGroupKey> reservation = - reservations.get(offer.getOffer().getSlaveId().getValue()); - - if (reservation.isPresent()) { - if (TaskGroupKey.from(resourceRequest.getTask()).equals(reservation.get())) { - // Slave is reserved to satisfy this task group. - return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId); - } else { - // Slave is reserved for another task. - return Assignment.failure(); - } - } else { - // Slave is not reserved. - return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId); - } - } - }; - } - - @VisibleForTesting - static final Optional<String> LAUNCH_FAILED_MSG = - Optional.of("Unknown exception attempting to schedule task."); - @Timed("task_schedule_attempt") @Override public boolean schedule(final String taskId) { @@ -186,35 +140,22 @@ public interface TaskScheduler extends EventSubscriber { } else { ITaskConfig task = assignedTask.getTask(); AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob()); - try { - boolean launched = offerManager.launchFirst( - getAssignerFunction(store, new ResourceRequest(task, aggregate), taskId), - TaskGroupKey.from(task)); - - if (!launched) { - // Task could not be scheduled. - // TODO(maxim): Now that preemption slots are searched asynchronously, consider - // retrying a launch attempt within the current scheduling round IFF a reservation is - // available. - maybePreemptFor(assignedTask, aggregate, store); - attemptsNoMatch.incrementAndGet(); - return false; - } - } catch (OfferManager.LaunchException e) { - LOG.log(Level.WARNING, "Failed to launch task.", e); - attemptsFailed.incrementAndGet(); - - // The attempt to schedule the task failed, so we need to backpedal on the - // assignment. 
- // It is in the LOST state and a new task will move to PENDING to replace it. - // Should the state change fail due to storage issues, that's okay. The task will - // time out in the ASSIGNED state and be moved to LOST. - stateManager.changeState( - store, - taskId, - Optional.of(PENDING), - LOST, - LAUNCH_FAILED_MSG); + + boolean launched = assigner.maybeAssign( + store, + new ResourceRequest(task, aggregate), + TaskGroupKey.from(task), + taskId, + reservations.get(TaskGroupKey.from(task))); + + if (!launched) { + // Task could not be scheduled. + // TODO(maxim): Now that preemption slots are searched asynchronously, consider + // retrying a launch attempt within the current scheduling round IFF a reservation is + // available. + maybePreemptFor(assignedTask, aggregate, store); + attemptsNoMatch.incrementAndGet(); + return false; } } @@ -226,12 +167,12 @@ public interface TaskScheduler extends EventSubscriber { AttributeAggregate jobState, MutableStoreProvider storeProvider) { - if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) { + if (reservations.get(TaskGroupKey.from(task.getTask())).isPresent()) { return; } Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider); if (slaveId.isPresent()) { - reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask())); + reservations.put(TaskGroupKey.from(task.getTask()), slaveId.get()); } } @@ -240,7 +181,7 @@ public interface TaskScheduler extends EventSubscriber { if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) { IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask(); if (assigned.getSlaveId() != null) { - reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask())); + reservations.remove(TaskGroupKey.from(assigned.getTask()), assigned.getSlaveId()); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java index 3acb45a..0e32990 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java @@ -16,20 +16,22 @@ package org.apache.aurora.scheduler.state; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; import java.util.logging.Logger; import javax.inject.Inject; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.base.Objects; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.twitter.common.base.MorePreconditions; +import com.twitter.common.stats.Stats; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.ResourceSlot; +import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.configuration.Resources; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; @@ -37,161 +39,64 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup; import org.apache.aurora.scheduler.mesos.MesosTaskFactory; +import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.mesos.Protos.TaskInfo; import static java.util.Objects.requireNonNull; +import static org.apache.aurora.gen.ScheduleStatus.LOST; +import static 
org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import static org.apache.mesos.Protos.Offer; /** - * Responsible for matching a task against an offer. + * Responsible for matching a task against an offer and launching it. */ public interface TaskAssigner { - - final class Assignment { - - public enum Result { - /** - * Assignment successful. - */ - SUCCESS, - - /** - * Assignment failed. - */ - FAILURE, - - /** - * Assignment failed with static mismatch (i.e. all {@link Veto} instances group - * as {@link VetoGroup}). - * @see VetoGroup#STATIC - */ - FAILURE_STATIC_MISMATCH, - } - - private static final Optional<TaskInfo> NO_TASK_INFO = Optional.absent(); - private static final ImmutableSet<Veto> NO_VETOES = ImmutableSet.of(); - private final Optional<TaskInfo> taskInfo; - private final Set<Veto> vetoes; - - private Assignment(Optional<TaskInfo> taskInfo, Set<Veto> vetoes) { - this.taskInfo = taskInfo; - this.vetoes = vetoes; - } - - /** - * Creates a successful assignment instance. - * - * @param taskInfo {@link TaskInfo} to launch. - * @return A successful {@link Assignment}. - */ - public static Assignment success(TaskInfo taskInfo) { - return new Assignment(Optional.of(taskInfo), NO_VETOES); - } - - /** - * Creates a failed assignment instance with a set of {@link Veto} applied. - * - * @param vetoes Set of {@link Veto} instances issued for the failed offer/task match. - * @return A failed {@link Assignment}. - */ - public static Assignment failure(Set<Veto> vetoes) { - return new Assignment(NO_TASK_INFO, MorePreconditions.checkNotBlank(vetoes)); - } - - /** - * Creates a failed assignment instance. - * - * @return A failed {@link Assignment}. - */ - public static Assignment failure() { - return new Assignment(NO_TASK_INFO, NO_VETOES); - } - - /** - * Generates the {@link Result} based on the assignment details. - * - * @return An assignment {@link Result}. 
- */ - public Result getResult() { - if (taskInfo.isPresent()) { - return Result.SUCCESS; - } - - return Veto.identifyGroup(vetoes) == VetoGroup.STATIC - ? Result.FAILURE_STATIC_MISMATCH - : Result.FAILURE; - } - - /** - * A {@link TaskInfo} to launch. - * - * @return Optional of {@link TaskInfo}. - */ - public Optional<TaskInfo> getTaskInfo() { - return taskInfo; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof Assignment)) { - return false; - } - - Assignment other = (Assignment) o; - - return Objects.equal(taskInfo, other.taskInfo) - && Objects.equal(vetoes, other.vetoes); - } - - @Override - public int hashCode() { - return Objects.hashCode(taskInfo, vetoes); - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("taskInfo", taskInfo) - .add("vetoes", vetoes) - .toString(); - } - } - /** - * Tries to match a task against an offer. If a match is found, the assigner should - * make the appropriate changes to the task and provide an {@link Assignment} result. + * Tries to match a task against an offer. If a match is found, the assigner makes the + * appropriate changes to the task and requests task launch. * * @param storeProvider Storage provider. - * @param offer The resource offer. * @param resourceRequest The request for resources being scheduled. + * @param groupKey Task group key. * @param taskId Task id to assign. - * @return {@link Assignment} with assignment result. + * @param slaveReservation Slave reservation for a given {@code groupKey}. + * @return Assignment result. 
*/ - Assignment maybeAssign( + boolean maybeAssign( MutableStoreProvider storeProvider, - HostOffer offer, ResourceRequest resourceRequest, - String taskId); + TaskGroupKey groupKey, + String taskId, + Optional<String> slaveReservation); class TaskAssignerImpl implements TaskAssigner { private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName()); + @VisibleForTesting + static final Optional<String> LAUNCH_FAILED_MSG = + Optional.of("Unknown exception attempting to schedule task."); + + private final AtomicLong launchFailures = Stats.exportLong("assigner_launch_failures"); + private final StateManager stateManager; private final SchedulingFilter filter; private final MesosTaskFactory taskFactory; + private final OfferManager offerManager; @Inject public TaskAssignerImpl( StateManager stateManager, SchedulingFilter filter, - MesosTaskFactory taskFactory) { + MesosTaskFactory taskFactory, + OfferManager offerManager) { this.stateManager = requireNonNull(stateManager); this.filter = requireNonNull(filter); this.taskFactory = requireNonNull(taskFactory); + this.offerManager = requireNonNull(offerManager); } private TaskInfo assign( @@ -225,26 +130,61 @@ public interface TaskAssigner { } @Override - public Assignment maybeAssign( + public boolean maybeAssign( MutableStoreProvider storeProvider, - HostOffer offer, ResourceRequest resourceRequest, - String taskId) { - - Set<Veto> vetoes = filter.filter( - new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()), - resourceRequest); - if (vetoes.isEmpty()) { - return Assignment.success(assign( - storeProvider, - offer.getOffer(), - resourceRequest.getRequestedPorts(), - taskId)); - } else { - LOG.fine("Slave " + offer.getOffer().getHostname() - + " vetoed task " + taskId + ": " + vetoes); - return Assignment.failure(vetoes); + TaskGroupKey groupKey, + String taskId, + Optional<String> slaveReservation) { + + for (HostOffer offer : offerManager.getOffers(groupKey)) { + if 
(slaveReservation.isPresent() + && !slaveReservation.get().equals(offer.getOffer().getSlaveId().getValue())) { + // Task group has a slave reserved but this offer is for a different slave -> skip. + continue; + } + Set<Veto> vetoes = filter.filter( + new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()), + resourceRequest); + if (vetoes.isEmpty()) { + TaskInfo taskInfo = assign( + storeProvider, + offer.getOffer(), + resourceRequest.getRequestedPorts(), + taskId); + + try { + offerManager.launchTask(offer.getOffer().getId(), taskInfo); + return true; + } catch (OfferManager.LaunchException e) { + LOG.log(Level.WARNING, "Failed to launch task.", e); + launchFailures.incrementAndGet(); + + // The attempt to schedule the task failed, so we need to backpedal on the + // assignment. + // It is in the LOST state and a new task will move to PENDING to replace it. + // Should the state change fail due to storage issues, that's okay. The task will + // time out in the ASSIGNED state and be moved to LOST. + stateManager.changeState( + storeProvider, + taskId, + Optional.of(PENDING), + LOST, + LAUNCH_FAILED_MSG); + return false; + } + } else { + if (Veto.identifyGroup(vetoes) == VetoGroup.STATIC) { + // Never attempt to match this offer/groupKey pair again. 
+ offerManager.banOffer(offer.getOffer().getId(), groupKey); + } + + LOG.fine("Slave " + offer.getOffer().getHostname() + + " vetoed task " + taskId + ": " + vetoes); + // A veto only rules out this offer; keep trying the remaining offers. + } + } + return false; } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java index 04be32e..088a4a6 100644 --- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java @@ -16,7 +16,7 @@ package org.apache.aurora.scheduler.offers; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; -import com.google.common.base.Function; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.testing.TearDown; @@ -31,32 +31,34 @@ import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; -import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; +import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged; import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl; import org.apache.aurora.scheduler.offers.OfferManager.OfferReturnDelay; -import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; +import 
org.apache.mesos.Protos; import org.apache.mesos.Protos.TaskInfo; import org.junit.Before; import org.junit.Test; import static org.apache.aurora.gen.MaintenanceMode.DRAINING; import static org.apache.aurora.gen.MaintenanceMode.NONE; -import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class OfferManagerImplTest extends EasyMockTest { private static final Amount<Long, Time> RETURN_DELAY = Amount.of(1L, Time.DAYS); private static final String HOST_A = "HOST_A"; + private static final IHostAttributes HOST_ATTRIBUTES_A = + IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_A)); private static final HostOffer OFFER_A = new HostOffer( Offers.makeOffer("OFFER_A", HOST_A), - IHostAttributes.build(new HostAttributes().setMode(NONE))); + HOST_ATTRIBUTES_A); + private static final Protos.OfferID OFFER_A_ID = OFFER_A.getOffer().getId(); private static final String HOST_B = "HOST_B"; private static final HostOffer OFFER_B = new HostOffer( Offers.makeOffer("OFFER_B", HOST_B), @@ -67,10 +69,10 @@ public class OfferManagerImplTest extends EasyMockTest { IHostAttributes.build(new HostAttributes().setMode(NONE))); private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from( ITaskConfig.build(new TaskConfig().setJob(new JobKey("role", "env", "name")))); + private static final TaskInfo TASK_INFO = TaskInfo.getDefaultInstance(); private Driver driver; private FakeScheduledExecutor clock; - private Function<HostOffer, Assignment> offerAcceptor; private OfferManagerImpl offerManager; @Before @@ -92,7 +94,6 @@ public class OfferManagerImplTest extends EasyMockTest { clock.assertEmpty(); } }); - offerAcceptor = createMock(new Clazz<Function<HostOffer, Assignment>>() { }); OfferReturnDelay returnDelay = new OfferReturnDelay() { @Override public Amount<Long, Time> get() { @@ -109,11 
+110,9 @@ public class OfferManagerImplTest extends EasyMockTest { HostOffer offerA = setMode(OFFER_A, DRAINING); HostOffer offerC = setMode(OFFER_C, DRAINING); - TaskInfo task = TaskInfo.getDefaultInstance(); - expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task)); - driver.launchTask(OFFER_B.getOffer().getId(), task); + driver.launchTask(OFFER_B.getOffer().getId(), TASK_INFO); - driver.declineOffer(offerA.getOffer().getId()); + driver.declineOffer(OFFER_A_ID); driver.declineOffer(offerC.getOffer().getId()); control.replay(); @@ -121,98 +120,165 @@ public class OfferManagerImplTest extends EasyMockTest { offerManager.addOffer(offerA); offerManager.addOffer(OFFER_B); offerManager.addOffer(offerC); - assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + assertEquals( + ImmutableSet.of(OFFER_B, offerA, offerC), + ImmutableSet.copyOf(offerManager.getOffers())); + offerManager.launchTask(OFFER_B.getOffer().getId(), TASK_INFO); clock.advance(RETURN_DELAY); } @Test - public void testGetOffersReturnsAllOffers() throws Exception { - expect(offerAcceptor.apply(OFFER_A)) - .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied")))); + public void hostAttributeChangeUpdatesOfferSorting() throws Exception { + driver.declineOffer(OFFER_A_ID); + driver.declineOffer(OFFER_B.getOffer().getId()); control.replay(); + offerManager.hostAttributesChanged(new HostAttributesChanged(HOST_ATTRIBUTES_A)); + offerManager.addOffer(OFFER_A); - assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); - assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); + offerManager.addOffer(OFFER_B); + assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers())); - offerManager.cancelOffer(OFFER_A.getOffer().getId()); - assertTrue(Iterables.isEmpty(offerManager.getOffers())); + HostOffer offerA = setMode(OFFER_A, DRAINING); + offerManager.hostAttributesChanged(new 
HostAttributesChanged(offerA.getAttributes())); + assertEquals(ImmutableSet.of(OFFER_B, offerA), ImmutableSet.copyOf(offerManager.getOffers())); + + offerA = setMode(OFFER_A, NONE); + HostOffer offerB = setMode(OFFER_B, DRAINING); + offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes())); + offerManager.hostAttributesChanged(new HostAttributesChanged(offerB.getAttributes())); + assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers())); clock.advance(RETURN_DELAY); } @Test - public void testOfferFilteringDueToStaticBan() throws Exception { - expect(offerAcceptor.apply(OFFER_A)) - .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied")))); + public void testAddSameSlaveOffer() { + driver.declineOffer(OFFER_A_ID); + expectLastCall().times(2); - TaskInfo task = TaskInfo.getDefaultInstance(); - expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task)); - driver.launchTask(OFFER_B.getOffer().getId(), task); + control.replay(); - driver.declineOffer(OFFER_A.getOffer().getId()); + offerManager.addOffer(OFFER_A); + offerManager.addOffer(OFFER_A); + + clock.advance(RETURN_DELAY); + } + @Test + public void testGetOffersReturnsAllOffers() throws Exception { control.replay(); offerManager.addOffer(OFFER_A); - assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); - // Run again to make sure all offers are banned (via no expectations set). - assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); - // Add a new offer to accept the task previously banned for OFFER_A. 
- offerManager.addOffer(OFFER_B); - assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + offerManager.cancelOffer(OFFER_A_ID); + assertTrue(Iterables.isEmpty(offerManager.getOffers())); clock.advance(RETURN_DELAY); } @Test - public void testStaticBanIsCleared() throws Exception { - expect(offerAcceptor.apply(OFFER_A)) - .andReturn(Assignment.failure(ImmutableSet.of(Veto.insufficientResources("ram", 100)))); + public void testOfferFilteringDueToStaticBan() throws Exception { + driver.declineOffer(OFFER_A_ID); - TaskInfo task = TaskInfo.getDefaultInstance(); - expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task)); - driver.launchTask(OFFER_A.getOffer().getId(), task); + control.replay(); - expect(offerAcceptor.apply(OFFER_A)) - .andReturn(Assignment.failure(ImmutableSet.of(Veto.maintenance("draining")))); + // Static ban ignored when there are no offers. + offerManager.banOffer(OFFER_A_ID, GROUP_KEY); + offerManager.addOffer(OFFER_A); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY))); - expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task)); - driver.launchTask(OFFER_A.getOffer().getId(), task); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); - driver.declineOffer(OFFER_A.getOffer().getId()); + // Add static ban. 
+ offerManager.banOffer(OFFER_A_ID, GROUP_KEY); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); + assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY))); + + clock.advance(RETURN_DELAY); + } + + @Test + public void testStaticBanIsClearedOnOfferReturn() throws Exception { + driver.declineOffer(OFFER_A_ID); + expectLastCall().times(2); control.replay(); offerManager.addOffer(OFFER_A); - assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + offerManager.banOffer(OFFER_A_ID, GROUP_KEY); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); + assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY))); // Make sure the static ban is cleared when the offers are returned. clock.advance(RETURN_DELAY); offerManager.addOffer(OFFER_A); - assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY))); + + clock.advance(RETURN_DELAY); + } + + @Test + public void testStaticBanIsClearedOnDriverDisconnect() throws Exception { + driver.declineOffer(OFFER_A_ID); + + control.replay(); offerManager.addOffer(OFFER_A); - assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + offerManager.banOffer(OFFER_A_ID, GROUP_KEY); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); + assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY))); // Make sure the static ban is cleared when driver is disconnected. 
offerManager.driverDisconnected(new DriverDisconnected()); offerManager.addOffer(OFFER_A); - assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY))); clock.advance(RETURN_DELAY); } @Test + public void getOffer() { + driver.declineOffer(OFFER_A_ID); + + control.replay(); + + offerManager.addOffer(OFFER_A); + assertEquals(Optional.of(OFFER_A), offerManager.getOffer(OFFER_A.getOffer().getSlaveId())); + clock.advance(RETURN_DELAY); + } + + @Test(expected = OfferManager.LaunchException.class) + public void testLaunchTaskDriverThrows() throws OfferManager.LaunchException { + driver.launchTask(OFFER_A_ID, TASK_INFO); + expectLastCall().andThrow(new IllegalStateException()); + + control.replay(); + + offerManager.addOffer(OFFER_A); + + try { + offerManager.launchTask(OFFER_A_ID, TASK_INFO); + } finally { + clock.advance(RETURN_DELAY); + } + } + + @Test(expected = OfferManager.LaunchException.class) + public void testLaunchTaskOfferRaceThrows() throws OfferManager.LaunchException { + control.replay(); + offerManager.launchTask(OFFER_A_ID, TASK_INFO); + } + + @Test public void testFlushOffers() throws Exception { control.replay(); offerManager.addOffer(OFFER_A); offerManager.addOffer(OFFER_B); offerManager.driverDisconnected(new DriverDisconnected()); - assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); clock.advance(RETURN_DELAY); } http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java index a2e2d4c..350ec6f 100644 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java +++ 
b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java @@ -24,10 +24,7 @@ import com.twitter.common.stats.StatsProvider; import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.Clock; -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskGroupKey; @@ -37,27 +34,19 @@ import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; -import org.apache.aurora.scheduler.offers.OfferManager; -import org.apache.aurora.scheduler.offers.Offers; import org.apache.aurora.scheduler.preemptor.BiCache; import org.apache.aurora.scheduler.preemptor.Preemptor; import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl; import org.apache.aurora.scheduler.state.PubsubTestUtil; -import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.state.TaskAssigner; -import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; -import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; import org.apache.aurora.scheduler.storage.db.DbUtil; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import 
org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.apache.mesos.Protos.TaskInfo; -import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IExpectationSetters; import org.junit.Before; @@ -67,10 +56,8 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; -import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -78,34 +65,22 @@ public class TaskSchedulerImplTest extends EasyMockTest { private static final IScheduledTask TASK_A = TaskTestUtil.makeTask("a", JobKeys.from("a", "a", "a")); - private static final IScheduledTask TASK_B = - TaskTestUtil.makeTask("b", JobKeys.from("b", "b", "b")); - private static final HostOffer OFFER = new HostOffer( - Offers.makeOffer("OFFER_A", "HOST_A"), - IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))); - - private static final String SLAVE_ID = OFFER.getOffer().getSlaveId().getValue(); - - private static final TaskGroupKey GROUP_A = TaskGroupKey.from(TASK_A.getAssignedTask().getTask()); - private static final TaskGroupKey GROUP_B = TaskGroupKey.from(TASK_B.getAssignedTask().getTask()); + private static final String SLAVE_ID = "HOST_A"; + private static final Optional<String> NO_RESERVATION = Optional.absent(); private StorageTestUtil storageUtil; - private StateManager stateManager; private TaskAssigner assigner; - private OfferManager offerManager; private TaskScheduler scheduler; private Preemptor preemptor; - private BiCache<String, TaskGroupKey> reservations; + private BiCache<TaskGroupKey, String> reservations; private EventSink eventSink; @Before public void 
setUp() throws Exception { storageUtil = new StorageTestUtil(this); - stateManager = createMock(StateManager.class); assigner = createMock(TaskAssigner.class); - offerManager = createMock(OfferManager.class); preemptor = createMock(Preemptor.class); - reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { }); + reservations = createMock(new Clazz<BiCache<TaskGroupKey, String>>() { }); Injector injector = getInjector(storageUtil.storage); scheduler = injector.getInstance(TaskScheduler.class); @@ -118,11 +93,9 @@ public class TaskSchedulerImplTest extends EasyMockTest { new AbstractModule() { @Override protected void configure() { - bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).toInstance(reservations); + bind(new TypeLiteral<BiCache<TaskGroupKey, String>>() { }).toInstance(reservations); bind(TaskScheduler.class).to(TaskSchedulerImpl.class); bind(Preemptor.class).toInstance(preemptor); - bind(OfferManager.class).toInstance(offerManager); - bind(StateManager.class).toInstance(stateManager); bind(TaskAssigner.class).toInstance(assigner); bind(Clock.class).toInstance(createMock(Clock.class)); bind(StatsProvider.class).toInstance(new FakeStatsProvider()); @@ -138,89 +111,100 @@ public class TaskSchedulerImplTest extends EasyMockTest { ImmutableSet.of(task)); } - private void expectAssigned(IScheduledTask task) { - expect(assigner.maybeAssign( + private IExpectationSetters<Boolean> expectAssigned( + IScheduledTask task, + Optional<String> reservation) { + + return expect(assigner.maybeAssign( storageUtil.mutableStoreProvider, - OFFER, new ResourceRequest(task.getAssignedTask().getTask(), EMPTY), - Tasks.id(task))).andReturn(Assignment.success(TaskInfo.getDefaultInstance())); + TaskGroupKey.from(task.getAssignedTask().getTask()), + Tasks.id(task), + reservation)); + } + + @Test + public void testSchedule() throws Exception { + storageUtil.expectOperations(); + + expectReservationCheck(TASK_A); + expectTaskStillPendingQuery(TASK_A); + 
expectActiveJobFetch(TASK_A); + expectAssigned(TASK_A, NO_RESERVATION).andReturn(true); + + control.replay(); + + assertTrue(scheduler.schedule("a")); + } + + @Test + public void testScheduleNoTask() throws Exception { + storageUtil.expectOperations(); + storageUtil.expectTaskFetch( + Query.taskScoped(Tasks.id(TASK_A)).byStatus(PENDING), + ImmutableSet.of()); + + control.replay(); + + assertTrue(scheduler.schedule("a")); } @Test public void testReservation() throws Exception { storageUtil.expectOperations(); + // No reservation available in preemptor expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); - expectLaunchAttempt(false); - // Reserve "a" with offerA - expectReservationCheck(TASK_A); + expectAssigned(TASK_A, NO_RESERVATION).andReturn(false); + expectReservationCheck(TASK_A).times(2); + expectPreemptorCall(TASK_A, NO_RESERVATION); + + // Slave is reserved. + expectTaskStillPendingQuery(TASK_A); + expectActiveJobFetch(TASK_A); + expectAssigned(TASK_A, NO_RESERVATION).andReturn(false); + expectReservationCheck(TASK_A).times(2); expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); - expectAddReservation(SLAVE_ID, TASK_A); + expectAddReservation(TASK_A, SLAVE_ID); // Use previously created reservation. 
expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); - expectGetReservation(SLAVE_ID, TASK_A); - expectAssigned(TASK_A); - AssignmentCapture assignment = expectLaunchAttempt(true); + expectGetReservation(TASK_A, SLAVE_ID); + expectAssigned(TASK_A, Optional.of(SLAVE_ID)).andReturn(true); control.replay(); assertFalse(scheduler.schedule("a")); + assertFalse(scheduler.schedule("a")); assertTrue(scheduler.schedule("a")); - assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment); } @Test - public void testReservationExpires() throws Exception { + public void testReservationUnusable() throws Exception { storageUtil.expectOperations(); expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); - expectLaunchAttempt(false); - // Reserve "a" with offerA expectReservationCheck(TASK_A); - expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); - expectAddReservation(SLAVE_ID, TASK_A); - - // First attempt -> reservation is active. - expectTaskStillPendingQuery(TASK_B); - expectActiveJobFetch(TASK_B); - AssignmentCapture firstAssignment = expectLaunchAttempt(false); - expectGetReservation(SLAVE_ID, TASK_A); - expectReservationCheck(TASK_B); - expectPreemptorCall(TASK_B, Optional.absent()); - - // Status changed -> reservation removed. - reservations.remove(SLAVE_ID, TaskGroupKey.from(TASK_A.getAssignedTask().getTask())); - - // Second attempt -> reservation expires. 
- expectGetNoReservation(SLAVE_ID); - expectTaskStillPendingQuery(TASK_B); - expectActiveJobFetch(TASK_B); - AssignmentCapture secondAssignment = expectLaunchAttempt(true); - expectAssigned(TASK_B); + expectAssigned(TASK_A, NO_RESERVATION).andReturn(false); + expectGetReservation(TASK_A, SLAVE_ID); control.replay(); assertFalse(scheduler.schedule("a")); - assertFalse(scheduler.schedule("b")); - assignAndAssert(Result.FAILURE, GROUP_B, OFFER, firstAssignment); - - eventSink.post(TaskStateChange.transition(assign(TASK_A, SLAVE_ID), PENDING)); - assertTrue(scheduler.schedule("b")); - assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment); } @Test - public void testReservationUnusable() throws Exception { + public void testReservationRemoved() throws Exception { storageUtil.expectOperations(); expectTaskStillPendingQuery(TASK_A); - expectLaunchAttempt(false); - expect(reservations.getByValue(TaskGroupKey.from(TASK_A.getAssignedTask().getTask()))) - .andReturn(ImmutableSet.of(SLAVE_ID)); + expectActiveJobFetch(TASK_A); + expectReservationCheck(TASK_A); + expectAssigned(TASK_A, NO_RESERVATION).andReturn(false); + expectGetReservation(TASK_A, SLAVE_ID); control.replay(); @@ -236,17 +220,18 @@ public class TaskSchedulerImplTest extends EasyMockTest { @Test public void testPendingDeletedHandled() throws Exception { + reservations.remove(TaskGroupKey.from(TASK_A.getAssignedTask().getTask()), SLAVE_ID); + control.replay(); - IScheduledTask task = IScheduledTask.build(TASK_A.newBuilder().setStatus(PENDING)); - eventSink.post(TaskStateChange.transition(task, PENDING)); + ScheduledTask taskBuilder = TASK_A.newBuilder().setStatus(PENDING); + taskBuilder.getAssignedTask().setSlaveId(SLAVE_ID); + eventSink.post(TaskStateChange.transition(IScheduledTask.build(taskBuilder), PENDING)); } @Test public void testIgnoresThrottledTasks() throws Exception { - // Ensures that tasks in THROTTLED state are not considered part of the active job state passed - // to the assigner 
function. - + // Ensures that tasks in THROTTLED state are not considered part of the active job state. Storage memStorage = DbUtil.createStorage(); Injector injector = getInjector(memStorage); @@ -265,23 +250,31 @@ public class TaskSchedulerImplTest extends EasyMockTest { } }); - expectGetNoReservation(SLAVE_ID); - AssignmentCapture assignment = expectLaunchAttempt(true); + expectReservationCheck(TASK_A); expect(assigner.maybeAssign( EasyMock.anyObject(), - eq(OFFER), eq(new ResourceRequest(taskA.getAssignedTask().getTask(), EMPTY)), - eq(Tasks.id(taskA)))).andReturn(Assignment.success(TaskInfo.getDefaultInstance())); + eq(TaskGroupKey.from(taskA.getAssignedTask().getTask())), + eq(Tasks.id(taskA)), + eq(NO_RESERVATION))).andReturn(true); control.replay(); assertTrue(scheduler.schedule(Tasks.id(taskA))); - assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment); } - private static class AssignmentCapture { - public Capture<Function<HostOffer, Assignment>> assigner = createCapture(); - public Capture<TaskGroupKey> groupKey = createCapture(); + @Test + public void testScheduleThrows() throws Exception { + storageUtil.expectOperations(); + + expectReservationCheck(TASK_A); + expectTaskStillPendingQuery(TASK_A); + expectActiveJobFetch(TASK_A); + expectAssigned(TASK_A, NO_RESERVATION).andThrow(new IllegalArgumentException("expected")); + + control.replay(); + + assertFalse(scheduler.schedule("a")); } private void expectPreemptorCall(IScheduledTask task, Optional<String> result) { @@ -291,31 +284,6 @@ public class TaskSchedulerImplTest extends EasyMockTest { storageUtil.mutableStoreProvider)).andReturn(result); } - private AssignmentCapture expectLaunchAttempt(boolean taskLaunched) - throws OfferManager.LaunchException { - - AssignmentCapture capture = new AssignmentCapture(); - expect(offerManager.launchFirst(capture(capture.assigner), capture(capture.groupKey))) - .andReturn(taskLaunched); - return capture; - } - - private IScheduledTask assign(IScheduledTask 
task, String slaveId) { - ScheduledTask result = task.newBuilder(); - result.getAssignedTask().setSlaveId(slaveId); - return IScheduledTask.build(result); - } - - private void assignAndAssert( - Result result, - TaskGroupKey groupKey, - HostOffer offer, - AssignmentCapture capture) { - - assertEquals(result, capture.assigner.getValue().apply(offer).getResult()); - assertEquals(groupKey, capture.groupKey.getValue()); - } - private void expectActiveJobFetch(IScheduledTask task) { storageUtil.expectTaskFetch( Query.jobScoped(((Function<IScheduledTask, IJobKey>) Tasks::getJob).apply(task)) @@ -323,21 +291,17 @@ public class TaskSchedulerImplTest extends EasyMockTest { ImmutableSet.of()); } - private void expectAddReservation(String slaveId, IScheduledTask task) { - reservations.put(slaveId, TaskGroupKey.from(task.getAssignedTask().getTask())); - } - - private IExpectationSetters<?> expectGetReservation(String slaveId, IScheduledTask task) { - return expect(reservations.get(slaveId)) - .andReturn(Optional.of(TaskGroupKey.from(task.getAssignedTask().getTask()))); + private void expectAddReservation(IScheduledTask task, String slaveId) { + reservations.put(TaskGroupKey.from(task.getAssignedTask().getTask()), slaveId); } - private IExpectationSetters<?> expectGetNoReservation(String slaveId) { - return expect(reservations.get(slaveId)).andReturn(Optional.absent()); + private IExpectationSetters<?> expectGetReservation(IScheduledTask task, String slaveId) { + return expect(reservations.get(TaskGroupKey.from(task.getAssignedTask().getTask()))) + .andReturn(Optional.of(slaveId)); } private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) { - return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask()))) - .andReturn(ImmutableSet.of()); + return expect(reservations.get(TaskGroupKey.from(task.getAssignedTask().getTask()))) + .andReturn(Optional.<String>absent()); } }
