http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java
 
b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java
new file mode 100644
index 0000000..ee65bab
--- /dev/null
+++ 
b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+
+public class FirstFitOfferSelector implements OfferSelector {
+
+  @Override
+  public Optional<HostOffer> select(Iterable<HostOffer> offers, 
ResourceRequest resourceRequest) {
+
+    return Optional.fromNullable(Iterables.getFirst(offers, null));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java
 
b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java
new file mode 100644
index 0000000..4d36487
--- /dev/null
+++ 
b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
+
+public class FirstFitOfferSelectorModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bind(OfferSelector.class).to(FirstFitOfferSelector.class);
+    bind(FirstFitOfferSelector.class).in(Singleton.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java 
b/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java
new file mode 100644
index 0000000..c95b980
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import com.google.common.base.Optional;
+
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+
+/**
+ * Injected into {@link TaskAssignerImpl}, this class scores the offers 
available and returns an
+ * option containing the offer to use.
+ */
+public interface OfferSelector {
+
+  /**
+   * Score offers that fit within the given {@link ResourceRequest} and return 
an option containing
+   * the offer to use for assignment.
+   *
+   * @param offers A stream of offers that match the given {@link 
ResourceRequest}.
+   * @param resourceRequest The {@link ResourceRequest} for the task to assign.
+   * @return An {@link Optional} containing the offer to use.
+   */
+  Optional<HostOffer> select(Iterable<HostOffer> offers, ResourceRequest 
resourceRequest);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java 
b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
index 0796712..f72dacd 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
@@ -139,8 +139,8 @@ public class SchedulingModule extends AbstractModule {
         bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { 
}).in(Singleton.class);
         bind(BiCache.BiCacheSettings.class).toInstance(
             new BiCache.BiCacheSettings(options.reservationDuration, 
"reservation"));
-        bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
-        bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class);
+        bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
+        bind(TaskSchedulerImpl.class).in(Singleton.class);
         expose(TaskScheduler.class);
       }
     });

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java 
b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
new file mode 100644
index 0000000..87619b5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+
+/**
+ * Responsible for matching a task against an offer and launching it.
+ */
+public interface TaskAssigner {
+  /**
+   * Tries to match a task against an offer.  If a match is found, the 
assigner makes the
+   * appropriate changes to the task and requests task launch.
+   *
+   * @param storeProvider Storage provider.
+   * @param resourceRequest The request for resources being scheduled.
+   * @param groupKey Task group key.
+   * @param tasks Tasks to assign.
+   * @param preemptionReservations Slave reservations.
+   * @return Successfully assigned task IDs.
+   */
+  Set<String> maybeAssign(
+      MutableStoreProvider storeProvider,
+      ResourceRequest resourceRequest,
+      TaskGroupKey groupKey,
+      Iterable<IAssignedTask> tasks,
+      Map<String, TaskGroupKey> preemptionReservations);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java 
b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
new file mode 100644
index 0000000..a1dd74f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.InstanceKeys;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.resources.ResourceManager;
+import org.apache.aurora.scheduler.resources.ResourceType;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
+import org.apache.mesos.v1.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+
+public class TaskAssignerImpl implements TaskAssigner {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TaskAssignerImpl.class);
+
+  @VisibleForTesting
+  static final Optional<String> LAUNCH_FAILED_MSG =
+      Optional.of("Unknown exception attempting to schedule task.");
+  @VisibleForTesting
+  static final String ASSIGNER_LAUNCH_FAILURES = "assigner_launch_failures";
+
+  private final AtomicLong launchFailures;
+
+  private final StateManager stateManager;
+  private final MesosTaskFactory taskFactory;
+  private final OfferManager offerManager;
+  private final TierManager tierManager;
+  private final UpdateAgentReserver updateAgentReserver;
+  private final OfferSelector offerSelector;
+
+  @Inject
+  public TaskAssignerImpl(
+      StateManager stateManager,
+      MesosTaskFactory taskFactory,
+      OfferManager offerManager,
+      TierManager tierManager,
+      UpdateAgentReserver updateAgentReserver,
+      StatsProvider statsProvider,
+      OfferSelector offerSelector) {
+
+    this.stateManager = requireNonNull(stateManager);
+    this.taskFactory = requireNonNull(taskFactory);
+    this.offerManager = requireNonNull(offerManager);
+    this.tierManager = requireNonNull(tierManager);
+    this.launchFailures = statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES);
+    this.updateAgentReserver = requireNonNull(updateAgentReserver);
+    this.offerSelector = requireNonNull(offerSelector);
+  }
+
+  @VisibleForTesting
+  IAssignedTask mapAndAssignResources(Protos.Offer offer, IAssignedTask task) {
+    IAssignedTask assigned = task;
+    for (ResourceType type : ResourceManager.getTaskResourceTypes(assigned)) {
+      if (type.getMapper().isPresent()) {
+        assigned = type.getMapper().get().mapAndAssign(offer, assigned);
+      }
+    }
+    return assigned;
+  }
+
+  private Protos.TaskInfo assign(
+      Storage.MutableStoreProvider storeProvider,
+      Protos.Offer offer,
+      String taskId,
+      boolean revocable) {
+
+    String host = offer.getHostname();
+    IAssignedTask assigned = stateManager.assignTask(
+        storeProvider,
+        taskId,
+        host,
+        offer.getAgentId(),
+        task -> mapAndAssignResources(offer, task));
+    LOG.info(
+        "Offer on agent {} (id {}) is being assigned task for {}.",
+        host, offer.getAgentId().getValue(), taskId);
+    return taskFactory.createFrom(assigned, offer, revocable);
+  }
+
+  private void launchUsingOffer(
+      Storage.MutableStoreProvider storeProvider,
+      boolean revocable,
+      ResourceRequest resourceRequest,
+      IAssignedTask task,
+      HostOffer offer,
+      ImmutableSet.Builder<String> assignmentResult) throws 
OfferManager.LaunchException {
+
+    String taskId = task.getTaskId();
+    Protos.TaskInfo taskInfo = assign(storeProvider, offer.getOffer(), taskId, 
revocable);
+    
resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
+    try {
+      offerManager.launchTask(offer.getOffer().getId(), taskInfo);
+      assignmentResult.add(taskId);
+    } catch (OfferManager.LaunchException e) {
+      LOG.warn("Failed to launch task.", e);
+      launchFailures.incrementAndGet();
+
+      // The attempt to schedule the task failed, so we need to backpedal on 
the assignment.
+      // It is in the LOST state and a new task will move to PENDING to 
replace it.
+      // Should the state change fail due to storage issues, that's okay.  The 
task will
+      // time out in the ASSIGNED state and be moved to LOST.
+      stateManager.changeState(
+          storeProvider,
+          taskId,
+          Optional.of(PENDING),
+          LOST,
+          LAUNCH_FAILED_MSG);
+      throw e;
+    }
+  }
+
+  private Iterable<IAssignedTask> maybeAssignReserved(
+      Iterable<IAssignedTask> tasks,
+      Storage.MutableStoreProvider storeProvider,
+      boolean revocable,
+      ResourceRequest resourceRequest,
+      TaskGroupKey groupKey,
+      ImmutableSet.Builder<String> assignmentResult) {
+
+    if (!updateAgentReserver.hasReservations(groupKey)) {
+      return tasks;
+    }
+
+    // Data structure to record which tasks should be excluded from the 
regular (non-reserved)
+    // scheduling loop. This is important because we release reservations once 
they are used,
+    // so we need to record them separately to avoid them being 
double-scheduled.
+    ImmutableSet.Builder<IInstanceKey> excludeBuilder = ImmutableSet.builder();
+
+    for (IAssignedTask task : tasks) {
+      IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), 
task.getInstanceId());
+      Optional<String> maybeAgentId = updateAgentReserver.getAgent(key);
+      if (maybeAgentId.isPresent()) {
+        excludeBuilder.add(key);
+        Optional<HostOffer> offer = offerManager.getMatching(
+            Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build(),
+            resourceRequest,
+            revocable);
+        if (offer.isPresent()) {
+          try {
+            // The offer can still be veto'd because of changed constraints, 
or because the
+            // Scheduler hasn't been updated by Mesos yet...
+            launchUsingOffer(storeProvider,
+                revocable,
+                resourceRequest,
+                task,
+                offer.get(),
+                assignmentResult);
+            LOG.info("Used update reservation for {} on {}", key, 
maybeAgentId.get());
+            updateAgentReserver.release(maybeAgentId.get(), key);
+          } catch (OfferManager.LaunchException e) {
+            updateAgentReserver.release(maybeAgentId.get(), key);
+          }
+        } else {
+          LOG.info(
+              "Tried to reuse offer on {} for {}, but was not ready yet.",
+              maybeAgentId.get(),
+              key);
+        }
+      }
+    }
+
+    // Return only the tasks that didn't have reservations. Offers on agents 
that were reserved
+    // might not have been seen by Aurora yet, so we need to wait until the 
reservation expires
+    // before giving up and falling back to the first-fit algorithm.
+    Set<IInstanceKey> toBeExcluded = excludeBuilder.build();
+    return Iterables.filter(tasks, t -> !toBeExcluded.contains(
+        InstanceKeys.from(t.getTask().getJob(), t.getInstanceId())));
+  }
+
+  /**
+   * Determine whether or not the offer is reserved for a different task via 
preemption or
+   * update affinity.
+   */
+  @SuppressWarnings("PMD.UselessParentheses")  // TODO(jly): PMD bug, remove 
when upgrade from 5.5.3
+  private boolean isAgentReserved(HostOffer offer,
+                                  TaskGroupKey groupKey,
+                                  Map<String, TaskGroupKey> 
preemptionReservations) {
+
+    String agentId = offer.getOffer().getAgentId().getValue();
+    Optional<TaskGroupKey> reservedGroup = Optional.fromNullable(
+        preemptionReservations.get(agentId));
+
+    return (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey))
+        || !updateAgentReserver.getReservations(agentId).isEmpty();
+  }
+
+  @Timed("assigner_maybe_assign")
+  @Override
+  public Set<String> maybeAssign(
+      Storage.MutableStoreProvider storeProvider,
+      ResourceRequest resourceRequest,
+      TaskGroupKey groupKey,
+      Iterable<IAssignedTask> tasks,
+      Map<String, TaskGroupKey> preemptionReservations) {
+
+    if (Iterables.isEmpty(tasks)) {
+      return ImmutableSet.of();
+    }
+
+    boolean revocable = tierManager.getTier(groupKey.getTask()).isRevocable();
+    ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder();
+
+    // Assign tasks reserved for a specific agent (e.g. for update affinity)
+    Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved(
+        tasks,
+        storeProvider,
+        revocable,
+        resourceRequest,
+        groupKey,
+        assignmentResult);
+
+    // Assign the rest of the non-reserved tasks
+    for (IAssignedTask task : nonReservedTasks) {
+      try {
+        // Get all offers that will satisfy the given ResourceRequest and that 
are not reserved
+        // for updates or preemption
+        FluentIterable<HostOffer> matchingOffers = FluentIterable
+            .from(offerManager.getAllMatching(groupKey, resourceRequest, 
revocable))
+            .filter(o -> !isAgentReserved(o, groupKey, 
preemptionReservations));
+
+        // Determine which is the optimal offer to select for the given request
+        Optional<HostOffer> optionalOffer = 
offerSelector.select(matchingOffers, resourceRequest);
+
+        // If no offer is chosen, continue to the next task
+        if (!optionalOffer.isPresent()) {
+          continue;
+        }
+
+        // Attempt to launch the task using the chosen offer
+        HostOffer offer = optionalOffer.get();
+        launchUsingOffer(storeProvider,
+            revocable,
+            resourceRequest,
+            task,
+            offer,
+            assignmentResult);
+      } catch (OfferManager.LaunchException e) {
+        // Any launch exception causes the scheduling round to terminate for 
this TaskGroup.
+        break;
+      }
+    }
+
+    return assignmentResult.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java
 
b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java
new file mode 100644
index 0000000..2ddd4f5
--- /dev/null
+++ 
b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.List;
+import javax.inject.Singleton;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+
+import org.apache.aurora.scheduler.app.MoreModules;
+import org.apache.aurora.scheduler.config.CliOptions;
+
+/**
+ * The default TaskAssigner implementation that allows the injection of custom 
offer
+ * selecting modules via the '-offer_selector_modules' flag.
+ */
+public class TaskAssignerImplModule extends AbstractModule {
+
+  @Parameters(separators = "=")
+  public static class Options {
+    @Parameter(names = "-offer_selector_modules",
+        description = "Guice module for customizing the TaskAssignerImpl's 
OfferSelector.")
+    @SuppressWarnings("rawtypes")
+    public List<Class> offerSelectorModules =
+        ImmutableList.of(FirstFitOfferSelectorModule.class);
+  }
+
+  private final CliOptions cliOptions;
+
+  public TaskAssignerImplModule(CliOptions cliOptions) {
+    this.cliOptions = cliOptions;
+  }
+
+  @Override
+  protected void configure() {
+    Options options = cliOptions.taskAssigner;
+    for (Module module : 
MoreModules.instantiateAll(options.offerSelectorModules, cliOptions)) {
+      install(module);
+    }
+
+    bind(TaskAssigner.class).to(TaskAssignerImpl.class);
+    bind(TaskAssignerImpl.class).in(Singleton.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java 
b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index 0002b0c..3c38f95 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -13,55 +13,10 @@
  */
 package org.apache.aurora.scheduler.scheduling;
 
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import com.google.common.eventbus.Subscribe;
-
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.stats.Stats;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.preemptor.BiCache;
-import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import static java.util.Objects.requireNonNull;
-
-import static java.util.stream.Collectors.toMap;
-
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static 
org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
 
 /**
  * Enables scheduling and preemption of tasks.
@@ -77,150 +32,4 @@ public interface TaskScheduler extends EventSubscriber {
    *         task ID was not present in the result.
    */
   Set<String> schedule(MutableStoreProvider storeProvider, Iterable<String> 
taskIds);
-
-  /**
-   * An asynchronous task scheduler.  Scheduling of tasks is performed on a 
delay, where each task
-   * backs off after a failed scheduling attempt.
-   * <p>
-   * Pending tasks are advertised to the scheduler via internal pubsub 
notifications.
-   */
-  class TaskSchedulerImpl implements TaskScheduler {
-    /**
-     * Binding annotation for the time duration of reservations.
-     */
-    @VisibleForTesting
-    @Qualifier
-    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-    public @interface ReservationDuration { }
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(TaskSchedulerImpl.class);
-
-    private final TaskAssigner assigner;
-    private final Preemptor preemptor;
-    private final ExecutorSettings executorSettings;
-    private final BiCache<String, TaskGroupKey> reservations;
-
-    private final AtomicLong attemptsFired = 
Stats.exportLong("schedule_attempts_fired");
-    private final AtomicLong attemptsFailed = 
Stats.exportLong("schedule_attempts_failed");
-    private final AtomicLong attemptsNoMatch = 
Stats.exportLong("schedule_attempts_no_match");
-
-    @Inject
-    TaskSchedulerImpl(
-        TaskAssigner assigner,
-        Preemptor preemptor,
-        ExecutorSettings executorSettings,
-        BiCache<String, TaskGroupKey> reservations) {
-
-      this.assigner = requireNonNull(assigner);
-      this.preemptor = requireNonNull(preemptor);
-      this.executorSettings = requireNonNull(executorSettings);
-      this.reservations = requireNonNull(reservations);
-    }
-
-    @Timed ("task_schedule_attempt")
-    public Set<String> schedule(MutableStoreProvider store, Iterable<String> 
taskIds) {
-      try {
-        return scheduleTasks(store, taskIds);
-      } catch (RuntimeException e) {
-        // We catch the generic unchecked exception here to ensure tasks are 
not abandoned
-        // if there is a transient issue resulting in an unchecked exception.
-        LOG.warn("Task scheduling unexpectedly failed, will be retried", e);
-        attemptsFailed.incrementAndGet();
-        // Return empty set for all task IDs to be retried later.
-        // It's ok if some tasks were already assigned, those will be ignored 
in the next round.
-        return ImmutableSet.of();
-      }
-    }
-
-    private Set<String> scheduleTasks(MutableStoreProvider store, 
Iterable<String> tasks) {
-      ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks);
-      String taskIdValues = Joiner.on(",").join(taskIds);
-      LOG.debug("Attempting to schedule tasks {}", taskIdValues);
-      ImmutableSet<IAssignedTask> assignedTasks =
-          ImmutableSet.copyOf(Iterables.transform(
-              
store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)),
-              IScheduledTask::getAssignedTask));
-
-      if (Iterables.isEmpty(assignedTasks)) {
-        LOG.warn("Failed to look up all tasks in a scheduling round: {}", 
taskIdValues);
-        return taskIds;
-      }
-
-      Preconditions.checkState(
-          assignedTasks.stream()
-              .collect(Collectors.groupingBy(t -> t.getTask()))
-              .entrySet()
-              .size() == 1,
-          "Found multiple task groups for %s",
-          taskIdValues);
-
-      Map<String, IAssignedTask> assignableTaskMap =
-          assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t));
-
-      if (taskIds.size() != assignedTasks.size()) {
-        LOG.warn("Failed to look up tasks "
-            + Joiner.on(", ").join(Sets.difference(taskIds, 
assignableTaskMap.keySet())));
-      }
-
-      // This is safe after all checks above.
-      ITaskConfig task = assignedTasks.stream().findFirst().get().getTask();
-      AttributeAggregate aggregate = 
AttributeAggregate.getJobActiveState(store, task.getJob());
-
-      // Valid Docker tasks can have a container but no executor config
-      ResourceBag overhead = ResourceBag.EMPTY;
-      if (task.isSetExecutorConfig()) {
-        overhead = 
executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
-            .orElseThrow(
-                () -> new IllegalArgumentException("Cannot find executor 
configuration"));
-      }
-
-      Set<String> launched = assigner.maybeAssign(
-          store,
-          new ResourceRequest(
-              task,
-              bagFromResources(task.getResources()).add(overhead), aggregate),
-          TaskGroupKey.from(task),
-          assignedTasks,
-          reservations.asMap());
-
-      attemptsFired.addAndGet(assignableTaskMap.size());
-      Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), 
launched);
-
-      failedToLaunch.forEach(taskId -> {
-        // Task could not be scheduled.
-        // TODO(maxim): Now that preemption slots are searched asynchronously, 
consider
-        // retrying a launch attempt within the current scheduling round IFF a 
reservation is
-        // available.
-        maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store);
-      });
-      attemptsNoMatch.addAndGet(failedToLaunch.size());
-
-      // Return all successfully launched tasks as well as those weren't tried 
(not in PENDING).
-      return Sets.union(launched, Sets.difference(taskIds, 
assignableTaskMap.keySet()));
-    }
-
-    private void maybePreemptFor(
-        IAssignedTask task,
-        AttributeAggregate jobState,
-        MutableStoreProvider storeProvider) {
-
-      if 
(!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
-        return;
-      }
-      Optional<String> slaveId = preemptor.attemptPreemptionFor(task, 
jobState, storeProvider);
-      if (slaveId.isPresent()) {
-        reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
-      }
-    }
-
-    @Subscribe
-    public void taskChanged(final TaskStateChange stateChangeEvent) {
-      if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
-        IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
-        if (assigned.getSlaveId() != null) {
-          reservations.remove(assigned.getSlaveId(), 
TaskGroupKey.from(assigned.getTask()));
-        }
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java 
b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
new file mode 100644
index 0000000..b6d5d95
--- /dev/null
+++ 
b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.preemptor.BiCache;
+import org.apache.aurora.scheduler.preemptor.Preemptor;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toMap;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static 
org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
+
+/**
+ * An asynchronous task scheduler.  Scheduling of tasks is performed on a 
delay, where each task
+ * backs off after a failed scheduling attempt.
+ * <p>
+ * Pending tasks are advertised to the scheduler via internal pubsub 
notifications.
+ */
+public class TaskSchedulerImpl implements TaskScheduler {
+  /**
+   * Binding annotation for the time duration of reservations.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface ReservationDuration { }
+
+  private static final Logger LOG =
+      
LoggerFactory.getLogger(org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl.class);
+
+  private final TaskAssigner assigner;
+  private final Preemptor preemptor;
+  private final ExecutorSettings executorSettings;
+  private final BiCache<String, TaskGroupKey> reservations;
+
+  private final AtomicLong attemptsFired = 
Stats.exportLong("schedule_attempts_fired");
+  private final AtomicLong attemptsFailed = 
Stats.exportLong("schedule_attempts_failed");
+  private final AtomicLong attemptsNoMatch = 
Stats.exportLong("schedule_attempts_no_match");
+
+  @Inject
+  TaskSchedulerImpl(
+      TaskAssigner assigner,
+      Preemptor preemptor,
+      ExecutorSettings executorSettings,
+      BiCache<String, TaskGroupKey> reservations) {
+
+    this.assigner = requireNonNull(assigner);
+    this.preemptor = requireNonNull(preemptor);
+    this.executorSettings = requireNonNull(executorSettings);
+    this.reservations = requireNonNull(reservations);
+  }
+
+  @Timed("task_schedule_attempt")
+  public Set<String> schedule(Storage.MutableStoreProvider store, 
Iterable<String> taskIds) {
+    try {
+      return scheduleTasks(store, taskIds);
+    } catch (RuntimeException e) {
+      // We catch the generic unchecked exception here to ensure tasks are not 
abandoned
+      // if there is a transient issue resulting in an unchecked exception.
+      LOG.warn("Task scheduling unexpectedly failed, will be retried", e);
+      attemptsFailed.incrementAndGet();
+      // Return empty set for all task IDs to be retried later.
+      // It's ok if some tasks were already assigned, those will be ignored in 
the next round.
+      return ImmutableSet.of();
+    }
+  }
+
+  private Set<String> scheduleTasks(Storage.MutableStoreProvider store, 
Iterable<String> tasks) {
+    ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks);
+    String taskIdValues = Joiner.on(",").join(taskIds);
+    LOG.debug("Attempting to schedule tasks {}", taskIdValues);
+    ImmutableSet<IAssignedTask> assignedTasks =
+        ImmutableSet.copyOf(Iterables.transform(
+            
store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)),
+            IScheduledTask::getAssignedTask));
+
+    if (Iterables.isEmpty(assignedTasks)) {
+      LOG.warn("Failed to look up all tasks in a scheduling round: {}", 
taskIdValues);
+      return taskIds;
+    }
+
+    Preconditions.checkState(
+        assignedTasks.stream()
+            .collect(Collectors.groupingBy(t -> t.getTask()))
+            .entrySet()
+            .size() == 1,
+        "Found multiple task groups for %s",
+        taskIdValues);
+
+    Map<String, IAssignedTask> assignableTaskMap =
+        assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t));
+
+    if (taskIds.size() != assignedTasks.size()) {
+      LOG.warn("Failed to look up tasks "
+          + Joiner.on(", ").join(Sets.difference(taskIds, 
assignableTaskMap.keySet())));
+    }
+
+    // This is safe after all checks above.
+    ITaskConfig task = assignedTasks.stream().findFirst().get().getTask();
+    AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, 
task.getJob());
+
+    // Valid Docker tasks can have a container but no executor config
+    ResourceBag overhead = ResourceBag.EMPTY;
+    if (task.isSetExecutorConfig()) {
+      overhead = 
executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
+          .orElseThrow(
+              () -> new IllegalArgumentException("Cannot find executor 
configuration"));
+    }
+
+    Set<String> launched = assigner.maybeAssign(
+        store,
+        new SchedulingFilter.ResourceRequest(
+            task,
+            bagFromResources(task.getResources()).add(overhead), aggregate),
+        TaskGroupKey.from(task),
+        assignedTasks,
+        reservations.asMap());
+
+    attemptsFired.addAndGet(assignableTaskMap.size());
+    Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), 
launched);
+
+    failedToLaunch.forEach(taskId -> {
+      // Task could not be scheduled.
+      // TODO(maxim): Now that preemption slots are searched asynchronously, 
consider
+      // retrying a launch attempt within the current scheduling round IFF a 
reservation is
+      // available.
+      maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store);
+    });
+    attemptsNoMatch.addAndGet(failedToLaunch.size());
+
+    // Return all successfully launched tasks as well as those weren't tried 
(not in PENDING).
+    return Sets.union(launched, Sets.difference(taskIds, 
assignableTaskMap.keySet()));
+  }
+
+  private void maybePreemptFor(
+      IAssignedTask task,
+      AttributeAggregate jobState,
+      Storage.MutableStoreProvider storeProvider) {
+
+    if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) 
{
+      return;
+    }
+    Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, 
storeProvider);
+    if (slaveId.isPresent()) {
+      reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
+    }
+  }
+
+  @Subscribe
+  public void taskChanged(final PubsubEvent.TaskStateChange stateChangeEvent) {
+    if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
+      IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
+      if (assigned.getSlaveId() != null) {
+        reservations.remove(assigned.getSlaveId(), 
TaskGroupKey.from(assigned.getTask()));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java
 
b/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java
deleted file mode 100644
index dc244ee..0000000
--- 
a/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import javax.inject.Singleton;
-
-import com.google.inject.AbstractModule;
-
-import org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner;
-
-/**
- *  Exposes the default TaskAssigner implementation, which is a first-fit 
scheduling algorithm.
- */
-public class FirstFitTaskAssignerModule extends AbstractModule {
-  @Override
-  protected void configure() {
-    bind(TaskAssigner.class).to(FirstFitTaskAssigner.class);
-    bind(FirstFitTaskAssigner.class).in(Singleton.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java 
b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index b7a3c0b..46e9227 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -29,6 +29,7 @@ import org.apache.aurora.scheduler.config.CliOptions;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl;
+import org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule;
 import 
org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
 import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
 
@@ -42,7 +43,7 @@ public class StateModule extends AbstractModule {
     @Parameter(names = "-task_assigner_modules",
         description = "Guice modules for customizing task assignment.")
     @SuppressWarnings("rawtypes")
-    public List<Class> taskAssignerModules = 
ImmutableList.of(FirstFitTaskAssignerModule.class);
+    public List<Class> taskAssignerModules = 
ImmutableList.of(TaskAssignerImplModule.class);
   }
 
   private final CliOptions options;

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java 
b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
deleted file mode 100644
index cdd0d15..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TierManager;
-import org.apache.aurora.scheduler.base.InstanceKeys;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
-import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.resources.ResourceManager;
-import org.apache.aurora.scheduler.resources.ResourceType;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
-import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
-import org.apache.mesos.v1.Protos;
-import org.apache.mesos.v1.Protos.TaskInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import static org.apache.mesos.v1.Protos.Offer;
-
-/**
- * Responsible for matching a task against an offer and launching it.
- */
-public interface TaskAssigner {
-  /**
-   * Tries to match a task against an offer.  If a match is found, the 
assigner makes the
-   * appropriate changes to the task and requests task launch.
-   *
-   * @param storeProvider Storage provider.
-   * @param resourceRequest The request for resources being scheduled.
-   * @param groupKey Task group key.
-   * @param tasks Tasks to assign.
-   * @param preemptionReservations Slave reservations.
-   * @return Successfully assigned task IDs.
-   */
-  Set<String> maybeAssign(
-      MutableStoreProvider storeProvider,
-      ResourceRequest resourceRequest,
-      TaskGroupKey groupKey,
-      Iterable<IAssignedTask> tasks,
-      Map<String, TaskGroupKey> preemptionReservations);
-
-  class FirstFitTaskAssigner implements TaskAssigner {
-    private static final Logger LOG = 
LoggerFactory.getLogger(FirstFitTaskAssigner.class);
-
-    @VisibleForTesting
-    static final Optional<String> LAUNCH_FAILED_MSG =
-        Optional.of("Unknown exception attempting to schedule task.");
-    @VisibleForTesting
-    static final String ASSIGNER_LAUNCH_FAILURES = "assigner_launch_failures";
-    @VisibleForTesting
-    static final String ASSIGNER_EVALUATED_OFFERS = 
"assigner_evaluated_offers";
-
-    private final AtomicLong launchFailures;
-    private final AtomicLong evaluatedOffers;
-
-    private final StateManager stateManager;
-    private final SchedulingFilter filter;
-    private final MesosTaskFactory taskFactory;
-    private final OfferManager offerManager;
-    private final TierManager tierManager;
-    private final UpdateAgentReserver updateAgentReserver;
-
-    @Inject
-    public FirstFitTaskAssigner(
-        StateManager stateManager,
-        SchedulingFilter filter,
-        MesosTaskFactory taskFactory,
-        OfferManager offerManager,
-        TierManager tierManager,
-        UpdateAgentReserver updateAgentReserver,
-        StatsProvider statsProvider) {
-
-      this.stateManager = requireNonNull(stateManager);
-      this.filter = requireNonNull(filter);
-      this.taskFactory = requireNonNull(taskFactory);
-      this.offerManager = requireNonNull(offerManager);
-      this.tierManager = requireNonNull(tierManager);
-      this.launchFailures = 
statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES);
-      this.evaluatedOffers = 
statsProvider.makeCounter(ASSIGNER_EVALUATED_OFFERS);
-      this.updateAgentReserver = requireNonNull(updateAgentReserver);
-    }
-
-    @VisibleForTesting
-    IAssignedTask mapAndAssignResources(Offer offer, IAssignedTask task) {
-      IAssignedTask assigned = task;
-      for (ResourceType type : ResourceManager.getTaskResourceTypes(assigned)) 
{
-        if (type.getMapper().isPresent()) {
-          assigned = type.getMapper().get().mapAndAssign(offer, assigned);
-        }
-      }
-      return assigned;
-    }
-
-    private TaskInfo assign(
-        MutableStoreProvider storeProvider,
-        Offer offer,
-        String taskId,
-        boolean revocable) {
-
-      String host = offer.getHostname();
-      IAssignedTask assigned = stateManager.assignTask(
-          storeProvider,
-          taskId,
-          host,
-          offer.getAgentId(),
-          task -> mapAndAssignResources(offer, task));
-      LOG.info(
-          "Offer on agent {} (id {}) is being assigned task for {}.",
-          host, offer.getAgentId().getValue(), taskId);
-      return taskFactory.createFrom(assigned, offer, revocable);
-    }
-
-    private boolean evaluateOffer(
-        MutableStoreProvider storeProvider,
-        boolean revocable,
-        ResourceRequest resourceRequest,
-        TaskGroupKey groupKey,
-        IAssignedTask task,
-        HostOffer offer,
-        ImmutableSet.Builder<String> assignmentResult) throws 
OfferManager.LaunchException {
-
-      String taskId = task.getTaskId();
-      Set<Veto> vetoes = filter.filter(
-          new UnusedResource(
-              offer.getResourceBag(revocable),
-              offer.getAttributes(),
-              offer.getUnavailabilityStart()),
-          resourceRequest);
-
-      if (vetoes.isEmpty()) {
-        TaskInfo taskInfo = assign(
-            storeProvider,
-            offer.getOffer(),
-            taskId,
-            revocable);
-        
resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
-
-        try {
-          offerManager.launchTask(offer.getOffer().getId(), taskInfo);
-          assignmentResult.add(taskId);
-          return true;
-        } catch (OfferManager.LaunchException e) {
-          LOG.warn("Failed to launch task.", e);
-          launchFailures.incrementAndGet();
-
-          // The attempt to schedule the task failed, so we need to backpedal 
on the
-          // assignment.
-          // It is in the LOST state and a new task will move to PENDING to 
replace it.
-          // Should the state change fail due to storage issues, that's okay.  
The task will
-          // time out in the ASSIGNED state and be moved to LOST.
-          stateManager.changeState(
-              storeProvider,
-              taskId,
-              Optional.of(PENDING),
-              LOST,
-              LAUNCH_FAILED_MSG);
-          throw e;
-        }
-      } else {
-        if (Veto.identifyGroup(vetoes) == VetoGroup.STATIC) {
-          // Never attempt to match this offer/groupKey pair again.
-          offerManager.banOfferForTaskGroup(offer.getOffer().getId(), 
groupKey);
-        }
-        LOG.debug("Agent {} vetoed task {}: {}", 
offer.getOffer().getHostname(), taskId, vetoes);
-      }
-      return false;
-    }
-
-    private Iterable<IAssignedTask> maybeAssignReserved(
-        Iterable<IAssignedTask> tasks,
-        MutableStoreProvider storeProvider,
-        boolean revocable,
-        ResourceRequest resourceRequest,
-        TaskGroupKey groupKey,
-        ImmutableSet.Builder<String> assignmentResult) {
-
-      if (!updateAgentReserver.hasReservations(groupKey)) {
-        return tasks;
-      }
-
-      // Data structure to record which tasks should be excluded from the 
regular (non-reserved)
-      // scheduling loop. This is important because we release reservations 
once they are used,
-      // so we need to record them separately to avoid them being 
double-scheduled.
-      ImmutableSet.Builder<IInstanceKey> excludeBuilder = 
ImmutableSet.builder();
-
-      for (IAssignedTask task: tasks) {
-        IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), 
task.getInstanceId());
-        Optional<String> maybeAgentId = updateAgentReserver.getAgent(key);
-        if (maybeAgentId.isPresent()) {
-          excludeBuilder.add(key);
-          Optional<HostOffer> offer = offerManager.getOffer(
-              
Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build());
-          if (offer.isPresent()) {
-            try {
-              // The offer can still be veto'd because of changed constraints, 
or because the
-              // Scheduler hasn't been updated by Mesos yet...
-              if (evaluateOffer(
-                  storeProvider,
-                  revocable,
-                  resourceRequest,
-                  groupKey,
-                  task,
-                  offer.get(),
-                  assignmentResult)) {
-
-                LOG.info("Used update reservation for {} on {}", key, 
maybeAgentId.get());
-                updateAgentReserver.release(maybeAgentId.get(), key);
-              } else {
-                LOG.info(
-                    "Tried to reuse offer on {} for {}, but was not ready 
yet.",
-                    maybeAgentId.get(),
-                    key);
-              }
-            } catch (OfferManager.LaunchException e) {
-              updateAgentReserver.release(maybeAgentId.get(), key);
-            }
-          }
-        }
-      }
-
-      // Return only the tasks that didn't have reservations. Offers on agents 
that were reserved
-      // might not have been seen by Aurora yet, so we need to wait until the 
reservation expires
-      // before giving up and falling back to the first-fit algorithm.
-      Set<IInstanceKey> toBeExcluded = excludeBuilder.build();
-      return Iterables.filter(tasks, t -> !toBeExcluded.contains(
-          InstanceKeys.from(t.getTask().getJob(), t.getInstanceId())));
-    }
-
-    @Timed("assigner_maybe_assign")
-    @Override
-    public Set<String> maybeAssign(
-        MutableStoreProvider storeProvider,
-        ResourceRequest resourceRequest,
-        TaskGroupKey groupKey,
-        Iterable<IAssignedTask> tasks,
-        Map<String, TaskGroupKey> preemptionReservations) {
-
-      if (Iterables.isEmpty(tasks)) {
-        return ImmutableSet.of();
-      }
-
-      boolean revocable = 
tierManager.getTier(groupKey.getTask()).isRevocable();
-      ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder();
-
-      Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved(
-          tasks,
-          storeProvider,
-          revocable,
-          resourceRequest,
-          groupKey,
-          assignmentResult);
-
-      Iterator<IAssignedTask> remainingTasks = nonReservedTasks.iterator();
-      // Make sure we still have tasks to process after reservations are 
processed.
-      if (remainingTasks.hasNext()) {
-        IAssignedTask task = remainingTasks.next();
-        for (HostOffer offer : offerManager.getOffers(groupKey)) {
-
-          if (!offer.hasCpuAndMem()) {
-            // This offer lacks any type of CPU or mem resource, and therefore 
will never match
-            // a task.
-            continue;
-          }
-
-          String agentId = offer.getOffer().getAgentId().getValue();
-
-          Optional<TaskGroupKey> reservedGroup = Optional.fromNullable(
-              preemptionReservations.get(agentId));
-
-          if (reservedGroup.isPresent() && 
!reservedGroup.get().equals(groupKey)) {
-            // This slave is reserved for a different task group -> skip.
-            continue;
-          }
-
-          if (!updateAgentReserver.getReservations(agentId).isEmpty()) {
-            // This agent has been reserved for an update in-progress, skip.
-            continue;
-          }
-
-          evaluatedOffers.incrementAndGet();
-          try {
-            boolean offerUsed = evaluateOffer(
-                storeProvider, revocable, resourceRequest, groupKey, task, 
offer, assignmentResult);
-            if (offerUsed) {
-              if (remainingTasks.hasNext()) {
-                task = remainingTasks.next();
-              } else {
-                break;
-              }
-            }
-          } catch (OfferManager.LaunchException e) {
-            break;
-          }
-        }
-      }
-
-      return assignmentResult.build();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java 
b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
index 6033c01..e629093 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -157,7 +157,7 @@ public class AsyncStatsModule extends AbstractModule {
 
     @Override
     public Iterable<MachineResource> get() {
-      Iterable<HostOffer> offers = offerManager.getOffers();
+      Iterable<HostOffer> offers = offerManager.getAll();
 
       ImmutableList.Builder<MachineResource> builder = ImmutableList.builder();
       for (HostOffer offer : offers) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java 
b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index 0ec4de6..5cb5310 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -160,6 +160,7 @@ public class CommandLineTest {
     expected.scheduling.reservationDuration = TEST_TIME;
     expected.scheduling.schedulingMaxBatchSize = 42;
     expected.scheduling.maxTasksPerScheduleAttempt = 42;
+    expected.taskAssigner.offerSelectorModules = 
ImmutableList.of(NoopModule.class);
     expected.async.asyncWorkerThreads = 42;
     expected.zk.inProcess = true;
     expected.zk.zkEndpoints = 
ImmutableList.of(InetSocketAddress.createUnresolved("testing", 42));
@@ -266,11 +267,11 @@ public class CommandLineTest {
         "-hold_offers_forever=true",
         "-min_offer_hold_time=42days",
         "-offer_hold_jitter_window=42days",
-        "-offer_static_ban_cache_max_size=42",
         "-offer_filter_duration=42days",
         "-unavailability_threshold=42days",
         "-offer_order=CPU,DISK",
         
"-offer_order_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule",
+        "-offer_static_ban_cache_max_size=42",
         "-custom_executor_config=" + tempFile.getAbsolutePath(),
         "-thermos_executor_path=testing",
         "-thermos_executor_resources=testing",
@@ -306,6 +307,7 @@ public class CommandLineTest {
         "-offer_reservation_duration=42days",
         "-scheduling_max_batch_size=42",
         "-max_tasks_per_schedule_attempt=42",
+        
"-offer_selector_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule",
         "-async_worker_threads=42",
         "-zk_in_proc=true",
         "-zk_endpoints=testing:42",

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java 
b/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java
index 3069959..549d2e3 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java
@@ -49,7 +49,7 @@ public class OffersTest extends EasyMockTest {
 
   @Test
   public void testNoOffers() throws Exception {
-    expect(offerManager.getOffers()).andReturn(ImmutableSet.of());
+    expect(offerManager.getAll()).andReturn(ImmutableSet.of());
 
     control.replay();
 
@@ -134,7 +134,7 @@ public class OffersTest extends EasyMockTest {
             .build(),
         IHostAttributes.build(new HostAttributes().setMode(NONE)));
 
-    expect(offerManager.getOffers()).andReturn(ImmutableSet.of(offer));
+    expect(offerManager.getAll()).andReturn(ImmutableSet.of(offer));
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java 
b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
index 45ae6bb..64efc0d 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
@@ -259,7 +259,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
   public void testOffers() {
     storageUtil.expectOperations();
     expectOfferAttributesSaved(HOST_OFFER);
-    offerManager.addOffer(HOST_OFFER);
+    offerManager.add(HOST_OFFER);
 
     control.replay();
 
@@ -272,8 +272,8 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
     storageUtil.expectOperations();
     expectOfferAttributesSaved(HOST_OFFER);
     expectOfferAttributesSaved(HOST_OFFER_2);
-    offerManager.addOffer(HOST_OFFER);
-    offerManager.addOffer(HOST_OFFER_2);
+    offerManager.add(HOST_OFFER);
+    offerManager.add(HOST_OFFER_2);
 
     control.replay();
 
@@ -296,7 +296,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
     
expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true);
 
     // If the host is in draining, then the offer manager should get an offer 
with that attribute
-    offerManager.addOffer(DRAINING_HOST_OFFER);
+    offerManager.add(DRAINING_HOST_OFFER);
 
     control.replay();
     handler.handleOffers(ImmutableList.of(HOST_OFFER.getOffer()));
@@ -316,7 +316,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
 
   @Test
   public void testRescind() {
-    expect(offerManager.cancelOffer(OFFER_ID)).andReturn(true);
+    expect(offerManager.cancel(OFFER_ID)).andReturn(true);
 
     control.replay();
 
@@ -336,12 +336,12 @@ public class MesosCallbackHandlerTest extends 
EasyMockTest {
     FakeScheduledThreadPoolExecutor fakeExecutor = new 
FakeScheduledThreadPoolExecutor();
     createHandler(false, fakeExecutor);
 
-    expect(offerManager.cancelOffer(OFFER_ID)).andReturn(false);
-    offerManager.banOffer(OFFER_ID);
+    expect(offerManager.cancel(OFFER_ID)).andReturn(false);
+    offerManager.ban(OFFER_ID);
     storageUtil.expectOperations();
     expectOfferAttributesSaved(HOST_OFFER);
-    offerManager.addOffer(HOST_OFFER);
-    expect(offerManager.cancelOffer(OFFER_ID)).andReturn(true);
+    offerManager.add(HOST_OFFER);
+    expect(offerManager.cancel(OFFER_ID)).andReturn(true);
 
     control.replay();
     replay(offerManager);
@@ -355,7 +355,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
     // Eventually, we unban the offer.
     handler.handleRescind(OFFER_ID);
 
-    // 2 commands executed (addOffer and unbanOffer).
+    // 2 commands executed (add and unbanOffer).
     fakeExecutor.advance();
     fakeExecutor.advance();
 

Reply via email to