http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java 
b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
deleted file mode 100644
index f58c66a..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Optional;
-import java.util.Set;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import com.google.common.eventbus.Subscribe;
-
-import org.apache.aurora.gen.HostStatus;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.BatchWorker;
-import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IHostStatus;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.mesos.v1.Protos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
-import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
-
-/**
- * Logic that puts hosts into maintenance mode, and triggers draining of hosts 
upon request.
- * All state-changing functions return their results.  Additionally, all 
state-changing functions
- * will ignore requests to change state of unknown hosts and subsequently omit 
these hosts from
- * return values.
- */
-public interface MaintenanceController {
-
-  /**
-   * Places hosts in maintenance mode.
-   * Hosts in maintenance mode are less-preferred for scheduling.
-   * No change will be made for hosts that are not recognized, and 
unrecognized hosts will not be
-   * included in the result.
-   *
-   * @param hosts Hosts to put into maintenance mode.
-   * @return The adjusted state of the hosts.
-   */
-  Set<IHostStatus> startMaintenance(Set<String> hosts);
-
-  /**
-   * Initiate a drain of all active tasks on {@code hosts}.
-   *
-   * @param hosts Hosts to drain.
-   * @return The adjusted state of the hosts.  Hosts without any active tasks 
will be immediately
-   *         moved to DRAINED.
-   */
-  Set<IHostStatus> drain(Set<String> hosts);
-
-  /**
-   * Drain tasks defined by the inverse offer.
-   * This method doesn't set any host attributes.
-   *
-   * @param inverseOffer the inverse offer to use.
-   */
-  void drainForInverseOffer(Protos.InverseOffer inverseOffer);
-
-  /**
-   * Fetches the current maintenance mode of {$code host}.
-   *
-   * @param host Host to fetch state for.
-   * @return Maintenance mode of host, {@link MaintenanceMode#NONE} if the 
host is not known.
-   */
-  MaintenanceMode getMode(String host);
-
-  /**
-   * Fetches the current state of {@code hosts}.
-   *
-   * @param hosts Hosts to fetch state for.
-   * @return The state of the hosts.
-   */
-  Set<IHostStatus> getStatus(Set<String> hosts);
-
-  /**
-   * Moves {@code hosts} out of maintenance mode, returning them to mode NONE.
-   *
-   * @param hosts Hosts to move out of maintenance mode.
-   * @return The adjusted state of the hosts.
-   */
-  Set<IHostStatus> endMaintenance(Set<String> hosts);
-
-  class MaintenanceControllerImpl implements MaintenanceController, 
EventSubscriber {
-    private static final Logger LOG = 
LoggerFactory.getLogger(MaintenanceControllerImpl.class);
-    private final Storage storage;
-    private final StateManager stateManager;
-    private final TaskEventBatchWorker batchWorker;
-
-    @Inject
-    public MaintenanceControllerImpl(
-        Storage storage,
-        StateManager stateManager,
-        TaskEventBatchWorker batchWorker) {
-
-      this.storage = requireNonNull(storage);
-      this.stateManager = requireNonNull(stateManager);
-      this.batchWorker = requireNonNull(batchWorker);
-    }
-
-    private Set<String> drainTasksOnHost(String host, MutableStoreProvider 
store) {
-      Query.Builder query = Query.slaveScoped(host).active();
-      Set<String> activeTasks = 
FluentIterable.from(store.getTaskStore().fetchTasks(query))
-          .transform(Tasks::id)
-          .toSet();
-
-      if (activeTasks.isEmpty()) {
-        LOG.info("No tasks to drain on host: {}", host);
-        // Simple way to avoid the log message if there are no tasks.
-        return activeTasks;
-      } else {
-        LOG.info("Draining tasks: {} on host: {}", activeTasks, host);
-        for (String taskId : activeTasks) {
-          stateManager.changeState(
-              store,
-              taskId,
-              Optional.empty(),
-              ScheduleStatus.DRAINING,
-              DRAINING_MESSAGE);
-        }
-
-        return activeTasks;
-      }
-
-    }
-
-    private Set<IHostStatus> watchDrainingTasks(MutableStoreProvider store, 
Set<String> hosts) {
-      LOG.info("Hosts to drain: " + hosts);
-      Set<String> emptyHosts = Sets.newHashSet();
-      for (String host : hosts) {
-        Set<String> drainedTasks = drainTasksOnHost(host, store);
-        // If there are no tasks on the host, immediately transition to 
DRAINED.
-        if (drainedTasks.isEmpty()) {
-          emptyHosts.add(host);
-        }
-      }
-
-      return ImmutableSet.<IHostStatus>builder()
-          .addAll(setMaintenanceMode(store, emptyHosts, DRAINED))
-          .addAll(setMaintenanceMode(store, Sets.difference(hosts, 
emptyHosts), DRAINING))
-          .build();
-    }
-
-    /**
-     * Notifies the MaintenanceController that a task has changed state.
-     *
-     * @param change Event
-     */
-    @Subscribe
-    public void taskChangedState(final TaskStateChange change) {
-      if (Tasks.isTerminated(change.getNewState())) {
-        final String host = change.getTask().getAssignedTask().getSlaveHost();
-        batchWorker.execute(store -> {
-          // If the task _was_ associated with a draining host, and it was the 
last task on the
-          // host.
-          Optional<IHostAttributes> attributes =
-              store.getAttributeStore().getHostAttributes(host);
-          if (attributes.isPresent() && attributes.get().getMode() == 
DRAINING) {
-            Query.Builder builder = Query.slaveScoped(host).active();
-            Iterable<IScheduledTask> activeTasks = 
store.getTaskStore().fetchTasks(builder);
-            if (Iterables.isEmpty(activeTasks)) {
-              LOG.info("Moving host {} into DRAINED", host);
-              setMaintenanceMode(store, ImmutableSet.of(host), DRAINED);
-            } else {
-              LOG.info("Host {} is DRAINING with active tasks: {}", host, 
Tasks.ids(activeTasks));
-            }
-          }
-          return BatchWorker.NO_RESULT;
-        });
-      }
-    }
-
-    @Override
-    public Set<IHostStatus> startMaintenance(Set<String> hosts) {
-      return storage.write(
-          storeProvider -> setMaintenanceMode(storeProvider, hosts, 
MaintenanceMode.SCHEDULED));
-    }
-
-    @VisibleForTesting
-    static final Optional<String> DRAINING_MESSAGE =
-        Optional.of("Draining machine for maintenance.");
-
-    @Override
-    public Set<IHostStatus> drain(Set<String> hosts) {
-      return storage.write(store -> watchDrainingTasks(store, hosts));
-    }
-
-    private Optional<String> getHostname(Protos.InverseOffer offer) {
-      if (offer.getUrl().getAddress().hasHostname()) {
-        return Optional.of(offer.getUrl().getAddress().getHostname());
-      } else {
-        return Optional.empty();
-      }
-    }
-
-    @Override
-    public void drainForInverseOffer(Protos.InverseOffer offer) {
-      // TaskStore does not allow for querying by agent id.
-      Optional<String> hostname = getHostname(offer);
-
-      if (hostname.isPresent()) {
-        String host = hostname.get();
-        storage.write(storeProvider -> drainTasksOnHost(host, storeProvider));
-      } else {
-        LOG.error("Unable to drain tasks on agent {} because "
-            + "no hostname attached to inverse offer {}.", offer.getAgentId(), 
offer.getId());
-      }
-    }
-
-    private static final Function<IHostAttributes, String> HOST_NAME =
-        IHostAttributes::getHost;
-
-    private static final Function<IHostAttributes, IHostStatus> 
ATTRS_TO_STATUS =
-        attributes -> IHostStatus.build(
-            new 
HostStatus().setHost(attributes.getHost()).setMode(attributes.getMode()));
-
-    private static final Function<IHostStatus, MaintenanceMode> GET_MODE = 
IHostStatus::getMode;
-
-    @Override
-    public MaintenanceMode getMode(final String host) {
-      return storage.read(storeProvider -> 
storeProvider.getAttributeStore().getHostAttributes(host)
-          .map(ATTRS_TO_STATUS)
-          .map(GET_MODE)
-          .orElse(MaintenanceMode.NONE));
-    }
-
-    @Override
-    public Set<IHostStatus> getStatus(final Set<String> hosts) {
-      return storage.read(storeProvider -> {
-        // Warning - this is filtering _all_ host attributes.  If using this 
to frequently query
-        // for a small set of hosts, a getHostAttributes variant should be 
added.
-        return 
FluentIterable.from(storeProvider.getAttributeStore().getHostAttributes())
-            .filter(Predicates.compose(Predicates.in(hosts), HOST_NAME))
-            .transform(ATTRS_TO_STATUS)
-            .toSet();
-      });
-    }
-
-    @Override
-    public Set<IHostStatus> endMaintenance(final Set<String> hosts) {
-      return storage.write(
-          storeProvider -> setMaintenanceMode(storeProvider, hosts, 
MaintenanceMode.NONE));
-    }
-
-    private Set<IHostStatus> setMaintenanceMode(
-        MutableStoreProvider storeProvider,
-        Set<String> hosts,
-        MaintenanceMode mode) {
-
-      AttributeStore.Mutable store = storeProvider.getAttributeStore();
-      ImmutableSet.Builder<IHostStatus> statuses = ImmutableSet.builder();
-      for (String host : hosts) {
-        LOG.info("Setting maintenance mode to {} for host {}", mode, host);
-        Optional<IHostAttributes> toSave = 
AttributeStore.Util.mergeMode(store, host, mode);
-        if (toSave.isPresent()) {
-          store.saveHostAttributes(toSave.get());
-          LOG.info("Updated host attributes: " + toSave.get());
-          statuses.add(IHostStatus.build(new 
HostStatus().setHost(host).setMode(mode)));
-        }
-      }
-      return statuses.build();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java 
b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index 0e0f90b..dfc2b5c 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -18,10 +18,8 @@ import javax.inject.Singleton;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
 import com.google.inject.Module;
 
 import org.apache.aurora.scheduler.app.MoreModules;
@@ -31,7 +29,6 @@ import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl;
 import org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule;
-import 
org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
 import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
 
 /**
@@ -69,16 +66,7 @@ public class StateModule extends AbstractModule {
 
     PubsubEventModule.bindSubscriber(binder(), PartitionManager.class);
 
-    bindMaintenanceController(binder());
-
     bind(ClusterState.class).to(ClusterStateImpl.class);
     bind(ClusterStateImpl.class).in(Singleton.class);
   }
-
-  @VisibleForTesting
-  static void bindMaintenanceController(Binder binder) {
-    
binder.bind(MaintenanceController.class).to(MaintenanceControllerImpl.class);
-    binder.bind(MaintenanceControllerImpl.class).in(Singleton.class);
-    PubsubEventModule.bindSubscriber(binder, MaintenanceControllerImpl.class);
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java 
b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
index e88cad6..800abfd 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
@@ -354,9 +354,13 @@ class ReadOnlySchedulerImpl implements 
ReadOnlyScheduler.Iface {
   public Response getJobUpdateDiff(JobUpdateRequest mutableRequest) {
     IJobUpdateRequest request;
     try {
-      request = IJobUpdateRequest.build(new 
JobUpdateRequest(mutableRequest).setTaskConfig(
-          configurationManager.validateAndPopulate(
-              
ITaskConfig.build(mutableRequest.getTaskConfig())).newBuilder()));
+      request = IJobUpdateRequest.build(
+          new JobUpdateRequest(mutableRequest)
+              .setTaskConfig(configurationManager
+                  .validateAndPopulate(
+                      ITaskConfig.build(mutableRequest.getTaskConfig()),
+                      mutableRequest.getInstanceCount())
+                  .newBuilder()));
     } catch (TaskDescriptionException e) {
       return error(INVALID_REQUEST, e);
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
 
b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 9fc0416..6a28bc2 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -63,6 +63,7 @@ import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.gen.Response;
 import org.apache.aurora.gen.Result;
 import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.SlaPolicy;
 import org.apache.aurora.gen.StartJobUpdateResult;
 import org.apache.aurora.gen.StartMaintenanceResult;
 import org.apache.aurora.gen.TaskQuery;
@@ -76,11 +77,11 @@ import 
org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
 import org.apache.aurora.scheduler.cron.CronException;
 import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.maintenance.MaintenanceController;
 import org.apache.aurora.scheduler.quota.QuotaCheckResult;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
 import org.apache.aurora.scheduler.reconciliation.TaskReconciler;
-import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.UUIDGenerator;
@@ -156,6 +157,8 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
   @VisibleForTesting
   static final String DRAIN_HOSTS = STAT_PREFIX + "drainHosts";
   @VisibleForTesting
+  static final String SLA_DRAIN_HOSTS = STAT_PREFIX + "slaDrainHosts";
+  @VisibleForTesting
   static final String MAINTENANCE_STATUS = STAT_PREFIX + "maintenanceStatus";
   @VisibleForTesting
   static final String END_MAINTENANCE = STAT_PREFIX + "endMaintenance";
@@ -190,6 +193,7 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
   private final AtomicLong restartShardsCounter;
   private final AtomicLong startMaintenanceCounter;
   private final AtomicLong drainHostsCounter;
+  private final AtomicLong slaDrainHostsCounter;
   private final AtomicLong maintenanceStatusCounter;
   private final AtomicLong endMaintenanceCounter;
   private final AtomicLong addInstancesCounter;
@@ -237,6 +241,7 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
     this.restartShardsCounter = statsProvider.makeCounter(RESTART_SHARDS);
     this.startMaintenanceCounter = 
statsProvider.makeCounter(START_MAINTENANCE);
     this.drainHostsCounter = statsProvider.makeCounter(DRAIN_HOSTS);
+    this.slaDrainHostsCounter = statsProvider.makeCounter(SLA_DRAIN_HOSTS);
     this.maintenanceStatusCounter = 
statsProvider.makeCounter(MAINTENANCE_STATUS);
     this.endMaintenanceCounter = statsProvider.makeCounter(END_MAINTENANCE);
     this.addInstancesCounter = statsProvider.makeCounter(ADD_INSTANCES);
@@ -586,6 +591,14 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
   }
 
   @Override
+  public Response slaDrainHosts(Hosts hosts, SlaPolicy defaultSlaPolicy, long 
timeoutSecs) {
+    slaDrainHostsCounter.addAndGet(hosts.getHostNamesSize());
+    return ok(Result.drainHostsResult(
+        new DrainHostsResult().setStatuses(IHostStatus.toBuildersSet(
+            maintenance.slaDrain(hosts.getHostNames(), defaultSlaPolicy, 
timeoutSecs)))));
+  }
+
+  @Override
   public Response forceTaskState(String taskId, ScheduleStatus status) {
     checkNotBlank(taskId);
     requireNonNull(status);
@@ -807,9 +820,12 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
 
     IJobUpdateRequest request;
     try {
-      request = IJobUpdateRequest.build(new 
JobUpdateRequest(mutableRequest).setTaskConfig(
-          configurationManager.validateAndPopulate(
-              
ITaskConfig.build(mutableRequest.getTaskConfig())).newBuilder()));
+      request = IJobUpdateRequest.build(
+          new JobUpdateRequest(mutableRequest)
+              .setTaskConfig(configurationManager.validateAndPopulate(
+                  ITaskConfig.build(mutableRequest.getTaskConfig()),
+                  mutableRequest.getInstanceCount())
+              .newBuilder()));
     } catch (TaskDescriptionException e) {
       return error(INVALID_REQUEST, e);
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/python/apache/aurora/admin/admin_util.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/admin/admin_util.py 
b/src/main/python/apache/aurora/admin/admin_util.py
index 8240e80..964876b 100644
--- a/src/main/python/apache/aurora/admin/admin_util.py
+++ b/src/main/python/apache/aurora/admin/admin_util.py
@@ -100,6 +100,29 @@ OVERRIDE_SLA_REASON_OPTION = optparse.Option(
     help='Reason for overriding default SLA values. Provide details including 
the '
          'maintenance ticket number.')
 
+DEFAULT_SLA_PERCENTAGE_OPTION = optparse.Option(
+    '--default_percentage',
+    dest='default_percentage',
+    default=95,
+    help='Percentage of tasks required to be up all the time within the 
duration. '
+         'This percentage will be used for SLA calculation if tasks do not '
+         'have a configured SlaPolicy.')
+
+DEFAULT_SLA_DURATION_OPTION = optparse.Option(
+    '--default_duration',
+    dest='default_duration',
+    default='30m',
+    help='Time interval (now - value) for the percentage of up tasks. Format: 
XdYhZmWs. '
+         'This duration will be used for SLA calculation if tasks do not '
+         'have a configured SlaPolicy.')
+
+FORCE_DRAIN_TIMEOUT_OPTION = optparse.Option(
+    '--force_drain_timeout',
+    dest='timeout',
+    default='7d',
+    help='Time interval (now - value) after which tasks will be forcefully 
drained. '
+         'Format: XdYhZmWs.')
+
 UNSAFE_SLA_HOSTS_FILE_OPTION = optparse.Option(
     '--unsafe_hosts_file',
     dest='unsafe_hosts_filename',
@@ -213,6 +236,7 @@ def parse_and_validate_sla_overrides(options, hostnames):
   if has_override != all_overrides:
     die('All --override_* options are required when attempting to override 
default SLA values.')
 
+  print(options.percentage)
   percentage = parse_sla_percentage(options.percentage) if options.percentage 
else None
   duration = parse_time(options.duration) if options.duration else None
   if options.reason:
@@ -230,6 +254,20 @@ def parse_and_validate_sla_overrides(options, hostnames):
   return percentage or SLA_UPTIME_PERCENTAGE_LIMIT, duration or 
SLA_UPTIME_DURATION_LIMIT
 
 
+def parse_and_validate_sla_drain_default(options):
+  """Parses and validates host SLA default 3-tuple (percentage, duration, 
timeout).
+
+  :param options: command line options
+  :type options: list of app.option
+  :rtype: a tuple of: default percentage (float), default duration (seconds) 
and timeout (seconds)
+  """
+  percentage = parse_sla_percentage(options.default_percentage)
+  duration = parse_time(options.default_duration).as_(Time.SECONDS)
+  timeout = parse_time(options.timeout).as_(Time.SECONDS)
+
+  return percentage, duration, timeout
+
+
 def print_results(results):
   """Prints formatted SLA results.
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/python/apache/aurora/admin/host_maintenance.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/admin/host_maintenance.py 
b/src/main/python/apache/aurora/admin/host_maintenance.py
index 83fc2b6..3ccf322 100644
--- a/src/main/python/apache/aurora/admin/host_maintenance.py
+++ b/src/main/python/apache/aurora/admin/host_maintenance.py
@@ -170,6 +170,7 @@ class HostMaintenance(object):
     for hostname in drained_hosts:
       callback(hostname)
 
+  # Deprecated: See AURORA-1986.
   def perform_maintenance(self, hostnames, grouping_function=DEFAULT_GROUPING,
                           percentage=None, duration=None, output_file=None, 
callback=None):
     """Put hosts into maintenance mode and drain them.
@@ -234,6 +235,25 @@ class HostMaintenance(object):
 
     return set(hostnames) - not_drained_hostnames
 
+  def perform_sla_maintenance(self, hostnames, percentage, duration, timeout):
+    """Submits hosts for SLA-aware draining to the scheduler.
+
+
+    :param hostnames: A list of hostnames to operate upon
+    :type hostnames: list of strings
+    :param percentage: SLA percentage to use
+    :type percentage: float
+    :param duration: SLA duration to use
+    :type duration: twitter.common.quantity.Time
+    :param timeout: Force drain timeout
+    :type timeout: twitter.common.quantity.Time
+    :rtype: set of host names that were successfully drained
+    """
+    drainable_hosts = Hosts(set(hostnames))
+    check_and_log_response(
+      self._client.sla_drain_hosts(drainable_hosts, percentage, duration, 
timeout))
+    return drainable_hosts
+
   def check_status(self, hostnames):
     """Query the scheduler to determine the maintenance status for a list of 
hostnames
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/python/apache/aurora/admin/maintenance.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/admin/maintenance.py 
b/src/main/python/apache/aurora/admin/maintenance.py
index 942a237..33a7eea 100644
--- a/src/main/python/apache/aurora/admin/maintenance.py
+++ b/src/main/python/apache/aurora/admin/maintenance.py
@@ -18,13 +18,17 @@ from apache.aurora.client.base import GROUPING_OPTION, 
get_grouping_or_die, requ
 from apache.aurora.common.clusters import CLUSTERS
 
 from .admin_util import (
+    DEFAULT_SLA_DURATION_OPTION,
+    DEFAULT_SLA_PERCENTAGE_OPTION,
     FILENAME_OPTION,
+    FORCE_DRAIN_TIMEOUT_OPTION,
     HOSTS_OPTION,
     OVERRIDE_SLA_DURATION_OPTION,
     OVERRIDE_SLA_PERCENTAGE_OPTION,
     OVERRIDE_SLA_REASON_OPTION,
     POST_DRAIN_SCRIPT_OPTION,
     UNSAFE_SLA_HOSTS_FILE_OPTION,
+    parse_and_validate_sla_drain_default,
     parse_and_validate_sla_overrides,
     parse_hostnames,
     parse_script
@@ -132,6 +136,47 @@ def host_drain(cluster):
 
 
 @app.command
[email protected]_option(FORCE_DRAIN_TIMEOUT_OPTION)
[email protected]_option(FILENAME_OPTION)
[email protected]_option(HOSTS_OPTION)
[email protected]_option(DEFAULT_SLA_PERCENTAGE_OPTION)
[email protected]_option(DEFAULT_SLA_DURATION_OPTION)
[email protected]('cluster')
+def sla_host_drain(cluster):
+  """usage: sla_host_drain {--filename=filename | --hosts=hosts}
+                           [--default_percentage=percentage]
+                           [--default_duration=duration]
+                           [--force_drain_timeout=timeout]
+                           cluster
+
+  Asks the scheduler to drain the list of provided hosts in an SLA-aware 
manner.
+
+  The list of hosts is drained and marked in a drained state.  This will kill
+  off any tasks currently running on these hosts, as well as prevent future
+  tasks from scheduling on these hosts while they are drained.
+
+  The hosts are left in maintenance mode upon completion. Use host_activate to
+  return hosts back to service and allow scheduling tasks on them.
+
+  If tasks are unable to be drained after the specified timeout interval they 
will
+  be forcefully drained even if it breaks SLA.
+  """
+  options = app.get_options()
+  drainable_hosts = parse_hostnames(options.filename, options.hosts)
+
+  percentage, duration, timeout = parse_and_validate_sla_drain_default(options)
+
+  HostMaintenance(
+      cluster=CLUSTERS[cluster],
+      verbosity=options.verbosity,
+      
bypass_leader_redirect=options.bypass_leader_redirect).perform_sla_maintenance(
+          drainable_hosts,
+          percentage=percentage,
+          duration=duration,
+          timeout=timeout)
+
+
[email protected]
 @app.command_option(FILENAME_OPTION)
 @app.command_option(HOSTS_OPTION)
 @requires.exactly('cluster')

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/python/apache/aurora/client/api/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/__init__.py 
b/src/main/python/apache/aurora/client/api/__init__.py
index f6fd1dd..f1a82cb 100644
--- a/src/main/python/apache/aurora/client/api/__init__.py
+++ b/src/main/python/apache/aurora/client/api/__init__.py
@@ -33,8 +33,10 @@ from gen.apache.aurora.api.ttypes import (
     JobUpdateQuery,
     JobUpdateRequest,
     Metadata,
+    PercentageSlaPolicy,
     Resource,
     ResourceAggregate,
+    SlaPolicy,
     TaskQuery
 )
 
@@ -298,6 +300,16 @@ class AuroraClientAPI(object):
     log.info("Draining tasks on: %s" % hosts.hostNames)
     return self._scheduler_proxy.drainHosts(hosts)
 
+  def sla_drain_hosts(self, hosts, percentage=None, duration=None, 
timeout=None):
+    log.info("Asking scheduler to drain tasks by SLA on: %s" % hosts.hostNames)
+    return self._scheduler_proxy.slaDrainHosts(
+      hosts,
+      SlaPolicy(
+        percentageSlaPolicy=PercentageSlaPolicy(
+          percentage=percentage,
+          durationSecs=duration)),
+      timeout)
+
   def maintenance_status(self, hosts):
     log.info("Maintenance status for: %s" % hosts.hostNames)
     # read-only calls are retriable.

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/python/apache/aurora/client/cli/context.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/context.py 
b/src/main/python/apache/aurora/client/cli/context.py
index 06b1941..ad16c2d 100644
--- a/src/main/python/apache/aurora/client/cli/context.py
+++ b/src/main/python/apache/aurora/client/cli/context.py
@@ -244,3 +244,19 @@ class AuroraCommandContext(Context):
       raise self.CommandError(EXIT_INVALID_PARAMETER,
           "Invalid instance parameter: %s" % (list(unrecognized)))
     return active
+
+  def has_count_or_percentage_sla_policy(self, key):
+    """Returns true if any of the tasks has count or percentage sla policy, 
false otherwise.
+
+    :param key: Job key
+    :type key: AuroraJobKey
+    :return: true if any of the tasks has count or percentage sla policy, 
false otherwise.
+    """
+    for task in self.get_active_tasks(key):
+      if task.assignedTask.task.slaPolicy:
+        if task.assignedTask.task.slaPolicy.percentageSlaPolicy:
+          return True
+        if task.assignedTask.task.slaPolicy.countSlaPolicy:
+          return True
+
+    return False

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/python/apache/aurora/client/cli/jobs.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/jobs.py 
b/src/main/python/apache/aurora/client/cli/jobs.py
index 536d04a..f5b6114 100644
--- a/src/main/python/apache/aurora/client/cli/jobs.py
+++ b/src/main/python/apache/aurora/client/cli/jobs.py
@@ -379,6 +379,9 @@ class AddCommand(Verb):
     active = context.get_active_instances_or_raise(job, [instance])
     start = max(list(active)) + 1
 
+    if context.has_count_or_percentage_sla_policy(job):
+      context.print_out("WARNING: Adding instances without updating 
SlaPolicy.")
+
     api = context.get_api(job.cluster)
     resp = api.add_instances(job, instance, count)
     context.log_response_and_raise(resp)
@@ -410,6 +413,10 @@ class KillCommand(AbstractKillCommand):
       raise context.CommandError(EXIT_INVALID_PARAMETER,
           "The instances list cannot be omitted in a kill command!; "
           "use killall to kill all instances")
+
+    if context.has_count_or_percentage_sla_policy(job):
+      context.print_out("WARNING: Killing instances without updating 
SlaPolicy.")
+
     if context.options.strict:
       context.get_active_instances_or_raise(job, instances_arg)
     api = context.get_api(job.cluster)

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java 
b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 63c338e..534a912 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -39,12 +39,14 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Module;
+import com.google.inject.TypeLiteral;
 
 import org.apache.aurora.GuavaUtils;
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.common.application.Lifecycle;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Data;
+import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.Stats;
 import org.apache.aurora.common.zookeeper.Credentials;
 import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
@@ -64,6 +66,7 @@ import org.apache.aurora.scheduler.AppStartup;
 import org.apache.aurora.scheduler.TierModule;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.config.CliOptions;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule;
 import org.apache.aurora.scheduler.discovery.ZooKeeperConfig;
@@ -71,6 +74,7 @@ import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.log.Log.Entry;
 import org.apache.aurora.scheduler.log.Log.Position;
 import org.apache.aurora.scheduler.log.Log.Stream;
+import org.apache.aurora.scheduler.maintenance.MaintenanceController;
 import org.apache.aurora.scheduler.mesos.DriverFactory;
 import org.apache.aurora.scheduler.mesos.DriverSettings;
 import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory;
@@ -207,6 +211,11 @@ public class SchedulerIT extends BaseZooKeeperTest {
                 new ServerInfo()
                     .setClusterName(CLUSTER_NAME)
                     .setStatsUrlPrefix(STATS_URL_PREFIX)));
+
+        bind(new TypeLiteral<Amount<Long, Time>>() { })
+            .annotatedWith(
+                MaintenanceController.MaintenanceControllerImpl.PollingInterval.class)
+            .toInstance(new TimeAmount(1, Time.MINUTES));
       }
     };
     ZooKeeperConfig zkClientConfig =

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java 
b/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index 8c1f5ce..abee095 100644
--- a/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -91,6 +91,8 @@ public final class TaskTestUtil {
           true,
           true,
           true,
+          2,
+          1800L,
           ConfigurationManager.DEFAULT_ALLOWED_JOB_ENVIRONMENTS);
   public static final ExecutorID EXECUTOR_ID = ExecutorID.newBuilder()
       .setValue("PLACEHOLDER")
@@ -126,7 +128,7 @@ public final class TaskTestUtil {
         .setPriority(1)
         .setMaxTaskFailures(-1)
         .setProduction(prod)
-        .setTier(PROD_TIER_NAME)
+        .setTier(prod ? PROD_TIER_NAME : DEV_TIER_NAME)
         .setPartitionPolicy(new 
PartitionPolicy().setDelaySecs(5).setReschedule(true))
         .setSlaPolicy(SlaPolicy.percentageSlaPolicy(
             new 
PercentageSlaPolicy().setPercentage(95.0).setDurationSecs(1800)))

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java 
b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index e66ec11..dcf5889 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -231,9 +231,13 @@ public class CommandLineTest {
     expected.mesosLog.coordinatorElectionRetries = 42;
     expected.mesosLog.readTimeout = TEST_TIME;
     expected.mesosLog.writeTimeout = TEST_TIME;
-    expected.sla.slaRefreshInterval = TEST_TIME;
+    expected.sla.minRequiredInstances = 42;
+    expected.sla.maxParallelCoordinators = 42;
+    expected.sla.maxSlaDuration = TEST_TIME;
+    expected.sla.slaCoordinatorTimeout = TEST_TIME;
     expected.sla.slaProdMetrics = ImmutableList.of(MetricCategory.JOB_UPTIMES);
     expected.sla.slaNonProdMetrics = 
ImmutableList.of(MetricCategory.JOB_UPTIMES);
+    expected.sla.slaRefreshInterval = TEST_TIME;
     expected.webhook.webhookConfigFile = tempFile;
     expected.scheduler.maxRegistrationDelay = TEST_TIME;
     expected.scheduler.maxLeadingDuration = TEST_TIME;
@@ -251,6 +255,7 @@ public class CommandLineTest {
     expected.cron.cronMaxBatchSize = 42;
     expected.resourceSettings.enableRevocableCpus = false;
     expected.resourceSettings.enableRevocableRam = true;
+    expected.maintenance.hostMaintenancePollingInterval = TEST_TIME;
 
     assertAllNonDefaultParameters(expected);
 
@@ -385,7 +390,12 @@ public class CommandLineTest {
         "-cron_scheduling_max_batch_size=42",
         "-enable_revocable_cpus=false",
         "-enable_revocable_ram=true",
-        "-partition_aware=true"
+        "-partition_aware=true",
+        "-sla_coordinator_timeout=42days",
+        "-host_maintenance_polling_interval=42days",
+        "-max_parallel_coordinated_maintenance=42",
+        "-min_required_instances_for_sla_check=42",
+        "-max_sla_duration_secs=42days"
     );
     assertEqualOptions(expected, parsed);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
 
b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
index 749ffea..a99fa7a 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
@@ -22,6 +22,8 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.aurora.gen.AppcImage;
 import org.apache.aurora.gen.Constraint;
 import org.apache.aurora.gen.Container;
+import org.apache.aurora.gen.CoordinatorSlaPolicy;
+import org.apache.aurora.gen.CountSlaPolicy;
 import org.apache.aurora.gen.CronCollisionPolicy;
 import org.apache.aurora.gen.DockerParameter;
 import org.apache.aurora.gen.ExecutorConfig;
@@ -33,6 +35,8 @@ import org.apache.aurora.gen.LimitConstraint;
 import org.apache.aurora.gen.MesosContainer;
 import org.apache.aurora.gen.MesosFetcherURI;
 import org.apache.aurora.gen.Mode;
+import org.apache.aurora.gen.PercentageSlaPolicy;
+import org.apache.aurora.gen.SlaPolicy;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.ValueConstraint;
@@ -74,6 +78,9 @@ public class ConfigurationManagerTest {
 
   private static final ImmutableSet<Container._Fields> ALL_CONTAINER_TYPES =
       ImmutableSet.copyOf(Container._Fields.values());
+  private static final int MIN_REQUIRED_INSTANCES = 20;
+  private static final long MAX_SLA_DURATION_SECS = 7200;
+  private static final long DEFAULT_SLA_PERCENTAGE = 95;
 
   private static final JobKey JOB_KEY = new JobKey("owner-role", "devel", 
"email_stats");
   private static final JobConfiguration UNSANITIZED_JOB_CONFIGURATION = new 
JobConfiguration()
@@ -126,6 +133,8 @@ public class ConfigurationManagerTest {
           false,
           true,
           false,
+          MIN_REQUIRED_INSTANCES,
+          MAX_SLA_DURATION_SECS,
           ConfigurationManager.DEFAULT_ALLOWED_JOB_ENVIRONMENTS),
       TaskTestUtil.TIER_MANAGER,
       TaskTestUtil.THRIFT_BACKFILL,
@@ -139,6 +148,8 @@ public class ConfigurationManagerTest {
           true,
           true,
           true,
+          MIN_REQUIRED_INSTANCES,
+          MAX_SLA_DURATION_SECS,
           ConfigurationManager.DEFAULT_ALLOWED_JOB_ENVIRONMENTS),
       TaskTestUtil.TIER_MANAGER,
       TaskTestUtil.THRIFT_BACKFILL,
@@ -160,7 +171,7 @@ public class ConfigurationManagerTest {
     taskConfig.getContainer().getDocker().setImage(null);
 
     expectTaskDescriptionException("A container must specify an image");
-    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(taskConfig));
+    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(taskConfig), 100);
   }
 
   @Test
@@ -169,7 +180,7 @@ public class ConfigurationManagerTest {
     taskConfig.getContainer().getDocker().addToParameters(new 
DockerParameter("foo", "bar"));
 
     expectTaskDescriptionException(ConfigurationManager.NO_DOCKER_PARAMETERS);
-    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(taskConfig));
+    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(taskConfig), 100);
   }
 
   @Test
@@ -179,7 +190,7 @@ public class ConfigurationManagerTest {
     builder.unsetExecutorConfig();
 
     
expectTaskDescriptionException(ConfigurationManager.EXECUTOR_REQUIRED_WITH_DOCKER);
-    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
+    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 1);
   }
 
   @Test
@@ -187,7 +198,7 @@ public class ConfigurationManagerTest {
     TaskConfig builder = CONFIG_WITH_CONTAINER.newBuilder();
     builder.unsetExecutorConfig();
 
-    
DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
+    DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100);
   }
 
   @Test
@@ -196,7 +207,7 @@ public class ConfigurationManagerTest {
         .setTier("pr/d"));
 
     expectTaskDescriptionException("Tier contains illegal characters");
-    CONFIGURATION_MANAGER.validateAndPopulate(config);
+    CONFIGURATION_MANAGER.validateAndPopulate(config, 100);
   }
 
   @Test
@@ -205,7 +216,7 @@ public class ConfigurationManagerTest {
     builder.getContainer().getDocker().setParameters(ImmutableList.of());
 
     ITaskConfig result =
-        
DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
+        DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100);
 
     // The resulting task config should contain parameters supplied to the 
ConfigurationManager.
     List<IDockerParameter> params = 
result.getContainer().getDocker().getParameters();
@@ -221,7 +232,7 @@ public class ConfigurationManagerTest {
     taskConfig.getContainer().getDocker().addToParameters(userParameter);
 
     ITaskConfig result = DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(
-        ITaskConfig.build(taskConfig));
+        ITaskConfig.build(taskConfig), 100);
 
     // The resulting task config should contain parameters supplied from user 
config.
     List<IDockerParameter> params = 
result.getContainer().getDocker().getParameters();
@@ -236,7 +247,7 @@ public class ConfigurationManagerTest {
         .setConstraint(TaskConstraint.value(
             new ValueConstraint(false, ImmutableSet.of(JOB_KEY.getRole() + 
"/f"))))));
 
-    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
+    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100);
   }
 
   @Test
@@ -246,7 +257,7 @@ public class ConfigurationManagerTest {
         .setName(DEDICATED_ATTRIBUTE)
         .setConstraint(TaskConstraint.value(new ValueConstraint(false, 
ImmutableSet.of("*/f"))))));
 
-    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
+    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100);
   }
 
   @Test
@@ -257,7 +268,7 @@ public class ConfigurationManagerTest {
         .setConstraint(TaskConstraint.value(new ValueConstraint(false, 
ImmutableSet.of("r/f"))))));
 
     expectTaskDescriptionException("Only r may use hosts dedicated for that 
role.");
-    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
+    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100);
   }
 
   @Test
