http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java new file mode 100644 index 0000000..ee65bab --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java @@ -0,0 +1,29 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; + +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; + +public class FirstFitOfferSelector implements OfferSelector { + + @Override + public Optional<HostOffer> select(Iterable<HostOffer> offers, ResourceRequest resourceRequest) { + + return Optional.fromNullable(Iterables.getFirst(offers, null)); + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java new file mode 100644 index 0000000..4d36487 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java @@ -0,0 +1,26 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import com.google.inject.AbstractModule; +import com.google.inject.Singleton; + +public class FirstFitOfferSelectorModule extends AbstractModule { + + @Override + protected void configure() { + bind(OfferSelector.class).to(FirstFitOfferSelector.class); + bind(FirstFitOfferSelector.class).in(Singleton.class); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java b/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java new file mode 100644 index 0000000..c95b980 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java @@ -0,0 +1,36 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import com.google.common.base.Optional; + +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; + +/** + * Injected into {@link TaskAssignerImpl}, this class scores the offers available and returns an + * option containing the offer to use. + */ +public interface OfferSelector { + + /** + * Score offers that fit within the given {@link ResourceRequest} and return an option containing + * the offer to use for assignment. + * + * @param offers A stream of offers that match the given {@link ResourceRequest}. + * @param resourceRequest The {@link ResourceRequest} for the task to assign. + * @return An {@link Optional} containing the offer to use. + */ + Optional<HostOffer> select(Iterable<HostOffer> offers, ResourceRequest resourceRequest); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java index 0796712..f72dacd 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java @@ -139,8 +139,8 @@ public class SchedulingModule extends AbstractModule { bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).in(Singleton.class); bind(BiCache.BiCacheSettings.class).toInstance( new BiCache.BiCacheSettings(options.reservationDuration, "reservation")); - bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class); - bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class); + bind(TaskScheduler.class).to(TaskSchedulerImpl.class); + bind(TaskSchedulerImpl.class).in(Singleton.class); expose(TaskScheduler.class); } }); http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java new file mode 100644 index 0000000..87619b5 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java @@ -0,0 +1,46 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import java.util.Map; +import java.util.Set; + +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; + +import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; + +/** + * Responsible for matching a task against an offer and launching it. + */ +public interface TaskAssigner { + /** + * Tries to match a task against an offer. If a match is found, the assigner makes the + * appropriate changes to the task and requests task launch. + * + * @param storeProvider Storage provider. + * @param resourceRequest The request for resources being scheduled. + * @param groupKey Task group key. + * @param tasks Tasks to assign. + * @param preemptionReservations Slave reservations. + * @return Successfully assigned task IDs. + */ + Set<String> maybeAssign( + MutableStoreProvider storeProvider, + ResourceRequest resourceRequest, + TaskGroupKey groupKey, + Iterable<IAssignedTask> tasks, + Map<String, TaskGroupKey> preemptionReservations); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java new file mode 100644 index 0000000..a1dd74f --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java @@ -0,0 +1,284 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.TierManager; +import org.apache.aurora.scheduler.base.InstanceKeys; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.mesos.MesosTaskFactory; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.resources.ResourceManager; +import org.apache.aurora.scheduler.resources.ResourceType; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IInstanceKey; +import org.apache.aurora.scheduler.updater.UpdateAgentReserver; +import org.apache.mesos.v1.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.common.inject.TimedInterceptor.Timed; +import static org.apache.aurora.gen.ScheduleStatus.LOST; +import static org.apache.aurora.gen.ScheduleStatus.PENDING; + +public class TaskAssignerImpl implements TaskAssigner { + private static final Logger LOG = LoggerFactory.getLogger(TaskAssignerImpl.class); + + @VisibleForTesting + static final Optional<String> LAUNCH_FAILED_MSG = + Optional.of("Unknown exception attempting to schedule task."); + @VisibleForTesting + static final String ASSIGNER_LAUNCH_FAILURES = "assigner_launch_failures"; + + private final AtomicLong launchFailures; + + private final StateManager stateManager; + private final MesosTaskFactory taskFactory; + private final OfferManager offerManager; + private final TierManager tierManager; + private final UpdateAgentReserver updateAgentReserver; + private final OfferSelector offerSelector; + + @Inject + public TaskAssignerImpl( + StateManager stateManager, + MesosTaskFactory taskFactory, + OfferManager offerManager, + TierManager tierManager, + UpdateAgentReserver updateAgentReserver, + StatsProvider statsProvider, + OfferSelector offerSelector) { + + this.stateManager = requireNonNull(stateManager); + this.taskFactory = requireNonNull(taskFactory); + this.offerManager = requireNonNull(offerManager); + this.tierManager = requireNonNull(tierManager); + this.launchFailures = statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES); + this.updateAgentReserver = requireNonNull(updateAgentReserver); + this.offerSelector = requireNonNull(offerSelector); + } + + @VisibleForTesting + IAssignedTask mapAndAssignResources(Protos.Offer offer, IAssignedTask task) { + IAssignedTask assigned = task; + for (ResourceType type : ResourceManager.getTaskResourceTypes(assigned)) { + if (type.getMapper().isPresent()) { + assigned = type.getMapper().get().mapAndAssign(offer, assigned); + } + } + return assigned; + } + + private Protos.TaskInfo assign( + Storage.MutableStoreProvider storeProvider, + Protos.Offer offer, + String taskId, + boolean revocable) { + + String host = offer.getHostname(); + IAssignedTask assigned = stateManager.assignTask( + storeProvider, + taskId, + host, + offer.getAgentId(), + task -> mapAndAssignResources(offer, task)); + LOG.info( + "Offer on agent {} (id {}) is being assigned task for {}.", + host, offer.getAgentId().getValue(), taskId); + return taskFactory.createFrom(assigned, offer, revocable); + } + + private void launchUsingOffer( + Storage.MutableStoreProvider storeProvider, + boolean revocable, + ResourceRequest resourceRequest, + IAssignedTask task, + HostOffer offer, + ImmutableSet.Builder<String> assignmentResult) throws OfferManager.LaunchException { + + String taskId = task.getTaskId(); + Protos.TaskInfo taskInfo = assign(storeProvider, offer.getOffer(), taskId, revocable); + resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes()); + try { + offerManager.launchTask(offer.getOffer().getId(), taskInfo); + assignmentResult.add(taskId); + } catch (OfferManager.LaunchException e) { + LOG.warn("Failed to launch task.", e); + launchFailures.incrementAndGet(); + + // The attempt to schedule the task failed, so we need to backpedal on the assignment. + // It is in the LOST state and a new task will move to PENDING to replace it. + // Should the state change fail due to storage issues, that's okay. The task will + // time out in the ASSIGNED state and be moved to LOST. + stateManager.changeState( + storeProvider, + taskId, + Optional.of(PENDING), + LOST, + LAUNCH_FAILED_MSG); + throw e; + } + } + + private Iterable<IAssignedTask> maybeAssignReserved( + Iterable<IAssignedTask> tasks, + Storage.MutableStoreProvider storeProvider, + boolean revocable, + ResourceRequest resourceRequest, + TaskGroupKey groupKey, + ImmutableSet.Builder<String> assignmentResult) { + + if (!updateAgentReserver.hasReservations(groupKey)) { + return tasks; + } + + // Data structure to record which tasks should be excluded from the regular (non-reserved) + // scheduling loop. This is important because we release reservations once they are used, + // so we need to record them separately to avoid them being double-scheduled. + ImmutableSet.Builder<IInstanceKey> excludeBuilder = ImmutableSet.builder(); + + for (IAssignedTask task : tasks) { + IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId()); + Optional<String> maybeAgentId = updateAgentReserver.getAgent(key); + if (maybeAgentId.isPresent()) { + excludeBuilder.add(key); + Optional<HostOffer> offer = offerManager.getMatching( + Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build(), + resourceRequest, + revocable); + if (offer.isPresent()) { + try { + // The offer can still be veto'd because of changed constraints, or because the + // Scheduler hasn't been updated by Mesos yet... + launchUsingOffer(storeProvider, + revocable, + resourceRequest, + task, + offer.get(), + assignmentResult); + LOG.info("Used update reservation for {} on {}", key, maybeAgentId.get()); + updateAgentReserver.release(maybeAgentId.get(), key); + } catch (OfferManager.LaunchException e) { + updateAgentReserver.release(maybeAgentId.get(), key); + } + } else { + LOG.info( + "Tried to reuse offer on {} for {}, but was not ready yet.", + maybeAgentId.get(), + key); + } + } + } + + // Return only the tasks that didn't have reservations. Offers on agents that were reserved + // might not have been seen by Aurora yet, so we need to wait until the reservation expires + // before giving up and falling back to the first-fit algorithm. + Set<IInstanceKey> toBeExcluded = excludeBuilder.build(); + return Iterables.filter(tasks, t -> !toBeExcluded.contains( + InstanceKeys.from(t.getTask().getJob(), t.getInstanceId()))); + } + + /** + * Determine whether or not the offer is reserved for a different task via preemption or + * update affinity. + */ + @SuppressWarnings("PMD.UselessParentheses") // TODO(jly): PMD bug, remove when upgrade from 5.5.3 + private boolean isAgentReserved(HostOffer offer, + TaskGroupKey groupKey, + Map<String, TaskGroupKey> preemptionReservations) { + + String agentId = offer.getOffer().getAgentId().getValue(); + Optional<TaskGroupKey> reservedGroup = Optional.fromNullable( + preemptionReservations.get(agentId)); + + return (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey)) + || !updateAgentReserver.getReservations(agentId).isEmpty(); + } + + @Timed("assigner_maybe_assign") + @Override + public Set<String> maybeAssign( + Storage.MutableStoreProvider storeProvider, + ResourceRequest resourceRequest, + TaskGroupKey groupKey, + Iterable<IAssignedTask> tasks, + Map<String, TaskGroupKey> preemptionReservations) { + + if (Iterables.isEmpty(tasks)) { + return ImmutableSet.of(); + } + + boolean revocable = tierManager.getTier(groupKey.getTask()).isRevocable(); + ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder(); + + // Assign tasks reserved for a specific agent (e.g. for update affinity) + Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved( + tasks, + storeProvider, + revocable, + resourceRequest, + groupKey, + assignmentResult); + + // Assign the rest of the non-reserved tasks + for (IAssignedTask task : nonReservedTasks) { + try { + // Get all offers that will satisfy the given ResourceRequest and that are not reserved + // for updates or preemption + FluentIterable<HostOffer> matchingOffers = FluentIterable + .from(offerManager.getAllMatching(groupKey, resourceRequest, revocable)) + .filter(o -> !isAgentReserved(o, groupKey, preemptionReservations)); + + // Determine which is the optimal offer to select for the given request + Optional<HostOffer> optionalOffer = offerSelector.select(matchingOffers, resourceRequest); + + // If no offer is chosen, continue to the next task + if (!optionalOffer.isPresent()) { + continue; + } + + // Attempt to launch the task using the chosen offer + HostOffer offer = optionalOffer.get(); + launchUsingOffer(storeProvider, + revocable, + resourceRequest, + task, + offer, + assignmentResult); + } catch (OfferManager.LaunchException e) { + // Any launch exception causes the scheduling round to terminate for this TaskGroup. + break; + } + } + + return assignmentResult.build(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java new file mode 100644 index 0000000..2ddd4f5 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java @@ -0,0 +1,59 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import java.util.List; +import javax.inject.Singleton; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.google.common.collect.ImmutableList; +import com.google.inject.AbstractModule; +import com.google.inject.Module; + +import org.apache.aurora.scheduler.app.MoreModules; +import org.apache.aurora.scheduler.config.CliOptions; + +/** + * The default TaskAssigner implementation that allows the injection of custom offer + * selecting modules via the '-offer_selector_modules' flag. + */ +public class TaskAssignerImplModule extends AbstractModule { + + @Parameters(separators = "=") + public static class Options { + @Parameter(names = "-offer_selector_modules", + description = "Guice module for customizing the TaskAssignerImpl's OfferSelector.") + @SuppressWarnings("rawtypes") + public List<Class> offerSelectorModules = + ImmutableList.of(FirstFitOfferSelectorModule.class); + } + + private final CliOptions cliOptions; + + public TaskAssignerImplModule(CliOptions cliOptions) { + this.cliOptions = cliOptions; + } + + @Override + protected void configure() { + Options options = cliOptions.taskAssigner; + for (Module module : MoreModules.instantiateAll(options.offerSelectorModules, cliOptions)) { + install(module); + } + + bind(TaskAssigner.class).to(TaskAssignerImpl.class); + bind(TaskAssignerImpl.class).in(Singleton.class); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java index 0002b0c..3c38f95 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java @@ -13,55 +13,10 @@ */ package org.apache.aurora.scheduler.scheduling; -import java.lang.annotation.Retention; -import java.lang.annotation.Target; -import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import javax.inject.Inject; -import javax.inject.Qualifier; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import com.google.common.eventbus.Subscribe; - -import org.apache.aurora.common.inject.TimedInterceptor.Timed; -import org.apache.aurora.common.stats.Stats; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; -import org.apache.aurora.scheduler.preemptor.BiCache; -import org.apache.aurora.scheduler.preemptor.Preemptor; -import org.apache.aurora.scheduler.resources.ResourceBag; -import org.apache.aurora.scheduler.state.TaskAssigner; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; -import static java.util.Objects.requireNonNull; - -import static java.util.stream.Collectors.toMap; - -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources; /** * Enables scheduling and preemption of tasks. @@ -77,150 +32,4 @@ public interface TaskScheduler extends EventSubscriber { * task ID was not present in the result. */ Set<String> schedule(MutableStoreProvider storeProvider, Iterable<String> taskIds); - - /** - * An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task - * backs off after a failed scheduling attempt. - * <p> - * Pending tasks are advertised to the scheduler via internal pubsub notifications. - */ - class TaskSchedulerImpl implements TaskScheduler { - /** - * Binding annotation for the time duration of reservations. - */ - @VisibleForTesting - @Qualifier - @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) - public @interface ReservationDuration { } - - private static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerImpl.class); - - private final TaskAssigner assigner; - private final Preemptor preemptor; - private final ExecutorSettings executorSettings; - private final BiCache<String, TaskGroupKey> reservations; - - private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired"); - private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed"); - private final AtomicLong attemptsNoMatch = Stats.exportLong("schedule_attempts_no_match"); - - @Inject - TaskSchedulerImpl( - TaskAssigner assigner, - Preemptor preemptor, - ExecutorSettings executorSettings, - BiCache<String, TaskGroupKey> reservations) { - - this.assigner = requireNonNull(assigner); - this.preemptor = requireNonNull(preemptor); - this.executorSettings = requireNonNull(executorSettings); - this.reservations = requireNonNull(reservations); - } - - @Timed ("task_schedule_attempt") - public Set<String> schedule(MutableStoreProvider store, Iterable<String> taskIds) { - try { - return scheduleTasks(store, taskIds); - } catch (RuntimeException e) { - // We catch the generic unchecked exception here to ensure tasks are not abandoned - // if there is a transient issue resulting in an unchecked exception. - LOG.warn("Task scheduling unexpectedly failed, will be retried", e); - attemptsFailed.incrementAndGet(); - // Return empty set for all task IDs to be retried later. - // It's ok if some tasks were already assigned, those will be ignored in the next round. - return ImmutableSet.of(); - } - } - - private Set<String> scheduleTasks(MutableStoreProvider store, Iterable<String> tasks) { - ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks); - String taskIdValues = Joiner.on(",").join(taskIds); - LOG.debug("Attempting to schedule tasks {}", taskIdValues); - ImmutableSet<IAssignedTask> assignedTasks = - ImmutableSet.copyOf(Iterables.transform( - store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)), - IScheduledTask::getAssignedTask)); - - if (Iterables.isEmpty(assignedTasks)) { - LOG.warn("Failed to look up all tasks in a scheduling round: {}", taskIdValues); - return taskIds; - } - - Preconditions.checkState( - assignedTasks.stream() - .collect(Collectors.groupingBy(t -> t.getTask())) - .entrySet() - .size() == 1, - "Found multiple task groups for %s", - taskIdValues); - - Map<String, IAssignedTask> assignableTaskMap = - assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t)); - - if (taskIds.size() != assignedTasks.size()) { - LOG.warn("Failed to look up tasks " - + Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet()))); - } - - // This is safe after all checks above. - ITaskConfig task = assignedTasks.stream().findFirst().get().getTask(); - AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob()); - - // Valid Docker tasks can have a container but no executor config - ResourceBag overhead = ResourceBag.EMPTY; - if (task.isSetExecutorConfig()) { - overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName()) - .orElseThrow( - () -> new IllegalArgumentException("Cannot find executor configuration")); - } - - Set<String> launched = assigner.maybeAssign( - store, - new ResourceRequest( - task, - bagFromResources(task.getResources()).add(overhead), aggregate), - TaskGroupKey.from(task), - assignedTasks, - reservations.asMap()); - - attemptsFired.addAndGet(assignableTaskMap.size()); - Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched); - - failedToLaunch.forEach(taskId -> { - // Task could not be scheduled. - // TODO(maxim): Now that preemption slots are searched asynchronously, consider - // retrying a launch attempt within the current scheduling round IFF a reservation is - // available. - maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store); - }); - attemptsNoMatch.addAndGet(failedToLaunch.size()); - - // Return all successfully launched tasks as well as those weren't tried (not in PENDING). - return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet())); - } - - private void maybePreemptFor( - IAssignedTask task, - AttributeAggregate jobState, - MutableStoreProvider storeProvider) { - - if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) { - return; - } - Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider); - if (slaveId.isPresent()) { - reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask())); - } - } - - @Subscribe - public void taskChanged(final TaskStateChange stateChangeEvent) { - if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) { - IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask(); - if (assigned.getSlaveId() != null) { - reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask())); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java new file mode 100644 index 0000000..b6d5d95 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java @@ -0,0 +1,207 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import javax.inject.Inject; +import javax.inject.Qualifier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.google.common.eventbus.Subscribe; + +import org.apache.aurora.common.inject.TimedInterceptor.Timed; +import org.apache.aurora.common.stats.Stats; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; +import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.filter.SchedulingFilter; +import org.apache.aurora.scheduler.preemptor.BiCache; +import org.apache.aurora.scheduler.preemptor.Preemptor; +import org.apache.aurora.scheduler.resources.ResourceBag; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toMap; + +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources; + +/** + * An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task + * backs off after a failed scheduling attempt. + * <p> + * Pending tasks are advertised to the scheduler via internal pubsub notifications. + */ +public class TaskSchedulerImpl implements TaskScheduler { + /** + * Binding annotation for the time duration of reservations. + */ + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface ReservationDuration { } + + private static final Logger LOG = + LoggerFactory.getLogger(org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl.class); + + private final TaskAssigner assigner; + private final Preemptor preemptor; + private final ExecutorSettings executorSettings; + private final BiCache<String, TaskGroupKey> reservations; + + private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired"); + private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed"); + private final AtomicLong attemptsNoMatch = Stats.exportLong("schedule_attempts_no_match"); + + @Inject + TaskSchedulerImpl( + TaskAssigner assigner, + Preemptor preemptor, + ExecutorSettings executorSettings, + BiCache<String, TaskGroupKey> reservations) { + + this.assigner = requireNonNull(assigner); + this.preemptor = requireNonNull(preemptor); + this.executorSettings = requireNonNull(executorSettings); + this.reservations = requireNonNull(reservations); + } + + @Timed("task_schedule_attempt") + public Set<String> schedule(Storage.MutableStoreProvider store, Iterable<String> taskIds) { + try { + return scheduleTasks(store, taskIds); + } catch (RuntimeException e) { + // We catch the generic unchecked exception here to ensure tasks are not abandoned + // if there is a transient issue resulting in an unchecked exception. + LOG.warn("Task scheduling unexpectedly failed, will be retried", e); + attemptsFailed.incrementAndGet(); + // Return empty set for all task IDs to be retried later. + // It's ok if some tasks were already assigned, those will be ignored in the next round. + return ImmutableSet.of(); + } + } + + private Set<String> scheduleTasks(Storage.MutableStoreProvider store, Iterable<String> tasks) { + ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks); + String taskIdValues = Joiner.on(",").join(taskIds); + LOG.debug("Attempting to schedule tasks {}", taskIdValues); + ImmutableSet<IAssignedTask> assignedTasks = + ImmutableSet.copyOf(Iterables.transform( + store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)), + IScheduledTask::getAssignedTask)); + + if (Iterables.isEmpty(assignedTasks)) { + LOG.warn("Failed to look up all tasks in a scheduling round: {}", taskIdValues); + return taskIds; + } + + Preconditions.checkState( + assignedTasks.stream() + .collect(Collectors.groupingBy(t -> t.getTask())) + .entrySet() + .size() == 1, + "Found multiple task groups for %s", + taskIdValues); + + Map<String, IAssignedTask> assignableTaskMap = + assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t)); + + if (taskIds.size() != assignedTasks.size()) { + LOG.warn("Failed to look up tasks " + + Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet()))); + } + + // This is safe after all checks above. + ITaskConfig task = assignedTasks.stream().findFirst().get().getTask(); + AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob()); + + // Valid Docker tasks can have a container but no executor config + ResourceBag overhead = ResourceBag.EMPTY; + if (task.isSetExecutorConfig()) { + overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName()) + .orElseThrow( + () -> new IllegalArgumentException("Cannot find executor configuration")); + } + + Set<String> launched = assigner.maybeAssign( + store, + new SchedulingFilter.ResourceRequest( + task, + bagFromResources(task.getResources()).add(overhead), aggregate), + TaskGroupKey.from(task), + assignedTasks, + reservations.asMap()); + + attemptsFired.addAndGet(assignableTaskMap.size()); + Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched); + + failedToLaunch.forEach(taskId -> { + // Task could not be scheduled. + // TODO(maxim): Now that preemption slots are searched asynchronously, consider + // retrying a launch attempt within the current scheduling round IFF a reservation is + // available. + maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store); + }); + attemptsNoMatch.addAndGet(failedToLaunch.size()); + + // Return all successfully launched tasks as well as those weren't tried (not in PENDING). + return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet())); + } + + private void maybePreemptFor( + IAssignedTask task, + AttributeAggregate jobState, + Storage.MutableStoreProvider storeProvider) { + + if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) { + return; + } + Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider); + if (slaveId.isPresent()) { + reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask())); + } + } + + @Subscribe + public void taskChanged(final PubsubEvent.TaskStateChange stateChangeEvent) { + if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) { + IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask(); + if (assigned.getSlaveId() != null) { + reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask())); + } + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java b/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java deleted file mode 100644 index dc244ee..0000000 --- a/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.state; - -import javax.inject.Singleton; - -import com.google.inject.AbstractModule; - -import org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner; - -/** - * Exposes the default TaskAssigner implementation, which is a first-fit scheduling algorithm. - */ -public class FirstFitTaskAssignerModule extends AbstractModule { - @Override - protected void configure() { - bind(TaskAssigner.class).to(FirstFitTaskAssigner.class); - bind(FirstFitTaskAssigner.class).in(Singleton.class); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/state/StateModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java index b7a3c0b..46e9227 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java @@ -29,6 +29,7 @@ import org.apache.aurora.scheduler.config.CliOptions; import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.mesos.MesosTaskFactory; import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl; +import org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule; import org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl; import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl; @@ -42,7 +43,7 @@ public class StateModule extends AbstractModule { @Parameter(names = "-task_assigner_modules", description = "Guice modules for customizing task assignment.") @SuppressWarnings("rawtypes") - public List<Class> taskAssignerModules = ImmutableList.of(FirstFitTaskAssignerModule.class); + public List<Class> taskAssignerModules = ImmutableList.of(TaskAssignerImplModule.class); } private final CliOptions options; http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java deleted file mode 100644 index cdd0d15..0000000 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ /dev/null @@ -1,338 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.state; - -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; - -import org.apache.aurora.common.inject.TimedInterceptor.Timed; -import org.apache.aurora.common.stats.StatsProvider; -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.TierManager; -import org.apache.aurora.scheduler.base.InstanceKeys; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.filter.SchedulingFilter; -import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; -import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; -import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; -import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup; -import org.apache.aurora.scheduler.mesos.MesosTaskFactory; -import org.apache.aurora.scheduler.offers.OfferManager; -import org.apache.aurora.scheduler.resources.ResourceManager; -import org.apache.aurora.scheduler.resources.ResourceType; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IInstanceKey; -import org.apache.aurora.scheduler.updater.UpdateAgentReserver; -import org.apache.mesos.v1.Protos; -import org.apache.mesos.v1.Protos.TaskInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.gen.ScheduleStatus.LOST; -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import static org.apache.mesos.v1.Protos.Offer; - -/** - * Responsible for matching a task against an offer and launching it. - */ -public interface TaskAssigner { - /** - * Tries to match a task against an offer. If a match is found, the assigner makes the - * appropriate changes to the task and requests task launch. - * - * @param storeProvider Storage provider. - * @param resourceRequest The request for resources being scheduled. - * @param groupKey Task group key. - * @param tasks Tasks to assign. - * @param preemptionReservations Slave reservations. - * @return Successfully assigned task IDs. - */ - Set<String> maybeAssign( - MutableStoreProvider storeProvider, - ResourceRequest resourceRequest, - TaskGroupKey groupKey, - Iterable<IAssignedTask> tasks, - Map<String, TaskGroupKey> preemptionReservations); - - class FirstFitTaskAssigner implements TaskAssigner { - private static final Logger LOG = LoggerFactory.getLogger(FirstFitTaskAssigner.class); - - @VisibleForTesting - static final Optional<String> LAUNCH_FAILED_MSG = - Optional.of("Unknown exception attempting to schedule task."); - @VisibleForTesting - static final String ASSIGNER_LAUNCH_FAILURES = "assigner_launch_failures"; - @VisibleForTesting - static final String ASSIGNER_EVALUATED_OFFERS = "assigner_evaluated_offers"; - - private final AtomicLong launchFailures; - private final AtomicLong evaluatedOffers; - - private final StateManager stateManager; - private final SchedulingFilter filter; - private final MesosTaskFactory taskFactory; - private final OfferManager offerManager; - private final TierManager tierManager; - private final UpdateAgentReserver updateAgentReserver; - - @Inject - public FirstFitTaskAssigner( - StateManager stateManager, - SchedulingFilter filter, - MesosTaskFactory taskFactory, - OfferManager offerManager, - TierManager tierManager, - UpdateAgentReserver updateAgentReserver, - StatsProvider statsProvider) { - - this.stateManager = requireNonNull(stateManager); - this.filter = requireNonNull(filter); - this.taskFactory = requireNonNull(taskFactory); - this.offerManager = requireNonNull(offerManager); - this.tierManager = requireNonNull(tierManager); - this.launchFailures = statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES); - this.evaluatedOffers = statsProvider.makeCounter(ASSIGNER_EVALUATED_OFFERS); - this.updateAgentReserver = requireNonNull(updateAgentReserver); - } - - @VisibleForTesting - IAssignedTask mapAndAssignResources(Offer offer, IAssignedTask task) { - IAssignedTask assigned = task; - for (ResourceType type : ResourceManager.getTaskResourceTypes(assigned)) { - if (type.getMapper().isPresent()) { - assigned = type.getMapper().get().mapAndAssign(offer, assigned); - } - } - return assigned; - } - - private TaskInfo assign( - MutableStoreProvider storeProvider, - Offer offer, - String taskId, - boolean revocable) { - - String host = offer.getHostname(); - IAssignedTask assigned = stateManager.assignTask( - storeProvider, - taskId, - host, - offer.getAgentId(), - task -> mapAndAssignResources(offer, task)); - LOG.info( - "Offer on agent {} (id {}) is being assigned task for {}.", - host, offer.getAgentId().getValue(), taskId); - return taskFactory.createFrom(assigned, offer, revocable); - } - - private boolean evaluateOffer( - MutableStoreProvider storeProvider, - boolean revocable, - ResourceRequest resourceRequest, - TaskGroupKey groupKey, - IAssignedTask task, - HostOffer offer, - ImmutableSet.Builder<String> assignmentResult) throws OfferManager.LaunchException { - - String taskId = task.getTaskId(); - Set<Veto> vetoes = filter.filter( - new UnusedResource( - offer.getResourceBag(revocable), - offer.getAttributes(), - offer.getUnavailabilityStart()), - resourceRequest); - - if (vetoes.isEmpty()) { - TaskInfo taskInfo = assign( - storeProvider, - offer.getOffer(), - taskId, - revocable); - resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes()); - - try { - offerManager.launchTask(offer.getOffer().getId(), taskInfo); - assignmentResult.add(taskId); - return true; - } catch (OfferManager.LaunchException e) { - LOG.warn("Failed to launch task.", e); - launchFailures.incrementAndGet(); - - // The attempt to schedule the task failed, so we need to backpedal on the - // assignment. - // It is in the LOST state and a new task will move to PENDING to replace it. - // Should the state change fail due to storage issues, that's okay. The task will - // time out in the ASSIGNED state and be moved to LOST. - stateManager.changeState( - storeProvider, - taskId, - Optional.of(PENDING), - LOST, - LAUNCH_FAILED_MSG); - throw e; - } - } else { - if (Veto.identifyGroup(vetoes) == VetoGroup.STATIC) { - // Never attempt to match this offer/groupKey pair again. - offerManager.banOfferForTaskGroup(offer.getOffer().getId(), groupKey); - } - LOG.debug("Agent {} vetoed task {}: {}", offer.getOffer().getHostname(), taskId, vetoes); - } - return false; - } - - private Iterable<IAssignedTask> maybeAssignReserved( - Iterable<IAssignedTask> tasks, - MutableStoreProvider storeProvider, - boolean revocable, - ResourceRequest resourceRequest, - TaskGroupKey groupKey, - ImmutableSet.Builder<String> assignmentResult) { - - if (!updateAgentReserver.hasReservations(groupKey)) { - return tasks; - } - - // Data structure to record which tasks should be excluded from the regular (non-reserved) - // scheduling loop. This is important because we release reservations once they are used, - // so we need to record them separately to avoid them being double-scheduled. - ImmutableSet.Builder<IInstanceKey> excludeBuilder = ImmutableSet.builder(); - - for (IAssignedTask task: tasks) { - IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId()); - Optional<String> maybeAgentId = updateAgentReserver.getAgent(key); - if (maybeAgentId.isPresent()) { - excludeBuilder.add(key); - Optional<HostOffer> offer = offerManager.getOffer( - Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build()); - if (offer.isPresent()) { - try { - // The offer can still be veto'd because of changed constraints, or because the - // Scheduler hasn't been updated by Mesos yet... - if (evaluateOffer( - storeProvider, - revocable, - resourceRequest, - groupKey, - task, - offer.get(), - assignmentResult)) { - - LOG.info("Used update reservation for {} on {}", key, maybeAgentId.get()); - updateAgentReserver.release(maybeAgentId.get(), key); - } else { - LOG.info( - "Tried to reuse offer on {} for {}, but was not ready yet.", - maybeAgentId.get(), - key); - } - } catch (OfferManager.LaunchException e) { - updateAgentReserver.release(maybeAgentId.get(), key); - } - } - } - } - - // Return only the tasks that didn't have reservations. Offers on agents that were reserved - // might not have been seen by Aurora yet, so we need to wait until the reservation expires - // before giving up and falling back to the first-fit algorithm. - Set<IInstanceKey> toBeExcluded = excludeBuilder.build(); - return Iterables.filter(tasks, t -> !toBeExcluded.contains( - InstanceKeys.from(t.getTask().getJob(), t.getInstanceId()))); - } - - @Timed("assigner_maybe_assign") - @Override - public Set<String> maybeAssign( - MutableStoreProvider storeProvider, - ResourceRequest resourceRequest, - TaskGroupKey groupKey, - Iterable<IAssignedTask> tasks, - Map<String, TaskGroupKey> preemptionReservations) { - - if (Iterables.isEmpty(tasks)) { - return ImmutableSet.of(); - } - - boolean revocable = tierManager.getTier(groupKey.getTask()).isRevocable(); - ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder(); - - Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved( - tasks, - storeProvider, - revocable, - resourceRequest, - groupKey, - assignmentResult); - - Iterator<IAssignedTask> remainingTasks = nonReservedTasks.iterator(); - // Make sure we still have tasks to process after reservations are processed. - if (remainingTasks.hasNext()) { - IAssignedTask task = remainingTasks.next(); - for (HostOffer offer : offerManager.getOffers(groupKey)) { - - if (!offer.hasCpuAndMem()) { - // This offer lacks any type of CPU or mem resource, and therefore will never match - // a task. - continue; - } - - String agentId = offer.getOffer().getAgentId().getValue(); - - Optional<TaskGroupKey> reservedGroup = Optional.fromNullable( - preemptionReservations.get(agentId)); - - if (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey)) { - // This slave is reserved for a different task group -> skip. - continue; - } - - if (!updateAgentReserver.getReservations(agentId).isEmpty()) { - // This agent has been reserved for an update in-progress, skip. - continue; - } - - evaluatedOffers.incrementAndGet(); - try { - boolean offerUsed = evaluateOffer( - storeProvider, revocable, resourceRequest, groupKey, task, offer, assignmentResult); - if (offerUsed) { - if (remainingTasks.hasNext()) { - task = remainingTasks.next(); - } else { - break; - } - } - } catch (OfferManager.LaunchException e) { - break; - } - } - } - - return assignmentResult.build(); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java index 6033c01..e629093 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java @@ -157,7 +157,7 @@ public class AsyncStatsModule extends AbstractModule { @Override public Iterable<MachineResource> get() { - Iterable<HostOffer> offers = offerManager.getOffers(); + Iterable<HostOffer> offers = offerManager.getAll(); ImmutableList.Builder<MachineResource> builder = ImmutableList.builder(); for (HostOffer offer : offers) { http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java index 0ec4de6..5cb5310 100644 --- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java +++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java @@ -160,6 +160,7 @@ public class CommandLineTest { expected.scheduling.reservationDuration = TEST_TIME; expected.scheduling.schedulingMaxBatchSize = 42; expected.scheduling.maxTasksPerScheduleAttempt = 42; + expected.taskAssigner.offerSelectorModules = ImmutableList.of(NoopModule.class); expected.async.asyncWorkerThreads = 42; expected.zk.inProcess = true; expected.zk.zkEndpoints = ImmutableList.of(InetSocketAddress.createUnresolved("testing", 42)); @@ -266,11 +267,11 @@ public class CommandLineTest { "-hold_offers_forever=true", "-min_offer_hold_time=42days", "-offer_hold_jitter_window=42days", - "-offer_static_ban_cache_max_size=42", "-offer_filter_duration=42days", "-unavailability_threshold=42days", "-offer_order=CPU,DISK", "-offer_order_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule", + "-offer_static_ban_cache_max_size=42", "-custom_executor_config=" + tempFile.getAbsolutePath(), "-thermos_executor_path=testing", "-thermos_executor_resources=testing", @@ -306,6 +307,7 @@ public class CommandLineTest { "-offer_reservation_duration=42days", "-scheduling_max_batch_size=42", "-max_tasks_per_schedule_attempt=42", + "-offer_selector_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule", "-async_worker_threads=42", "-zk_in_proc=true", "-zk_endpoints=testing:42", http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java b/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java index 3069959..549d2e3 100644 --- a/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java +++ b/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java @@ -49,7 +49,7 @@ public class OffersTest extends EasyMockTest { @Test public void testNoOffers() throws Exception { - expect(offerManager.getOffers()).andReturn(ImmutableSet.of()); + expect(offerManager.getAll()).andReturn(ImmutableSet.of()); control.replay(); @@ -134,7 +134,7 @@ public class OffersTest extends EasyMockTest { .build(), IHostAttributes.build(new HostAttributes().setMode(NONE))); - expect(offerManager.getOffers()).andReturn(ImmutableSet.of(offer)); + expect(offerManager.getAll()).andReturn(ImmutableSet.of(offer)); control.replay(); http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java index 45ae6bb..64efc0d 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java @@ -259,7 +259,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest { public void testOffers() { storageUtil.expectOperations(); expectOfferAttributesSaved(HOST_OFFER); - offerManager.addOffer(HOST_OFFER); + offerManager.add(HOST_OFFER); control.replay(); @@ -272,8 +272,8 @@ public class MesosCallbackHandlerTest extends EasyMockTest { storageUtil.expectOperations(); expectOfferAttributesSaved(HOST_OFFER); expectOfferAttributesSaved(HOST_OFFER_2); - offerManager.addOffer(HOST_OFFER); - offerManager.addOffer(HOST_OFFER_2); + offerManager.add(HOST_OFFER); + offerManager.add(HOST_OFFER_2); control.replay(); @@ -296,7 +296,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest { expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true); // If the host is in draining, then the offer manager should get an offer with that attribute - offerManager.addOffer(DRAINING_HOST_OFFER); + offerManager.add(DRAINING_HOST_OFFER); control.replay(); handler.handleOffers(ImmutableList.of(HOST_OFFER.getOffer())); @@ -316,7 +316,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest { @Test public void testRescind() { - expect(offerManager.cancelOffer(OFFER_ID)).andReturn(true); + expect(offerManager.cancel(OFFER_ID)).andReturn(true); control.replay(); @@ -336,12 +336,12 @@ public class MesosCallbackHandlerTest extends EasyMockTest { FakeScheduledThreadPoolExecutor fakeExecutor = new FakeScheduledThreadPoolExecutor(); createHandler(false, fakeExecutor); - expect(offerManager.cancelOffer(OFFER_ID)).andReturn(false); - offerManager.banOffer(OFFER_ID); + expect(offerManager.cancel(OFFER_ID)).andReturn(false); + offerManager.ban(OFFER_ID); storageUtil.expectOperations(); expectOfferAttributesSaved(HOST_OFFER); - offerManager.addOffer(HOST_OFFER); - expect(offerManager.cancelOffer(OFFER_ID)).andReturn(true); + offerManager.add(HOST_OFFER); + expect(offerManager.cancel(OFFER_ID)).andReturn(true); control.replay(); replay(offerManager); @@ -355,7 +355,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest { // Eventually, we unban the offer. handler.handleRescind(OFFER_ID); - // 2 commands executed (addOffer and unbanOffer). + // 2 commands executed (add and unbanOffer). fakeExecutor.advance(); fakeExecutor.advance();
