Repository: aurora Updated Branches: refs/heads/master 60e5e4e67 -> f559e9306
Scheduling multiple tasks per round. Reviewed at https://reviews.apache.org/r/51929/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f559e930 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f559e930 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f559e930 Branch: refs/heads/master Commit: f559e930659e25b3d7cacb7b845ebda50d18d66a Parents: 60e5e4e Author: Maxim Khutornenko <[email protected]> Authored: Tue Sep 27 12:27:04 2016 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Tue Sep 27 12:27:04 2016 -0700 ---------------------------------------------------------------------- .../aurora/benchmark/SchedulingBenchmarks.java | 19 +- .../scheduler/filter/AttributeAggregate.java | 41 ++-- .../scheduler/scheduling/SchedulingModule.java | 8 +- .../aurora/scheduler/scheduling/TaskGroup.java | 12 +- .../aurora/scheduler/scheduling/TaskGroups.java | 58 +++--- .../scheduler/scheduling/TaskScheduler.java | 127 +++++++----- .../aurora/scheduler/state/TaskAssigner.java | 41 ++-- .../events/NotifyingSchedulingFilterTest.java | 2 +- .../filter/AttributeAggregateTest.java | 32 +++ .../filter/SchedulingFilterImplTest.java | 18 +- .../scheduler/http/AbstractJettyTest.java | 3 +- .../preemptor/PreemptionVictimFilterTest.java | 4 +- .../scheduler/preemptor/PreemptorImplTest.java | 6 +- .../preemptor/PreemptorModuleTest.java | 2 +- .../scheduler/scheduling/TaskGroupsTest.java | 55 ++++-- .../scheduling/TaskSchedulerImplTest.java | 111 ++++++++--- .../scheduler/state/TaskAssignerImplTest.java | 197 +++++++++++-------- 17 files changed, 475 insertions(+), 261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java index 6f1cbfb..1b56500 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java @@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit; import javax.inject.Singleton; +import com.google.common.collect.ImmutableSet; import com.google.common.eventbus.EventBus; import com.google.inject.AbstractModule; import com.google.inject.Guice; @@ -192,7 +193,9 @@ public class SchedulingBenchmarks { saveTasks(tasksToAssign); storage.write((NoResult.Quiet) store -> { for (IScheduledTask scheduledTask : tasksToAssign) { - taskScheduler.schedule(store, scheduledTask.getAssignedTask().getTaskId()); + taskScheduler.schedule( + store, + ImmutableSet.of(scheduledTask.getAssignedTask().getTaskId())); } }); } @@ -220,11 +223,13 @@ public class SchedulingBenchmarks { * See {@see http://openjdk.java.net/projects/code-tools/jmh/} for more info. */ @Benchmark - public boolean runBenchmark() { - return storage.write((Storage.MutateWork.Quiet<Boolean>) store -> { - boolean result = false; + public Set<String> runBenchmark() { + return storage.write((Storage.MutateWork.Quiet<Set<String>>) store -> { + Set<String> result = null; for (IScheduledTask task : settings.getTasks()) { - result = taskScheduler.schedule(store, task.getAssignedTask().getTaskId()); + result = taskScheduler.schedule( + store, + ImmutableSet.of(task.getAssignedTask().getTaskId())); } return result; }); @@ -313,10 +318,10 @@ public class SchedulingBenchmarks { } @Override - public boolean runBenchmark() { + public Set<String> runBenchmark() { pendingTaskProcessor.run(); // Return non-guessable result to satisfy "blackhole" requirement. - return System.currentTimeMillis() % 5 == 0; + return ImmutableSet.of("" + System.currentTimeMillis()); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java index 87b9e19..f04149e 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java @@ -27,6 +27,7 @@ import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.entities.IAttribute; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -46,7 +47,7 @@ public final class AttributeAggregate { * A mapping from attribute name and value to the count of tasks with that name/value combination. * See doc for {@link #getNumTasksWithAttribute(String, String)} for further details. */ - private final Supplier<Multiset<Pair<String, String>>> aggregate; + private Supplier<Multiset<Pair<String, String>>> aggregate; private AttributeAggregate(Supplier<Multiset<Pair<String, String>>> aggregate) { this.aggregate = Suppliers.memoize(aggregate); @@ -92,25 +93,35 @@ public final class AttributeAggregate { @VisibleForTesting static AttributeAggregate create(Supplier<Iterable<IAttribute>> attributes) { Supplier<Multiset<Pair<String, String>>> aggregator = Suppliers.compose( - attributes1 -> { - ImmutableMultiset.Builder<Pair<String, String>> builder = ImmutableMultiset.builder(); - for (IAttribute attribute : attributes1) { - for (String value : attribute.getValues()) { - builder.add(Pair.of(attribute.getName(), value)); - } - } - - return builder.build(); - }, - attributes - ); + attributes1 -> addAttributes(ImmutableMultiset.builder(), attributes1).build(), + attributes); return new AttributeAggregate(aggregator); } + static ImmutableMultiset.Builder<Pair<String, String>> addAttributes( + ImmutableMultiset.Builder<Pair<String, String>> builder, + Iterable<IAttribute> attributes) { + + for (IAttribute attribute : attributes) { + for (String value : attribute.getValues()) { + builder.add(Pair.of(attribute.getName(), value)); + } + } + return builder; + } + + public void updateAttributeAggregate(IHostAttributes attributes) { + ImmutableMultiset.Builder<Pair<String, String>> builder = new ImmutableMultiset.Builder<>(); + builder.addAll(aggregate.get()); + addAttributes(builder, attributes.getAttributes()); + aggregate = Suppliers.memoize(() -> builder.build()); + } + @VisibleForTesting - public static final AttributeAggregate EMPTY = - new AttributeAggregate(Suppliers.ofInstance(ImmutableMultiset.of())); + public static AttributeAggregate empty() { + return new AttributeAggregate(Suppliers.ofInstance(ImmutableMultiset.of())); + } /** * Gets the total number of tasks with a given attribute name and value combination. http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java index 664bc6c..03a0e84 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java @@ -90,6 +90,11 @@ public class SchedulingModule extends AbstractModule { help = "The maximum number of scheduling attempts that can be processed in a batch.") private static final Arg<Integer> SCHEDULING_MAX_BATCH_SIZE = Arg.create(3); + @Positive + @CmdLine(name = "max_tasks_per_schedule_attempt", + help = "The maximum number of tasks to pick in a single scheduling attempt.") + private static final Arg<Integer> MAX_TASKS_PER_SCHEDULE_ATTEMPT = Arg.create(5); + @Override protected void configure() { install(new PrivateModule() { @@ -100,7 +105,8 @@ public class SchedulingModule extends AbstractModule { new TruncatedBinaryBackoff( INITIAL_SCHEDULE_PENALTY.get(), MAX_SCHEDULE_PENALTY.get()), - RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get()))); + RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get()), + MAX_TASKS_PER_SCHEDULE_ATTEMPT.get())); bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class) .toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings( http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java index 5d31955..b521620 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java @@ -13,15 +13,17 @@ */ package org.apache.aurora.scheduler.scheduling; +import java.util.Collection; import java.util.Queue; import java.util.Set; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.aurora.scheduler.base.TaskGroupKey; +import static org.apache.aurora.GuavaUtils.toImmutableSet; + /** * A group of task IDs that are eligible for scheduling, but may be waiting for a backoff to expire. */ @@ -41,16 +43,16 @@ class TaskGroup { return key; } - synchronized Optional<String> peek() { - return Optional.fromNullable(tasks.peek()); + synchronized Set<String> peek(int maxTasks) { + return tasks.stream().limit(Math.min(tasks.size(), maxTasks)).collect(toImmutableSet()); } synchronized boolean hasMore() { return !tasks.isEmpty(); } - synchronized void remove(String taskId) { - tasks.remove(taskId); + synchronized void remove(Collection<String> taskIdsToRemove) { + tasks.removeAll(taskIdsToRemove); } synchronized void offer(String taskId) { http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java index d390c07..77187bc 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java @@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.scheduling; import java.lang.annotation.Retention; import java.lang.annotation.Target; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -23,7 +24,6 @@ import javax.inject.Inject; import javax.inject.Qualifier; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; @@ -71,11 +71,10 @@ public class TaskGroups implements EventSubscriber { private final ConcurrentMap<TaskGroupKey, TaskGroup> groups = Maps.newConcurrentMap(); private final DelayExecutor executor; + private final TaskGroupsSettings settings; private final TaskScheduler taskScheduler; - private final long firstScheduleDelay; - private final BackoffStrategy backoff; private final RescheduleCalculator rescheduleCalculator; - private final BatchWorker<Boolean> batchWorker; + private final BatchWorker<Set<String>> batchWorker; // Track the penalties of tasks at the time they were scheduled. This is to provide data that // may influence the selection of a different backoff strategy. @@ -91,7 +90,7 @@ public class TaskGroups implements EventSubscriber { public @interface SchedulingMaxBatchSize { } @VisibleForTesting - public static class TaskGroupBatchWorker extends BatchWorker<Boolean> { + public static class TaskGroupBatchWorker extends BatchWorker<Set<String>> { @Inject TaskGroupBatchWorker( Storage storage, @@ -111,15 +110,20 @@ public class TaskGroups implements EventSubscriber { private final Amount<Long, Time> firstScheduleDelay; private final BackoffStrategy taskGroupBackoff; private final RateLimiter rateLimiter; + private final int maxTasksPerSchedule; public TaskGroupsSettings( Amount<Long, Time> firstScheduleDelay, BackoffStrategy taskGroupBackoff, - RateLimiter rateLimiter) { + RateLimiter rateLimiter, + int maxTasksPerSchedule) { this.firstScheduleDelay = requireNonNull(firstScheduleDelay); + Preconditions.checkArgument(firstScheduleDelay.getValue() > 0); this.taskGroupBackoff = requireNonNull(taskGroupBackoff); this.rateLimiter = requireNonNull(rateLimiter); + this.maxTasksPerSchedule = maxTasksPerSchedule; + Preconditions.checkArgument(maxTasksPerSchedule > 0); } } @@ -131,21 +135,11 @@ public class TaskGroups implements EventSubscriber { RescheduleCalculator rescheduleCalculator, TaskGroupBatchWorker batchWorker) { - requireNonNull(settings.firstScheduleDelay); - Preconditions.checkArgument(settings.firstScheduleDelay.getValue() > 0); - this.executor = requireNonNull(executor); - requireNonNull(settings.rateLimiter); - requireNonNull(taskScheduler); - this.firstScheduleDelay = settings.firstScheduleDelay.as(Time.MILLISECONDS); - this.backoff = requireNonNull(settings.taskGroupBackoff); + this.settings = requireNonNull(settings); + this.taskScheduler = requireNonNull(taskScheduler); this.rescheduleCalculator = requireNonNull(rescheduleCalculator); this.batchWorker = requireNonNull(batchWorker); - - this.taskScheduler = (store, taskId) -> { - settings.rateLimiter.acquire(); - return taskScheduler.schedule(store, taskId); - }; } private synchronized void evaluateGroupLater(Runnable evaluate, TaskGroup group) { @@ -162,27 +156,29 @@ public class TaskGroups implements EventSubscriber { Runnable monitor = new Runnable() { @Override public void run() { - final Optional<String> taskId = group.peek(); + final Set<String> taskIds = group.peek(settings.maxTasksPerSchedule); long penaltyMs = 0; - if (taskId.isPresent()) { - CompletableFuture<Boolean> result = batchWorker.execute(storeProvider -> - taskScheduler.schedule(storeProvider, taskId.get())); - boolean isScheduled = false; + if (!taskIds.isEmpty()) { + settings.rateLimiter.acquire(); + CompletableFuture<Set<String>> result = batchWorker.execute(storeProvider -> + taskScheduler.schedule(storeProvider, taskIds)); + + Set<String> scheduled = null; try { - isScheduled = result.get(); + scheduled = result.get(); } catch (ExecutionException | InterruptedException e) { Thread.currentThread().interrupt(); Throwables.propagate(e); } - if (isScheduled) { + if (scheduled.isEmpty()) { + penaltyMs = settings.taskGroupBackoff.calculateBackoffMs(group.getPenaltyMs()); + } else { scheduledTaskPenalties.accumulate(group.getPenaltyMs()); - group.remove(taskId.get()); + group.remove(scheduled); if (group.hasMore()) { - penaltyMs = firstScheduleDelay; + penaltyMs = settings.firstScheduleDelay.as(Time.MILLISECONDS); } - } else { - penaltyMs = backoff.calculateBackoffMs(group.getPenaltyMs()); } } @@ -211,7 +207,7 @@ public class TaskGroups implements EventSubscriber { if (existing == null) { long penaltyMs; if (stateChange.isTransition()) { - penaltyMs = firstScheduleDelay; + penaltyMs = settings.firstScheduleDelay.as(Time.MILLISECONDS); } else { penaltyMs = rescheduleCalculator.getStartupScheduleDelayMs(task); } @@ -234,7 +230,7 @@ public class TaskGroups implements EventSubscriber { : Iterables.transform(deleted.getTasks(), IScheduledTask::getAssignedTask)) { TaskGroup group = groups.get(TaskGroupKey.from(task.getTask())); if (group != null) { - group.remove(task.getTaskId()); + group.remove(ImmutableSet.of(task.getTaskId())); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java index 207d38d..31edb1d 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java @@ -15,14 +15,21 @@ package org.apache.aurora.scheduler.scheduling; import java.lang.annotation.Retention; import java.lang.annotation.Target; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import javax.inject.Inject; import javax.inject.Qualifier; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import com.google.common.eventbus.Subscribe; import org.apache.aurora.common.inject.TimedInterceptor.Timed; @@ -51,6 +58,8 @@ import static java.lang.annotation.ElementType.PARAMETER; import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toMap; + import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources; @@ -63,11 +72,11 @@ public interface TaskScheduler extends EventSubscriber { * Attempts to schedule a task, possibly performing irreversible actions. * * @param storeProvider {@code MutableStoreProvider} instance to access data store. - * @param taskId The task to attempt to schedule. - * @return {@code true} if the task was scheduled, {@code false} otherwise. The caller should - * call schedule again if {@code false} is returned. + * @param taskIds The tasks to attempt to schedule. + * @return Successfully scheduled task IDs. The caller should call schedule again if a given + * task ID was not present in the result. */ - boolean schedule(MutableStoreProvider storeProvider, String taskId); + Set<String> schedule(MutableStoreProvider storeProvider, Iterable<String> taskIds); /** * An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task @@ -109,62 +118,84 @@ public interface TaskScheduler extends EventSubscriber { } @Timed ("task_schedule_attempt") - public boolean schedule(MutableStoreProvider store, String taskId) { - attemptsFired.incrementAndGet(); + public Set<String> schedule(MutableStoreProvider store, Iterable<String> taskIds) { try { - return scheduleTask(store, taskId); + return scheduleTasks(store, taskIds); } catch (RuntimeException e) { // We catch the generic unchecked exception here to ensure tasks are not abandoned // if there is a transient issue resulting in an unchecked exception. LOG.warn("Task scheduling unexpectedly failed, will be retried", e); attemptsFailed.incrementAndGet(); - return false; + // Return empty set for all task IDs to be retried later. + // It's ok if some tasks were already assigned, those will be ignored in the next round. + return ImmutableSet.of(); } } - private boolean scheduleTask(MutableStoreProvider store, String taskId) { - LOG.debug("Attempting to schedule task " + taskId); - IAssignedTask assignedTask = Iterables.getOnlyElement( - Iterables.transform( - store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)), - IScheduledTask::getAssignedTask), - null); - - if (assignedTask == null) { - LOG.warn("Failed to look up task " + taskId + ", it may have been deleted."); - } else { - ITaskConfig task = assignedTask.getTask(); - AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob()); - - // Valid Docker tasks can have a container but no executor config - ResourceBag overhead = ResourceBag.EMPTY; - if (task.isSetExecutorConfig()) { - overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName()) - .orElseThrow( - () -> new IllegalArgumentException("Cannot find executor configuration")); - } + private Set<String> scheduleTasks(MutableStoreProvider store, Iterable<String> tasks) { + ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks); + String taskIdValues = Joiner.on(",").join(taskIds); + LOG.debug("Attempting to schedule tasks " + taskIdValues); + ImmutableSet<IAssignedTask> assignedTasks = + ImmutableSet.copyOf(Iterables.transform( + store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)), + IScheduledTask::getAssignedTask)); + + if (Iterables.isEmpty(assignedTasks)) { + LOG.warn("Failed to look up all tasks in a scheduling round: " + taskIdValues); + return taskIds; + } - boolean launched = assigner.maybeAssign( - store, - new ResourceRequest( - task, - bagFromResources(task.getResources()).add(overhead), aggregate), - TaskGroupKey.from(task), - taskId, - reservations.asMap()); - - if (!launched) { - // Task could not be scheduled. - // TODO(maxim): Now that preemption slots are searched asynchronously, consider - // retrying a launch attempt within the current scheduling round IFF a reservation is - // available. - maybePreemptFor(assignedTask, aggregate, store); - attemptsNoMatch.incrementAndGet(); - return false; - } + Preconditions.checkState( + assignedTasks.stream() + .collect(Collectors.groupingBy(t -> t.getTask())) + .entrySet() + .size() == 1, + "Found multiple task groups for " + taskIdValues); + + Map<String, IAssignedTask> assignableTaskMap = + assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t)); + + if (taskIds.size() != assignedTasks.size()) { + LOG.warn("Failed to look up tasks " + + Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet()))); + } + + // This is safe after all checks above. + ITaskConfig task = assignedTasks.stream().findFirst().get().getTask(); + AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob()); + + // Valid Docker tasks can have a container but no executor config + ResourceBag overhead = ResourceBag.EMPTY; + if (task.isSetExecutorConfig()) { + overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName()) + .orElseThrow( + () -> new IllegalArgumentException("Cannot find executor configuration")); } - return true; + Set<String> launched = assigner.maybeAssign( + store, + new ResourceRequest( + task, + bagFromResources(task.getResources()).add(overhead), aggregate), + TaskGroupKey.from(task), + assignableTaskMap.keySet(), + reservations.asMap()); + + attemptsFired.addAndGet(assignableTaskMap.size()); + Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched); + + failedToLaunch.forEach(taskId -> { + // Task could not be scheduled. + // TODO(maxim): Now that preemption slots are searched asynchronously, consider + // retrying a launch attempt within the current scheduling round IFF a reservation is + // available. + maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store); + }); + attemptsNoMatch.addAndGet(failedToLaunch.size()); + + // Return all successfully launched tasks as well as those weren't tried (not in PENDING). + return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet())); } private void maybePreemptFor( http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java index 7f7b435..4c61762 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.state; +import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -21,6 +22,8 @@ import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import org.apache.aurora.common.inject.TimedInterceptor.Timed; import org.apache.aurora.common.stats.Stats; @@ -60,15 +63,15 @@ public interface TaskAssigner { * @param storeProvider Storage provider. * @param resourceRequest The request for resources being scheduled. * @param groupKey Task group key. - * @param taskId Task id to assign. + * @param taskIds Task IDs to assign. * @param slaveReservations Slave reservations. - * @return Assignment result. + * @return Successfully assigned task IDs. */ - boolean maybeAssign( + Set<String> maybeAssign( MutableStoreProvider storeProvider, ResourceRequest resourceRequest, TaskGroupKey groupKey, - String taskId, + Iterable<String> taskIds, Map<String, TaskGroupKey> slaveReservations); class TaskAssignerImpl implements TaskAssigner { @@ -132,13 +135,22 @@ public interface TaskAssigner { @Timed("assigner_maybe_assign") @Override - public boolean maybeAssign( + public Set<String> maybeAssign( MutableStoreProvider storeProvider, ResourceRequest resourceRequest, TaskGroupKey groupKey, - String taskId, + Iterable<String> taskIds, Map<String, TaskGroupKey> slaveReservations) { + if (Iterables.isEmpty(taskIds)) { + return ImmutableSet.of(); + } + + TierInfo tierInfo = tierManager.getTier(groupKey.getTask()); + ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder(); + Iterator<String> remainingTasks = taskIds.iterator(); + String taskId = remainingTasks.next(); + for (HostOffer offer : offerManager.getOffers(groupKey)) { Optional<TaskGroupKey> reservedGroup = Optional.fromNullable( slaveReservations.get(offer.getOffer().getSlaveId().getValue())); @@ -148,7 +160,6 @@ public interface TaskAssigner { continue; } - TierInfo tierInfo = tierManager.getTier(groupKey.getTask()); Set<Veto> vetoes = filter.filter( new UnusedResource(offer.getResourceBag(tierInfo), offer.getAttributes()), resourceRequest); @@ -159,9 +170,17 @@ public interface TaskAssigner { offer.getOffer(), taskId); + resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes()); + try { offerManager.launchTask(offer.getOffer().getId(), taskInfo); - return true; + assignmentResult.add(taskId); + + if (remainingTasks.hasNext()) { + taskId = remainingTasks.next(); + } else { + break; + } } catch (OfferManager.LaunchException e) { LOG.warn("Failed to launch task.", e); launchFailures.incrementAndGet(); @@ -177,19 +196,19 @@ public interface TaskAssigner { Optional.of(PENDING), LOST, LAUNCH_FAILED_MSG); - return false; + break; } } else { if (Veto.identifyGroup(vetoes) == VetoGroup.STATIC) { // Never attempt to match this offer/groupKey pair again. offerManager.banOffer(offer.getOffer().getId(), groupKey); } - LOG.debug("Agent " + offer.getOffer().getHostname() + " vetoed task " + taskId + ": " + vetoes); } } - return false; + + return assignmentResult.build(); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java index ece476b..b759427 100644 --- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java +++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java @@ -49,7 +49,7 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest { ResourceManager.bagFromResources(TASK.getResources()), IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE))); private static final ResourceRequest REQUEST = - new ResourceRequest(TASK, ResourceBag.EMPTY, AttributeAggregate.EMPTY); + new ResourceRequest(TASK, ResourceBag.EMPTY, AttributeAggregate.empty()); private static final Veto VETO_1 = Veto.insufficientResources("ram", 1); private static final Veto VETO_2 = Veto.insufficientResources("ram", 2); http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java index 209f929..7496a70 100644 --- a/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java +++ b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java @@ -133,6 +133,38 @@ public class AttributeAggregateTest extends EasyMockTest { assertAggregate(aggregate, "hostc", "2", 0L); } + @Test + public void testUpdateAttributeAggregate() { + expectGetAttributes( + "a1", + attribute("host", "a1"), + attribute("rack", "a"), + attribute("pdu", "p1")); + + control.replay(); + + Multiset<Pair<String, String>> expected = ImmutableMultiset.<Pair<String, String>>builder() + .add(Pair.of("rack", "a")) + .add(Pair.of("host", "a1")) + .add(Pair.of("pdu", "p1")) + .build(); + + AttributeAggregate aggregate = aggregate(task("1", "a1")); + assertEquals(expected, aggregate.getAggregates()); + + aggregate.updateAttributeAggregate(IHostAttributes.build(new HostAttributes() + .setHost("a2") + .setAttributes(ImmutableSet.of(attribute("host", "a2"), attribute("rack", "b"))))); + + expected = ImmutableMultiset.<Pair<String, String>>builder() + .addAll(expected) + .add(Pair.of("rack", "b")) + .add(Pair.of("host", "a2")) + .build(); + + assertEquals(expected, aggregate.getAggregates()); + } + private AttributeAggregate aggregate(IScheduledTask... activeTasks) { return AttributeAggregate.create( Suppliers.ofInstance(ImmutableSet.copyOf(activeTasks)), http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java index 0cf23df..1d7f9f4 100644 --- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java @@ -53,7 +53,7 @@ import static org.apache.aurora.gen.Resource.diskMb; import static org.apache.aurora.gen.Resource.numCpus; import static org.apache.aurora.gen.Resource.ramMb; import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE; -import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar; @@ -128,22 +128,22 @@ public class SchedulingFilterImplTest extends EasyMockTest { none, defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostA), - new ResourceRequest(noPortTask, bag(noPortTask), EMPTY))); + new ResourceRequest(noPortTask, bag(noPortTask), empty()))); assertEquals( none, defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostA), - new ResourceRequest(onePortTask, bag(onePortTask), EMPTY))); + new ResourceRequest(onePortTask, bag(onePortTask), empty()))); assertEquals( none, defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostA), - new ResourceRequest(twoPortTask, bag(twoPortTask), EMPTY))); + new ResourceRequest(twoPortTask, bag(twoPortTask), empty()))); assertEquals( ImmutableSet.of(veto(PORTS, 1)), defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostA), - new ResourceRequest(threePortTask, bag(threePortTask), EMPTY))); + new ResourceRequest(threePortTask, bag(threePortTask), empty()))); } @Test @@ -409,7 +409,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { ImmutableSet.of(), defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostA), - new ResourceRequest(task, bag(task), EMPTY))); + new ResourceRequest(task, bag(task), empty()))); Constraint jvmNegated = jvmConstraint.deepCopy(); jvmNegated.getConstraint().getValue().setNegated(true); @@ -499,7 +499,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { return checkConstraint( job, - EMPTY, + empty(), hostAttributes, constraintName, expected, @@ -537,7 +537,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { } private void assertNoVetoes(ITaskConfig task, IHostAttributes hostAttributes) { - assertVetoes(task, hostAttributes, EMPTY); + assertVetoes(task, hostAttributes, empty()); } private void assertNoVetoes( @@ -549,7 +549,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { } private void assertVetoes(ITaskConfig task, IHostAttributes hostAttributes, Veto... vetoes) { - assertVetoes(task, hostAttributes, EMPTY, vetoes); + assertVetoes(task, hostAttributes, empty(), vetoes); } private void assertVetoes( http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java index c1c3eca..fb03f25 100644 --- a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java +++ b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java @@ -120,7 +120,8 @@ public abstract class AbstractJettyTest extends EasyMockTest { new TaskGroupsSettings( Amount.of(1L, Time.MILLISECONDS), bindMock(BackoffStrategy.class), - RateLimiter.create(1000))); + RateLimiter.create(1000), + 5)); bind(ServiceGroupMonitor.class).toInstance(serviceGroupMonitor); bindMock(CronJobManager.class); bindMock(LockManager.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java index ee5c652..64da234 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java @@ -65,7 +65,7 @@ import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER; import static org.apache.aurora.scheduler.base.TaskTestUtil.PREFERRED_TIER; import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER; -import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; import static org.apache.aurora.scheduler.preemptor.PreemptionVictimFilter.PreemptionVictimFilterImpl.ORDER; import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.bag; @@ -129,7 +129,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { return filter.filterPreemptionVictims( ITaskConfig.build(pendingTask.getAssignedTask().getTask()), preemptionVictims(victims), - EMPTY, + empty(), offer, storageUtil.mutableStoreProvider); } http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java index 98048fa..40c42b1 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java @@ -45,7 +45,7 @@ import org.junit.Before; import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotValidationStatName; import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.successStatName; import static org.easymock.EasyMock.anyObject; @@ -133,7 +133,7 @@ public class PreemptorImplTest extends EasyMockTest { } private Optional<String> callPreemptor() { - return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), EMPTY, storeProvider); + return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), empty(), storeProvider); } private void expectSlotValidation( @@ -143,7 +143,7 @@ public class PreemptorImplTest extends EasyMockTest { expect(preemptionVictimFilter.filterPreemptionVictims( TASK.getAssignedTask().getTask(), slot.getVictims(), - EMPTY, + empty(), Optional.of(OFFER), storeProvider)).andReturn(victims); } http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java index 2c3e5f3..67b6d69 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java @@ -78,7 +78,7 @@ public class PreemptorModuleTest extends EasyMockTest { Optional.absent(), injector.getInstance(Preemptor.class).attemptPreemptionFor( IAssignedTask.build(new AssignedTask()), - AttributeAggregate.EMPTY, + AttributeAggregate.empty(), storageUtil.mutableStoreProvider)); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java index 8872962..566e0d9 100644 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java @@ -13,6 +13,8 @@ */ package org.apache.aurora.scheduler.scheduling; +import java.util.Set; + import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.RateLimiter; @@ -27,8 +29,8 @@ import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupBatchWorker; import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings; import org.apache.aurora.scheduler.storage.entities.IJobKey; @@ -50,6 +52,7 @@ public class TaskGroupsTest extends EasyMockTest { private static final Amount<Long, Time> RESCHEDULE_DELAY = FIRST_SCHEDULE_DELAY; private static final IJobKey JOB_A = IJobKey.build(new JobKey("role", "test", "jobA")); private static final String TASK_A_ID = "a"; + private static final Set<String> SCHEDULED_RESULT = ImmutableSet.of(TASK_A_ID); private BackoffStrategy backoffStrategy; private TaskScheduler taskScheduler; @@ -73,7 +76,7 @@ public class TaskGroupsTest extends EasyMockTest { batchWorker = createMock(TaskGroupBatchWorker.class); taskGroups = new TaskGroups( executor, - new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, rateLimiter), + new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, rateLimiter, 2), taskScheduler, rescheduleCalculator, batchWorker); @@ -82,8 +85,10 @@ public class TaskGroupsTest extends EasyMockTest { @Test public void testEvaluatedAfterFirstSchedulePenalty() throws Exception { expect(rateLimiter.acquire()).andReturn(0D); - expect(taskScheduler.schedule(anyObject(), eq(TASK_A_ID))).andReturn(true); - expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes(); + expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of(TASK_A_ID)))) + .andReturn(SCHEDULED_RESULT); + expectBatchExecute(batchWorker, storageUtil.storage, control, SCHEDULED_RESULT) + .anyTimes(); control.replay(); @@ -95,15 +100,17 @@ public class TaskGroupsTest extends EasyMockTest { public void testTaskDeletedBeforeEvaluating() throws Exception { final IScheduledTask task = makeTask(TASK_A_ID); expect(rateLimiter.acquire()).andReturn(0D); - expect(taskScheduler.schedule(anyObject(), eq(Tasks.id(task)))).andAnswer(() -> { - // Test a corner case where a task is deleted while it is being evaluated by the task - // scheduler. If not handled carefully, this could result in the scheduler trying again - // later to satisfy the deleted task. - taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task))); - - return false; - }); - expectBatchExecute(batchWorker, storageUtil.storage, control, false).anyTimes(); + expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of(TASK_A_ID)))) + .andAnswer(() -> { + // Test a corner case where a task is deleted while it is being evaluated by the task + // scheduler. If not handled carefully, this could result in the scheduler trying again + // later to satisfy the deleted task. + taskGroups.tasksDeleted(new PubsubEvent.TasksDeleted(ImmutableSet.of(task))); + + return ImmutableSet.of(); + }); + expectBatchExecute(batchWorker, storageUtil.storage, control, ImmutableSet.of()) + .anyTimes(); expect(backoffStrategy.calculateBackoffMs(FIRST_SCHEDULE_DELAY.as(Time.MILLISECONDS))) .andReturn(0L); @@ -117,8 +124,10 @@ public class TaskGroupsTest extends EasyMockTest { public void testEvaluatedOnStartup() throws Exception { expect(rateLimiter.acquire()).andReturn(0D); expect(rescheduleCalculator.getStartupScheduleDelayMs(makeTask(TASK_A_ID))).andReturn(1L); - expect(taskScheduler.schedule(anyObject(), eq(TASK_A_ID))).andReturn(true); - expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes(); + expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of(TASK_A_ID)))) + .andReturn(ImmutableSet.of(TASK_A_ID)); + expectBatchExecute(batchWorker, storageUtil.storage, control, SCHEDULED_RESULT) + .anyTimes(); control.replay(); @@ -128,11 +137,19 @@ public class TaskGroupsTest extends EasyMockTest { } @Test - public void testResistStarvation() throws Exception { + public void testMultipleTasksAndResistStarvation() throws Exception { expect(rateLimiter.acquire()).andReturn(0D).times(2); - expect(taskScheduler.schedule(anyObject(), eq("a0"))).andReturn(true); - expect(taskScheduler.schedule(anyObject(), eq("b0"))).andReturn(true); - expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes(); + expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of("a0", "a1")))) + .andReturn(ImmutableSet.of("a0", "a1")); + expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of("b0")))) + .andReturn(ImmutableSet.of("b0")); + expectBatchExecute( + batchWorker, + storageUtil.storage, + control, + ImmutableSet.of("a0", "a1")).anyTimes(); + expectBatchExecute(batchWorker, storageUtil.storage, control, ImmutableSet.of("b0")) + .anyTimes(); control.replay(); http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java index a4e87d2..fa1a8178 100644 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.scheduling; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import com.google.common.base.Function; @@ -63,21 +64,23 @@ import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; -import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_EXECUTOR; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; public class TaskSchedulerImplTest extends EasyMockTest { - + private static final String TASK_ID = "a"; private static final IScheduledTask TASK_A = - TaskTestUtil.makeTask("a", JobKeys.from("a", "a", "a")); + TaskTestUtil.makeTask(TASK_ID, JobKeys.from("a", "a", "a")); private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK_A.getAssignedTask().getTask()); private static final String SLAVE_ID = "HOST_A"; private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of(); + private static final ImmutableSet<String> SINGLE_TASK = ImmutableSet.of(TASK_ID); + private static final Set<String> SCHEDULED_RESULT = ImmutableSet.of(TASK_ID); + private static final Set<String> NOT_SCHEDULED_RESULT = ImmutableSet.of(); private StorageTestUtil storageUtil; private TaskAssigner assigner; @@ -134,15 +137,15 @@ public class TaskSchedulerImplTest extends EasyMockTest { .getName()).get()); } - private IExpectationSetters<Boolean> expectAssigned( + private IExpectationSetters<Set<String>> expectAssigned( IScheduledTask task, Map<String, TaskGroupKey> reservationMap) { return expect(assigner.maybeAssign( storageUtil.mutableStoreProvider, - new ResourceRequest(task.getAssignedTask().getTask(), bag(task), EMPTY), + new ResourceRequest(task.getAssignedTask().getTask(), bag(task), empty()), TaskGroupKey.from(task.getAssignedTask().getTask()), - Tasks.id(task), + ImmutableSet.of(Tasks.id(task)), reservationMap)); } @@ -153,11 +156,13 @@ public class TaskSchedulerImplTest extends EasyMockTest { expectAsMap(NO_RESERVATION); expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); - expectAssigned(TASK_A, NO_RESERVATION).andReturn(true); + expectAssigned(TASK_A, NO_RESERVATION).andReturn(SCHEDULED_RESULT); control.replay(); - assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a")); + assertEquals( + SCHEDULED_RESULT, + scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK)); } @Test @@ -169,7 +174,45 @@ public class TaskSchedulerImplTest extends EasyMockTest { control.replay(); - assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a")); + assertEquals( + SCHEDULED_RESULT, + scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK)); + } + + @Test + public void testSchedulePartial() throws Exception { + storageUtil.expectOperations(); + + String taskB = "b"; + expectAsMap(NO_RESERVATION); + storageUtil.expectTaskFetch( + Query.taskScoped(Tasks.id(TASK_A), taskB).byStatus(PENDING), + ImmutableSet.of(TASK_A)); + expectActiveJobFetch(TASK_A); + expectAssigned(TASK_A, NO_RESERVATION).andReturn(SCHEDULED_RESULT); + + control.replay(); + + // Task b should be returned as well to be purged from its TaskGroup. + assertEquals( + ImmutableSet.of(TASK_ID, taskB), + scheduler.schedule(storageUtil.mutableStoreProvider, ImmutableSet.of(TASK_ID, taskB))); + } + + @Test + public void testMultipleGroupsRejected() { + storageUtil.expectOperations(); + + String taskB = "b"; + storageUtil.expectTaskFetch( + Query.taskScoped(Tasks.id(TASK_A), taskB).byStatus(PENDING), + ImmutableSet.of(TASK_A, TaskTestUtil.makeTask(taskB, JobKeys.from("b", "b", "b")))); + + control.replay(); + + assertEquals( + NOT_SCHEDULED_RESULT, + scheduler.schedule(storageUtil.mutableStoreProvider, ImmutableSet.of(TASK_ID, taskB))); } @Test @@ -179,15 +222,15 @@ public class TaskSchedulerImplTest extends EasyMockTest { // No reservation available in preemptor expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); - expectAssigned(TASK_A, NO_RESERVATION).andReturn(false); + expectAssigned(TASK_A, NO_RESERVATION).andReturn(NOT_SCHEDULED_RESULT); expectAsMap(NO_RESERVATION); expectNoReservation(TASK_A); - expectPreemptorCall(TASK_A, Optional.<String>absent()); + expectPreemptorCall(TASK_A, Optional.absent()); // Slave is reserved. expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); - expectAssigned(TASK_A, NO_RESERVATION).andReturn(false); + expectAssigned(TASK_A, NO_RESERVATION).andReturn(NOT_SCHEDULED_RESULT); expectAsMap(NO_RESERVATION); expectNoReservation(TASK_A); expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); @@ -197,13 +240,19 @@ public class TaskSchedulerImplTest extends EasyMockTest { expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); expectAsMap(ImmutableMap.of(SLAVE_ID, GROUP_KEY)); - expectAssigned(TASK_A, ImmutableMap.of(SLAVE_ID, GROUP_KEY)).andReturn(true); + expectAssigned(TASK_A, ImmutableMap.of(SLAVE_ID, GROUP_KEY)).andReturn(SCHEDULED_RESULT); control.replay(); - assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a")); - assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a")); - assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a")); + assertEquals( + NOT_SCHEDULED_RESULT, + scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK)); + assertEquals( + NOT_SCHEDULED_RESULT, + scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK)); + assertEquals( + SCHEDULED_RESULT, + scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK)); } @Test @@ -213,12 +262,14 @@ public class TaskSchedulerImplTest extends EasyMockTest { expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); expectAsMap(NO_RESERVATION); - expectAssigned(TASK_A, NO_RESERVATION).andReturn(false); + expectAssigned(TASK_A, NO_RESERVATION).andReturn(NOT_SCHEDULED_RESULT); expectGetReservation(TASK_A, SLAVE_ID); control.replay(); - assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a")); + assertEquals( + NOT_SCHEDULED_RESULT, + scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK)); } @Test @@ -228,12 +279,14 @@ public class TaskSchedulerImplTest extends EasyMockTest { expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); expectAsMap(NO_RESERVATION); - expectAssigned(TASK_A, NO_RESERVATION).andReturn(false); + expectAssigned(TASK_A, NO_RESERVATION).andReturn(NOT_SCHEDULED_RESULT); expectGetReservation(TASK_A, SLAVE_ID); control.replay(); - assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a")); + assertEquals( + NOT_SCHEDULED_RESULT, + scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK)); } @Test @@ -274,15 +327,15 @@ public class TaskSchedulerImplTest extends EasyMockTest { expectAsMap(NO_RESERVATION); expect(assigner.maybeAssign( EasyMock.anyObject(), - eq(new ResourceRequest(taskA.getAssignedTask().getTask(), bag(taskA), EMPTY)), + eq(new ResourceRequest(taskA.getAssignedTask().getTask(), bag(taskA), empty())), eq(TaskGroupKey.from(taskA.getAssignedTask().getTask())), - eq(Tasks.id(taskA)), - eq(NO_RESERVATION))).andReturn(true); + eq(SINGLE_TASK), + eq(NO_RESERVATION))).andReturn(SCHEDULED_RESULT); control.replay(); memStorage.write((NoResult.Quiet) - store -> assertTrue(scheduler.schedule(store, Tasks.id(taskA)))); + store -> assertEquals(SCHEDULED_RESULT, scheduler.schedule(store, SINGLE_TASK))); } @Test @@ -296,13 +349,15 @@ public class TaskSchedulerImplTest extends EasyMockTest { control.replay(); - assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a")); + assertEquals( + NOT_SCHEDULED_RESULT, + scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK)); } private void expectPreemptorCall(IScheduledTask task, Optional<String> result) { expect(preemptor.attemptPreemptionFor( task.getAssignedTask(), - EMPTY, + empty(), storageUtil.mutableStoreProvider)).andReturn(result); } http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java index b4d27f6..b482be5 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.state; import java.util.Map; +import java.util.Set; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; @@ -21,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.Attribute; import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.TaskConfig; @@ -28,6 +30,7 @@ import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; @@ -57,7 +60,7 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER; import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; -import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer; @@ -70,8 +73,7 @@ import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotEquals; public class TaskAssignerImplTest extends EasyMockTest { @@ -79,7 +81,10 @@ public class TaskAssignerImplTest extends EasyMockTest { private static final Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT)); private static final String SLAVE_ID = MESOS_OFFER.getSlaveId().getValue(); private static final HostOffer OFFER = - new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes())); + new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes() + .setHost(MESOS_OFFER.getHostname()) + .setAttributes(ImmutableSet.of( + new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname())))))); private static final IScheduledTask TASK = makeTask("id", JOB); private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask()); private static final TaskInfo TASK_INFO = TaskInfo.newBuilder() @@ -91,10 +96,23 @@ public class TaskAssignerImplTest extends EasyMockTest { private static final UnusedResource UNUSED = new UnusedResource( bagFromMesosResources(MESOS_OFFER.getResourcesList()), OFFER.getAttributes()); - private static final ResourceRequest RESOURCE_REQUEST = new ResourceRequest( - TASK.getAssignedTask().getTask(), - ResourceBag.EMPTY, - EMPTY); + private static final HostOffer OFFER_2 = new HostOffer( + Offer.newBuilder() + .setId(OfferID.newBuilder().setValue("offerId0")) + .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId")) + .setSlaveId(SlaveID.newBuilder().setValue("slaveId0")) + .setHostname("hostName0") + .addResources(Resource.newBuilder() + .setName("ports") + .setType(Type.RANGES) + .setRanges( + Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT)))) + .build(), + IHostAttributes.build(new HostAttributes())); + + private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of(); + + private ResourceRequest resourceRequest; private MutableStoreProvider storeProvider; private StateManager stateManager; @@ -113,26 +131,43 @@ public class TaskAssignerImplTest extends EasyMockTest { offerManager = createMock(OfferManager.class); tierManager = createMock(TierManager.class); assigner = new TaskAssignerImpl(stateManager, filter, taskFactory, offerManager, tierManager); + resourceRequest = new ResourceRequest( + TASK.getAssignedTask().getTask(), + ResourceBag.EMPTY, + empty()); } @Test - public void testAssignNoVetoes() throws Exception { + public void testAssignNoTasks() throws Exception { + control.replay(); + + assertEquals( + NO_ASSIGNMENT, + assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null)); + } + + @Test + public void testAssignPartialNoVetoes() throws Exception { expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of()); + expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of()); expectAssignTask(MESOS_OFFER); expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER)) .andReturn(TASK_INFO); control.replay(); - assertTrue(assigner.maybeAssign( - storeProvider, - new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, EMPTY), - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - Tasks.id(TASK), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + AttributeAggregate aggregate = empty(); + assertEquals( + ImmutableSet.of(Tasks.id(TASK)), + assigner.maybeAssign( + storeProvider, + new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate), + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(Tasks.id(TASK), "id2", "id3"), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + assertNotEquals(empty(), aggregate); } @Test @@ -140,43 +175,47 @@ public class TaskAssignerImplTest extends EasyMockTest { expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY); expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter(UNUSED, RESOURCE_REQUEST)) + expect(filter.filter(UNUSED, resourceRequest)) .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied"))); control.replay(); - assertFalse(assigner.maybeAssign( - storeProvider, - RESOURCE_REQUEST, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - Tasks.id(TASK), - NO_RESERVATION)); + assertEquals( + NO_ASSIGNMENT, + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(Tasks.id(TASK)), + NO_RESERVATION)); } @Test public void testAssignVetoesWithNoStaticBan() throws Exception { expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter(UNUSED, RESOURCE_REQUEST)) + expect(filter.filter(UNUSED, resourceRequest)) .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit"))); control.replay(); - assertFalse(assigner.maybeAssign( - storeProvider, - RESOURCE_REQUEST, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - Tasks.id(TASK), - NO_RESERVATION)); + assertEquals( + NO_ASSIGNMENT, + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(Tasks.id(TASK)), + NO_RESERVATION)); } @Test public void testAssignmentClearedOnError() throws Exception { - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER, OFFER_2)); offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); expectLastCall().andThrow(new OfferManager.LaunchException("expected")); expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of()); + expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of()); expectAssignTask(MESOS_OFFER); expect(stateManager.changeState( storeProvider, @@ -190,27 +229,33 @@ public class TaskAssignerImplTest extends EasyMockTest { control.replay(); - assertFalse(assigner.maybeAssign( - storeProvider, - RESOURCE_REQUEST, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - Tasks.id(TASK), - NO_RESERVATION)); + // Ensures scheduling loop terminates on the first launch failure. + assertEquals( + NO_ASSIGNMENT, + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(Tasks.id(TASK), "id2", "id3"), + NO_RESERVATION)); } @Test public void testAssignmentSkippedForReservedSlave() throws Exception { + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); control.replay(); - assertFalse(assigner.maybeAssign( - storeProvider, - RESOURCE_REQUEST, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - Tasks.id(TASK), - ImmutableMap.of(SLAVE_ID, TaskGroupKey.from( - ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n"))))))); + assertEquals( + NO_ASSIGNMENT, + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(Tasks.id(TASK)), + ImmutableMap.of(SLAVE_ID, TaskGroupKey.from( + ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n"))))))); } @Test @@ -218,36 +263,28 @@ public class TaskAssignerImplTest extends EasyMockTest { // Ensures slave/task reservation relationship is only enforced in slave->task direction // and permissive in task->slave direction. In other words, a task with a slave reservation // should still be tried against other unreserved slaves. - HostOffer offer = new HostOffer( - Offer.newBuilder() - .setId(OfferID.newBuilder().setValue("offerId0")) - .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId")) - .setSlaveId(SlaveID.newBuilder().setValue("slaveId0")) - .setHostname("hostName0") - .addResources(Resource.newBuilder() - .setName("ports") - .setType(Type.RANGES) - .setRanges( - Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT)))) - .build(), - IHostAttributes.build(new HostAttributes())); - - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer, OFFER)); + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER_2, OFFER)); expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of()); - expectAssignTask(offer.getOffer()); - expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer())) + expect(filter.filter( + new UnusedResource( + bagFromMesosResources(OFFER_2.getOffer().getResourcesList()), + OFFER_2.getAttributes()), + resourceRequest)).andReturn(ImmutableSet.of()); + expectAssignTask(OFFER_2.getOffer()); + expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer())) .andReturn(TASK_INFO); - offerManager.launchTask(offer.getOffer().getId(), TASK_INFO); + offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO); control.replay(); - assertTrue(assigner.maybeAssign( - storeProvider, - RESOURCE_REQUEST, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - Tasks.id(TASK), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + assertEquals( + ImmutableSet.of(Tasks.id(TASK)), + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(Tasks.id(TASK)), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); } @Test @@ -268,18 +305,18 @@ public class TaskAssignerImplTest extends EasyMockTest { IHostAttributes.build(new HostAttributes())); expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER)); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER).times(2); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); expect(filter.filter( new UnusedResource( bagFromMesosResources(mismatched.getOffer().getResourcesList()), mismatched.getAttributes()), - RESOURCE_REQUEST)) + resourceRequest)) .andReturn(ImmutableSet.of(Veto.constraintMismatch("constraint mismatch"))); offerManager.banOffer(mismatched.getOffer().getId(), GROUP_KEY); expect(filter.filter( new UnusedResource( bagFromMesosResources(MESOS_OFFER.getResourcesList()), OFFER.getAttributes()), - RESOURCE_REQUEST)) + resourceRequest)) .andReturn(ImmutableSet.of()); expectAssignTask(MESOS_OFFER); @@ -289,12 +326,14 @@ public class TaskAssignerImplTest extends EasyMockTest { control.replay(); - assertTrue(assigner.maybeAssign( - storeProvider, - RESOURCE_REQUEST, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - Tasks.id(TASK), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + assertEquals( + ImmutableSet.of(Tasks.id(TASK)), + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(Tasks.id(TASK)), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); } @Test