@@ -267,7 +278,7 @@ public class ConfigurationManagerTest {
     builder.addToResources(ramMb(72));
 
     expectTaskDescriptionException("Multiple resource values are not supported 
for CPU, RAM");
-    
DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
+    DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100);
   }
 
   @Test
@@ -275,7 +286,7 @@ public class ConfigurationManagerTest {
     TaskConfig builder = CONFIG_WITH_CONTAINER.newBuilder();
     builder.addToResources(namedPort("thrift"));
 
-    
DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
+    DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100);
   }
 
   @Test
@@ -293,10 +304,12 @@ public class ConfigurationManagerTest {
             false,
             false,
             false,
+            MIN_REQUIRED_INSTANCES,
+            MAX_SLA_DURATION_SECS,
             ConfigurationManager.DEFAULT_ALLOWED_JOB_ENVIRONMENTS),
         TaskTestUtil.TIER_MANAGER,
         TaskTestUtil.THRIFT_BACKFILL,
-        
TestExecutorSettings.THERMOS_EXECUTOR).validateAndPopulate(ITaskConfig.build(builder));
+        TestExecutorSettings.THERMOS_EXECUTOR).validateAndPopulate(ITaskConfig.build(builder), 100);
   }
 
   @Test
@@ -312,15 +325,18 @@ public class ConfigurationManagerTest {
             new ConfigurationManagerSettings(
                     ALL_CONTAINER_TYPES,
                     true,
-                ImmutableList.of(new DockerParameter("foo", "bar")),
+                    ImmutableList.of(new DockerParameter("foo", "bar")),
                     false,
                     false,
                     false,
                     false,
+                    MIN_REQUIRED_INSTANCES,
+                    MAX_SLA_DURATION_SECS,
                     ".+"),
             TaskTestUtil.TIER_MANAGER,
             TaskTestUtil.THRIFT_BACKFILL,
-            
TestExecutorSettings.THERMOS_EXECUTOR).validateAndPopulate(ITaskConfig.build(builder));
+            TestExecutorSettings.THERMOS_EXECUTOR)
+        .validateAndPopulate(ITaskConfig.build(builder), 100);
   }
 
   @Test
@@ -333,7 +349,7 @@ public class ConfigurationManagerTest {
 
     expectTaskDescriptionException(NO_CONTAINER_VOLUMES);
 
-    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
+    CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100);
   }
 
   @Test
@@ -343,7 +359,7 @@ public class ConfigurationManagerTest {
     builder.unsetTaskLinks();
 
     ITaskConfig populated =
-        
DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
+        DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100);
     assertEquals(ImmutableSet.of("health", "http"), 
populated.getTaskLinks().keySet());
   }
 
@@ -361,6 +377,8 @@ public class ConfigurationManagerTest {
           true,
           true,
           true,
+          MIN_REQUIRED_INSTANCES,
+          MAX_SLA_DURATION_SECS,
           "b.r"),
       TaskTestUtil.TIER_MANAGER,
       TaskTestUtil.THRIFT_BACKFILL,
@@ -368,6 +386,166 @@ public class ConfigurationManagerTest {
             .validateAndPopulate(IJobConfiguration.build(jobConfiguration));
   }
 
+  @Test
+  public void testCountSlaPolicyUnsupportedTier() throws Exception {
+    ITaskConfig config = ITaskConfig.build(
+        CONFIG_WITH_CONTAINER
+            .newBuilder()
+            .setTier(TaskTestUtil.DEV_TIER_NAME)
+            .setSlaPolicy(SlaPolicy.countSlaPolicy(
+                new CountSlaPolicy()
+                    .setCount(MIN_REQUIRED_INSTANCES))));
+
+    expectTaskDescriptionException("Tier 'tier-dev' does not support SlaPolicy.");
+    DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(
+        config,
+        MIN_REQUIRED_INSTANCES);
+  }
+
+  @Test
+  public void testCountSlaPolicyTooFewInstances() throws Exception {
+    ITaskConfig config = ITaskConfig.build(
+        CONFIG_WITH_CONTAINER
+            .newBuilder()
+            .setSlaPolicy(SlaPolicy.countSlaPolicy(
+                new CountSlaPolicy()
+                    .setCount(MIN_REQUIRED_INSTANCES))));
+
+    expectTaskDescriptionException(
+        "Job with fewer than 20 instances cannot have Percentage/Count SlaPolicy.");
+    DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(
+        config,
+        MIN_REQUIRED_INSTANCES - 1);
+  }
+
+  @Test
+  public void testCountSlaPolicyCountTooHigh() throws Exception {
+    ITaskConfig config = ITaskConfig.build(
+        CONFIG_WITH_CONTAINER
+            .newBuilder()
+            .setSlaPolicy(SlaPolicy.countSlaPolicy(
+                new CountSlaPolicy()
+                  .setCount(MIN_REQUIRED_INSTANCES))));
+
+    expectTaskDescriptionException(
+        "Current CountSlaPolicy: count=20 will not allow any instances to be killed. "
+            + "Must be less than instanceCount=20.");
+    DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(
+        config,
+        MIN_REQUIRED_INSTANCES);
+  }
+
+  @Test
+  public void testCountSlaPolicyDurationSecsTooHigh() throws Exception {
+    ITaskConfig config = ITaskConfig.build(
+        CONFIG_WITH_CONTAINER
+            .newBuilder()
+            .setSlaPolicy(SlaPolicy.countSlaPolicy(
+                new CountSlaPolicy()
+                    .setDurationSecs(MAX_SLA_DURATION_SECS + 1))));
+
+    expectTaskDescriptionException("CountSlaPolicy: durationSecs=7201 must be less than "
+        + "cluster-wide maximum of 7200 secs.");
+    DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(
+        config,
+        MIN_REQUIRED_INSTANCES);
+  }
+
+  @Test
+  public void testPercentageSlaPolicyUnsupportedTier() throws Exception {
+    ITaskConfig config = ITaskConfig.build(
+        CONFIG_WITH_CONTAINER
+            .newBuilder()
+            .setTier(TaskTestUtil.DEV_TIER_NAME)
+            .setSlaPolicy(SlaPolicy.percentageSlaPolicy(
+                new PercentageSlaPolicy()
+                    .setPercentage(DEFAULT_SLA_PERCENTAGE))));
+
+    expectTaskDescriptionException("Tier 'tier-dev' does not support SlaPolicy.");
+    DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(
+        config,
+        MIN_REQUIRED_INSTANCES);
+  }
+
+  @Test
+  public void testPercentageSlaPolicyTooFewInstances() throws Exception {
+    ITaskConfig config = ITaskConfig.build(
+        CONFIG_WITH_CONTAINER
+            .newBuilder()
+            .setSlaPolicy(SlaPolicy.percentageSlaPolicy(
+                new PercentageSlaPolicy()
+                    .setPercentage(DEFAULT_SLA_PERCENTAGE))));
+
+    expectTaskDescriptionException(
+        "Job with fewer than 20 instances cannot have Percentage/Count SlaPolicy.");
+    DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(
+        config,
+        MIN_REQUIRED_INSTANCES - 1);
+  }
+
+  @Test
+  public void testPercentageSlaPolicyPercentageTooHigh() throws Exception {
+    ITaskConfig config = ITaskConfig.build(
+        CONFIG_WITH_CONTAINER
+            .newBuilder()
+            .setSlaPolicy(SlaPolicy.percentageSlaPolicy(
+                new PercentageSlaPolicy()
+                    .setPercentage(100))));
+
+    expectTaskDescriptionException(
+        "Current PercentageSlaPolicy: percentage=100.000000 will not allow any instances "
+            + "to be killed. Must be less than 95.000000.");
+    DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(
+        config,
+        MIN_REQUIRED_INSTANCES);
+  }
+
+  @Test
+  public void testPercentageSlaPolicyDurationSecsTooHigh() throws Exception {
+    ITaskConfig config = ITaskConfig.build(
+        CONFIG_WITH_CONTAINER
+            .newBuilder()
+            .setSlaPolicy(SlaPolicy.percentageSlaPolicy(
+                new PercentageSlaPolicy()
+                    .setDurationSecs(MAX_SLA_DURATION_SECS + 1))));
+
+    expectTaskDescriptionException("PercentageSlaPolicy: durationSecs=7201 must be less than "
+        + "cluster-wide maximum of 7200 secs.");
+    DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(
+        config,
+        MIN_REQUIRED_INSTANCES);
+  }
+
+  @Test
+  public void testCoordinatorSlaPolicyUnsupportedTier() throws Exception {
+    ITaskConfig config = ITaskConfig.build(
+        CONFIG_WITH_CONTAINER
+            .newBuilder()
+            .setTier(TaskTestUtil.DEV_TIER_NAME)
+            .setSlaPolicy(SlaPolicy.coordinatorSlaPolicy(
+                new CoordinatorSlaPolicy()
+                    .setCoordinatorUrl("http://localhost"))));
+
+    expectTaskDescriptionException("Tier 'tier-dev' does not support SlaPolicy.");
+    DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(
+        config,
+        MIN_REQUIRED_INSTANCES);
+  }
+
+  @Test
+  public void testCoordinatorSlaPolicyTooFewInstances() throws Exception {
+    ITaskConfig config = ITaskConfig.build(
+        CONFIG_WITH_CONTAINER
+            .newBuilder()
+            .setSlaPolicy(SlaPolicy.coordinatorSlaPolicy(
+                new CoordinatorSlaPolicy()
+                    .setCoordinatorUrl("http://localhost"))));
+
+    DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(
+        config,
+        MIN_REQUIRED_INSTANCES - 1);
+  }
+
   private void expectTaskDescriptionException(String message) {
     expectedException.expect(TaskDescriptionException.class);
     expectedException.expectMessage(message);

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java 
b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
index 0fabb33..c9ad0cf 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
@@ -66,7 +66,7 @@ public class CronIT extends EasyMockTest {
       new JobConfiguration()
           .setCronSchedule(CRONTAB_ENTRY.toString())
           .setKey(JOB_KEY.newBuilder())
-          .setInstanceCount(2)
+          .setInstanceCount(20)
           .setOwner(IDENTITY)
           .setCronCollisionPolicy(KILL_EXISTING)
           .setTaskConfig(makeTaskConfig()));

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java 
b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
index b7dcf3a..e81c3fa 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
@@ -37,7 +37,7 @@ final class QuartzTestUtil {
   static final IJobConfiguration JOB = IJobConfiguration.build(
       new JobConfiguration()
           .setCronSchedule("* * * * SUN")
-          .setInstanceCount(10)
+          .setInstanceCount(20)
           .setOwner(new Identity().setUser("user"))
           .setKey(AURORA_JOB_KEY.newBuilder())
           .setTaskConfig(TaskTestUtil.makeConfig(AURORA_JOB_KEY)

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
 
b/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
new file mode 100644
index 0000000..28c62a1
--- /dev/null
+++ 
b/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
@@ -0,0 +1,422 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.maintenance;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.TypeLiteral;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.CountSlaPolicy;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.HostMaintenanceRequest;
+import org.apache.aurora.gen.HostStatus;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.PercentageSlaPolicy;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.SlaPolicy;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.sla.SlaManager;
+import org.apache.aurora.scheduler.state.PubsubTestUtil;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IHostMaintenanceRequest;
+import org.apache.aurora.scheduler.storage.entities.IHostStatus;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ISlaPolicy;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.v1.Protos;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
+import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class MaintenanceControllerImplTest extends EasyMockTest {
+
+  private static final String HOST_A = "a";
+  private static final Set<String> A = ImmutableSet.of(HOST_A);
+  private static final Protos.OfferID OFFER_ID = Protos.OfferID.newBuilder()
+      .setValue("offer-id")
+      .build();
+  private static final Protos.AgentID AGENT_ID = Protos.AgentID.newBuilder()
+      .setValue("agent-id")
+      .build();
+  private static final Protos.FrameworkID FRAMEWORK_ID = Protos.FrameworkID.newBuilder()
+      .setValue("framework-id")
+      .build();
+  private static final Protos.URL AGENT_URL = Protos.URL.newBuilder()
+      .setAddress(Protos.Address.newBuilder()
+          .setHostname(HOST_A)
+          .setPort(5051))
+      .setScheme("http")
+      .build();
+  private static final Protos.Unavailability UNAVAILABILITY = Protos.Unavailability.newBuilder()
+      .setStart(Protos.TimeInfo.newBuilder()
+          .setNanoseconds(Amount.of(1L, Time.MINUTES).as(Time.NANOSECONDS)))
+      .build();
+  private static final SlaPolicy SLA_POLICY = SlaPolicy.percentageSlaPolicy(
+      new PercentageSlaPolicy(95, 1800));
+
+  private static final Protos.InverseOffer INVERSE_OFFER = Protos.InverseOffer.newBuilder()
+      .setId(OFFER_ID)
+      .setAgentId(AGENT_ID)
+      .setUrl(AGENT_URL)
+      .setFrameworkId(FRAMEWORK_ID)
+      .setUnavailability(UNAVAILABILITY)
+      .build();
+
+  private static final SlaPolicy COUNT_SLA_POLICY = SlaPolicy.countSlaPolicy(
+      new CountSlaPolicy()
+          .setCount(2)
+          .setDurationSecs(1800)
+  );
+
+  private StorageTestUtil storageUtil;
+  private StateManager stateManager;
+  private SlaManager slaManager;
+  private MaintenanceController.MaintenanceControllerImpl maintenance;
+  private EventSink eventSink;
+
+  @Before
+  public void setUp() throws Exception {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    stateManager = createMock(StateManager.class);
+    slaManager = createMock(SlaManager.class);
+    TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
+    expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes();
+
+    Injector injector = Guice.createInjector(
+        new PubsubEventModule(),
+        new MaintenanceModule(new MaintenanceModule.Options()),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(Storage.class).toInstance(storageUtil.storage);
+            bind(StateManager.class).toInstance(stateManager);
+            bind(SlaManager.class).toInstance(slaManager);
+            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+            bind(Executor.class).annotatedWith(AsyncExecutor.class)
+                .toInstance(MoreExecutors.directExecutor());
+            bind(TaskEventBatchWorker.class).toInstance(batchWorker);
+            bind(new TypeLiteral<Amount<Long, Time>>() { })
+                .annotatedWith(
+                    MaintenanceController.MaintenanceControllerImpl.PollingInterval.class)
+                .toInstance(new TimeAmount(1, Time.MINUTES));
+          }
+        });
+    maintenance = injector.getInstance(MaintenanceController.MaintenanceControllerImpl.class);
+    eventSink = PubsubTestUtil.startPubsub(injector);
+  }
+
+  private static IScheduledTask makeTask(String host, String taskId) {
+    ScheduledTask builder = TaskTestUtil.addStateTransition(
+        TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB),
+        RUNNING,
+        1000).newBuilder();
+    builder.getAssignedTask().setSlaveHost(host);
+    return IScheduledTask.build(builder);
+  }
+
+  @Test
+  public void testMaintenanceCycle() {
+    IScheduledTask task1 = makeTask(HOST_A, "taskA");
+    IScheduledTask task2 = makeTask(HOST_A, "taskB");
+
+    expectMaintenanceModeChange(HOST_A, SCHEDULED);
+    expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2));
+    expectTaskDraining(task1);
+    expectTaskDraining(task2);
+    expectMaintenanceModeChange(HOST_A, DRAINING);
+    IHostAttributes attributes =
+        IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING));
+    IHostMaintenanceRequest maintenanceRequest =
+        IHostMaintenanceRequest.build(new HostMaintenanceRequest()
+            .setHost(HOST_A)
+            .setCreatedTimestampMs(System.currentTimeMillis())
+            .setDefaultSlaPolicy(SLA_POLICY));
+
+    storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest(
+        anyObject(IHostMaintenanceRequest.class));
+    expect(storageUtil.attributeStore.getHostAttributes(HOST_A))
+        .andReturn(Optional.of(attributes)).times(2);
+    expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A))
+        .andReturn(Optional.of(maintenanceRequest)).times(2);
+
+    expect(storageUtil.attributeStore.getHostAttributes()).andReturn(ImmutableSet.of(attributes));
+    expectFetchTasksByHost(HOST_A, ImmutableSet.of(task2));
+    // TaskA is KILLED and therefore no longer active
+    expectFetchTasksByHost(HOST_A, ImmutableSet.of());
+    expectMaintenanceModeChange(HOST_A, DRAINED);
+    expectMaintenanceModeChange(HOST_A, NONE);
+    storageUtil.hostMaintenanceStore.removeHostMaintenanceRequest(HOST_A);
+
+    control.replay();
+
+    assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A));
+    assertStatus(HOST_A, DRAINING, maintenance.drain(A));
+    assertStatus(HOST_A, DRAINING, maintenance.getStatus(A));
+    eventSink.post(
+        TaskStateChange.transition(
+            IScheduledTask.build(task1.newBuilder().setStatus(KILLED)), RUNNING));
+    eventSink.post(
+        TaskStateChange.transition(
+            IScheduledTask.build(task2.newBuilder().setStatus(KILLED)), RUNNING));
+    assertStatus(HOST_A, NONE, maintenance.endMaintenance(A));
+  }
+
+  @Test
+  public void testUnknownHost() {
+    expect(storageUtil.attributeStore.getHostAttributes("b"))
+        .andReturn(Optional.empty());
+
+    control.replay();
+
+    assertEquals(ImmutableSet.of(),
+        maintenance.startMaintenance(ImmutableSet.of("b")));
+  }
+
+  @Test
+  public void testDrainEmptyHost() {
+    storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest(
+        anyObject(IHostMaintenanceRequest.class));
+    expectMaintenanceModeChange(HOST_A, SCHEDULED);
+    expectFetchTasksByHost(HOST_A, ImmutableSet.of());
+    expectMaintenanceModeChange(HOST_A, DRAINED);
+
+    control.replay();
+
+    assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A));
+    assertStatus(HOST_A, DRAINED, maintenance.drain(A));
+  }
+
+  @Test
+  public void testEndEarly() {
+    expectMaintenanceModeChange(HOST_A, SCHEDULED);
+    expectMaintenanceModeChange(HOST_A, NONE);
+    expect(storageUtil.attributeStore.getHostAttributes(HOST_A)).andReturn(Optional.of(
+        IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(NONE))));
+    storageUtil.hostMaintenanceStore.removeHostMaintenanceRequest(HOST_A);
+
+    control.replay();
+
+    assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A));
+
+    // End maintenance without DRAINING.
+    assertStatus(HOST_A, NONE, maintenance.endMaintenance(A));
+
+    // Make sure a later transition on the host does not cause any ill effects that could surface
+    // from stale internal state.
+    eventSink.post(TaskStateChange.transition(
+        IScheduledTask.build(makeTask(HOST_A, "taskA").newBuilder().setStatus(KILLED)), RUNNING));
+  }
+
+  @Test
+  public void testSlaDrain() {
+    storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest(
+        anyObject(IHostMaintenanceRequest.class));
+    expectMaintenanceModeChange(HOST_A, DRAINING);
+
+    control.replay();
+
+    assertStatus(
+        HOST_A,
+        DRAINING,
+        maintenance.slaDrain(ImmutableSet.of(HOST_A), COUNT_SLA_POLICY, 1800));
+  }
+
+  @Test
+  public void testSlaDrainUnknownHost() {
+    storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest(
+        anyObject(IHostMaintenanceRequest.class));
+    expect(storageUtil.attributeStore.getHostAttributes("unknown"))
+        .andReturn(Optional.empty());
+
+    control.replay();
+
+    assertEquals(ImmutableSet.of(),
+        maintenance.slaDrain(ImmutableSet.of("unknown"), COUNT_SLA_POLICY, 1800));
+  }
+
+  @Test
+  public void testIteration() {
+    IScheduledTask task1 = makeTask(HOST_A, "taskA");
+    IScheduledTask task2 = makeTask(HOST_A, "taskB");
+
+    IHostAttributes attributes =
+        IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING));
+    expect(storageUtil.attributeStore.getHostAttributes())
+        .andReturn(ImmutableSet.of(attributes));
+    expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2));
+    IHostMaintenanceRequest maintenanceRequest =
+        IHostMaintenanceRequest.build(new HostMaintenanceRequest()
+            .setHost(HOST_A)
+            .setCreatedTimestampMs(System.currentTimeMillis())
+            .setDefaultSlaPolicy(SLA_POLICY));
+    expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A))
+        .andReturn(Optional.of(maintenanceRequest)).times(2);
+    expectTaskDraining(task1);
+    expectTaskDraining(task2);
+
+    control.replay();
+
+    maintenance.runForTest();
+  }
+
+  @Test
+  public void testIterationEmptyHost() {
+    IHostAttributes attributes =
+        IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING));
+    expect(storageUtil.attributeStore.getHostAttributes())
+        .andReturn(ImmutableSet.of(attributes));
+    expectFetchTasksByHost(HOST_A, ImmutableSet.of());
+    expectMaintenanceModeChange(HOST_A, DRAINED);
+
+    control.replay();
+
+    maintenance.runForTest();
+  }
+
+  @Test
+  public void testIterationMaintenanceTimeout() {
+    IScheduledTask task1 = makeTask(HOST_A, "taskA");
+    IScheduledTask task2 = makeTask(HOST_A, "taskB");
+
+    IHostAttributes attributes =
+        IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING));
+    expect(storageUtil.attributeStore.getHostAttributes())
+        .andReturn(ImmutableSet.of(attributes));
+    expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2));
+    IHostMaintenanceRequest maintenanceRequest =
+        IHostMaintenanceRequest.build(new HostMaintenanceRequest()
+            .setHost(HOST_A)
+            .setCreatedTimestampMs(0)
+            .setDefaultSlaPolicy(SLA_POLICY));
+    expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A))
+        .andReturn(Optional.of(maintenanceRequest)).times(2);
+    expectTaskDraining(task1, true);
+    expectTaskDraining(task2, true);
+
+    control.replay();
+
+    maintenance.runForTest();
+  }
+
+  @Test
+  public void testIterationNoMaintenanceRequest() {
+    IScheduledTask task1 = makeTask(HOST_A, "taskA");
+    IScheduledTask task2 = makeTask(HOST_A, "taskB");
+
+    IHostAttributes attributes =
+        IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING));
+    expect(storageUtil.attributeStore.getHostAttributes())
+        .andReturn(ImmutableSet.of(attributes));
+    expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2));
+    expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A))
+        .andReturn(Optional.empty()).times(2);
+
+    control.replay();
+
+    maintenance.runForTest();
+  }
+
+  @Test
+  public void testGetMode() {
+    expect(storageUtil.attributeStore.getHostAttributes(HOST_A)).andReturn(Optional.of(
+        IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING))));
+    expect(storageUtil.attributeStore.getHostAttributes("unknown")).andReturn(Optional.empty());
+
+    control.replay();
+
+    assertEquals(DRAINING, maintenance.getMode(HOST_A));
+    assertEquals(NONE, maintenance.getMode("unknown"));
+  }
+
+  @Test
+  public void testInverseOfferDrain() {
+    IScheduledTask task1 = makeTask(HOST_A, "taskA");
+    storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest(
+        anyObject(IHostMaintenanceRequest.class));
+    expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1));
+    expectTaskDraining(task1);
+    IHostMaintenanceRequest maintenanceRequest =
+        IHostMaintenanceRequest.build(new HostMaintenanceRequest()
+            .setHost(HOST_A)
+            .setCreatedTimestampMs(System.currentTimeMillis())
+            .setDefaultSlaPolicy(SLA_POLICY));
+    expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A))
+        .andReturn(Optional.of(maintenanceRequest)).times(1);
+
+    control.replay();
+    maintenance.drainForInverseOffer(INVERSE_OFFER);
+  }
+
+  private void expectTaskDraining(IScheduledTask task) {
+    expectTaskDraining(task, false);
+  }
+
+  private void expectTaskDraining(IScheduledTask task, boolean force) {
+    slaManager.checkSlaThenAct(
+        eq(task),
+        eq(ISlaPolicy.build(SLA_POLICY)),
+        anyObject(Storage.MutateWork.class),
+        eq(force));
+  }
+
+  private void expectFetchTasksByHost(String hostName, Set<IScheduledTask> tasks) {
+    expect(storageUtil.taskStore.fetchTasks(Query.slaveScoped(hostName).active())).andReturn(tasks);
+  }
+
+  private void expectMaintenanceModeChange(String hostName, MaintenanceMode mode) {
+    IHostAttributes attributes = IHostAttributes.build(new HostAttributes().setHost(hostName));
+
+    expect(storageUtil.attributeStore.getHostAttributes(hostName))
+        .andReturn(Optional.of(attributes));
+    IHostAttributes updated = IHostAttributes.build(attributes.newBuilder().setMode(mode));
+    expect(storageUtil.attributeStore.saveHostAttributes(updated)).andReturn(true);
+  }
+
+  private void assertStatus(String host, MaintenanceMode mode, Set<IHostStatus> statuses) {
+    assertEquals(ImmutableSet.of(IHostStatus.build(new HostStatus(host, mode))), statuses);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
index c6163bb..8a8c695 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
@@ -36,9 +36,9 @@ import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.maintenance.MaintenanceController;
 import org.apache.aurora.scheduler.offers.HostOffer;
 import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;

Reply via email to