Enable `Tasks` to specify their own custom maintenance SLA.

`Tasks` can now specify custom SLA requirements as part of
their `TaskConfig`. One of the new features is the ability
to specify an external coordinator that can ACK/NACK
maintenance requests for tasks. This is especially useful
for onboarding services whose SLA cannot be satisfactorily
expressed in terms of running instances.

Maintenance requests are driven from the Scheduler to
improve management of nodes in the cluster.
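
For example (mirroring the examples in `docs/features/sla-requirements.md`
added by this change), a job can declare its SLA either as a minimum count
of running instances or by delegating the decision to an external
coordinator:

```python
# Count-based SLA: keep at least 2 of 3 instances RUNNING over every
# 30-minute window (other job parameters omitted).
Job(
  name = 'replicated-service',
  role = 'www-data',
  instances = 3,
  sla_policy = CountSlaPolicy(
    count = 2,
    duration_secs = 1800
  )
)

# Coordinator-based SLA: ask an external service before draining any task.
Job(
  name = 'storage-service',
  role = 'www-data',
  sla_policy = CoordinatorSlaPolicy(
    coordinator_url = 'http://coordinator.example.com',
    status_key = 'drain'
  )
)
```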

Testing Done:
./build-support/jenkins/build.sh
./src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh

Bugs closed: AURORA-1978

Reviewed at https://reviews.apache.org/r/66716/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f2acf53f
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f2acf53f
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f2acf53f

Branch: refs/heads/master
Commit: f2acf53ff0d0cbb7b0c62f021cc55c9e45b32f9c
Parents: 34be631
Author: Santhosh Kumar Shanmugham <santhoshkuma...@gmail.com>
Authored: Tue Jun 5 16:15:52 2018 -0700
Committer: Santhosh Kumar <sshanmug...@twitter.com>
Committed: Tue Jun 5 16:15:52 2018 -0700

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   32 +
 .../thrift/org/apache/aurora/gen/api.thrift     |    6 +
 docs/README.md                                  |    1 +
 docs/features/sla-requirements.md               |  181 +++
 docs/operations/configuration.md                |   55 +-
 docs/reference/configuration.md                 |   34 +-
 docs/reference/scheduler-configuration.md       |   10 +
 .../vagrant/systemd/aurora-scheduler.service    |    6 +-
 .../apache/aurora/scheduler/app/AppModule.java  |    5 +
 .../aurora/scheduler/config/CliOptions.java     |    2 +
 .../configuration/ConfigurationManager.java     |  105 +-
 .../maintenance/MaintenanceController.java      |  470 +++++++
 .../maintenance/MaintenanceModule.java          |   78 ++
 .../scheduler/mesos/MesosCallbackHandler.java   |    2 +-
 .../apache/aurora/scheduler/sla/SlaManager.java |  437 ++++++
 .../apache/aurora/scheduler/sla/SlaModule.java  |   57 +-
 .../scheduler/state/MaintenanceController.java  |  294 -----
 .../aurora/scheduler/state/StateModule.java     |   12 -
 .../scheduler/thrift/ReadOnlySchedulerImpl.java |   10 +-
 .../thrift/SchedulerThriftInterface.java        |   24 +-
 .../python/apache/aurora/admin/admin_util.py    |   38 +
 .../apache/aurora/admin/host_maintenance.py     |   20 +
 .../python/apache/aurora/admin/maintenance.py   |   45 +
 .../python/apache/aurora/client/api/__init__.py |   12 +
 .../python/apache/aurora/client/cli/context.py  |   16 +
 .../python/apache/aurora/client/cli/jobs.py     |    7 +
 .../aurora/scheduler/app/SchedulerIT.java       |    9 +
 .../aurora/scheduler/base/TaskTestUtil.java     |    4 +-
 .../scheduler/config/CommandLineTest.java       |   14 +-
 .../configuration/ConfigurationManagerTest.java |  212 ++-
 .../aurora/scheduler/cron/quartz/CronIT.java    |    2 +-
 .../scheduler/cron/quartz/QuartzTestUtil.java   |    2 +-
 .../MaintenanceControllerImplTest.java          |  422 ++++++
 .../mesos/MesosCallbackHandlerTest.java         |    2 +-
 .../aurora/scheduler/sla/SlaManagerTest.java    | 1239 ++++++++++++++++++
 .../aurora/scheduler/sla/SlaModuleTest.java     |   15 +-
 .../state/MaintenanceControllerImplTest.java    |  264 ----
 .../thrift/SchedulerThriftInterfaceTest.java    |   58 +-
 .../aurora/scheduler/thrift/ThriftIT.java       |   12 +
 .../thrift/aop/MockDecoratedThrift.java         |    7 +
 .../apache/aurora/admin/test_maintenance.py     |   81 +-
 src/test/python/apache/aurora/api_util.py       |    3 +
 .../aurora/client/api/test_scheduler_client.py  |    9 +
 .../python/apache/aurora/client/cli/test_add.py |   22 +-
 .../apache/aurora/client/cli/test_kill.py       |   52 +-
 .../sh/org/apache/aurora/e2e/http_example.py    |    2 +-
 .../apache/aurora/e2e/partition_aware.aurora    |   14 +
 .../sh/org/apache/aurora/e2e/sla_coordinator.py |   60 +
 .../sh/org/apache/aurora/e2e/sla_policy.aurora  |   61 +
 .../sh/org/apache/aurora/e2e/test_end_to_end.sh |  135 +-
 ui/src/main/js/components/TaskConfigSummary.js  |   42 +
 51 files changed, 4058 insertions(+), 644 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 5e1f994..0ef75d6 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -1,3 +1,35 @@
+0.21.0
+======
+
+### New/updated:
+- Introduce ability for tasks to specify custom SLA requirements via the new 
`SlaPolicy` structs.
+  There are 3 different SLA Policies that are currently supported - 
`CountSlaPolicy`,
+  `PercentageSlaPolicy` and `CoordinatorSlaPolicy`. SLA policies based on 
count and percentage
+  express the required number of `RUNNING` instances as either a count or 
percentage in addition to
+  allowing the duration window for which these requirements have to be satisfied. For applications
+  that need more control over how the SLA is determined, a custom SLA calculator (a.k.a. the
+  Coordinator) can be configured. Any action that can affect the SLA will first check with the
+  Coordinator before proceeding.
+
+    **IMPORTANT: The storage changes required for this feature will make the scheduler
+    snapshot backwards incompatible. The scheduler will be unable to read the snapshot if rolled
+    back to a previous version. If a rollback is absolutely necessary, perform the following steps:**
+    1. Stop all host maintenance requests via `aurora_admin host_activate`.
+    2. Ensure a new snapshot is created by running `aurora_admin scheduler_snapshot <cluster>`.
+    3. Roll back to the previous version.
+
+  Note: The `Coordinator` interface required for the `CoordinatorSlaPolicy` is 
experimental at
+  this juncture and is bound to change in the future.
+
+### Deprecations and removals:
+
+- Deprecated the `aurora_admin host_drain` command used for maintenance. With this release the SLA
+  computations are moved to the scheduler, and the client is no longer required to compute SLAs
+  and watch the drains. The scheduler persists any host maintenance request and performs an
+  SLA-aware drain of the tasks before marking the host as `DRAINED`, so maintenance requests
+  survive scheduler fail-overs. Use the newly introduced `aurora_admin sla_host_drain` command
+  to skip the SLA computations on the admin client.
+
 0.20.0
 ======
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift 
b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index ff48000..54007c8 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -1244,6 +1244,12 @@ service AuroraAdmin extends AuroraSchedulerManager {
   /** Set the given hosts back into serving mode. */
   Response endMaintenance(1: Hosts hosts)
 
+  /**
+   * Ask the scheduler to put hosts into DRAINING mode and move scheduled tasks off of the hosts
+   * such that their SLA requirements are satisfied. Uses defaultSlaPolicy for any task that does
+   * not set one.
+   **/
+  Response slaDrainHosts(1: Hosts hosts, 2: SlaPolicy defaultSlaPolicy, 3: i64 
timeoutSecs)
+
   /** Start a storage snapshot and block until it completes. */
   Response snapshot()
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/docs/README.md
----------------------------------------------------------------------
diff --git a/docs/README.md b/docs/README.md
index 166bf1c..8bf936d 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -28,6 +28,7 @@ Description of important Aurora features.
  * [Services](features/services.md)
  * [Service Discovery](features/service-discovery.md)
  * [SLA Metrics](features/sla-metrics.md)
+ * [SLA Requirements](features/sla-requirements.md)
  * [Webhooks](features/webhooks.md)
 
 ## Operators

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/docs/features/sla-requirements.md
----------------------------------------------------------------------
diff --git a/docs/features/sla-requirements.md 
b/docs/features/sla-requirements.md
new file mode 100644
index 0000000..555b174
--- /dev/null
+++ b/docs/features/sla-requirements.md
@@ -0,0 +1,181 @@
+SLA Requirements
+================
+
+- [Overview](#overview)
+- [Default SLA](#default-sla)
+- [Custom SLA](#custom-sla)
+  - [Count-based](#count-based)
+  - [Percentage-based](#percentage-based)
+  - [Coordinator-based](#coordinator-based)
+
+## Overview
+
+Aurora guarantees SLA requirements for jobs. These requirements limit the impact of cluster-wide
+maintenance operations on the jobs. For instance, when an operator upgrades
+the OS on all the Mesos agent machines, the tasks scheduled on them need to be drained.
+By specifying SLA requirements, a job can make sure that it has enough instances to
+continue operating safely without incurring downtime.
+
+> SLA is defined as the minimum number of active tasks required for a job in every duration window.
+A task is active if it was in the `RUNNING` state during the last duration window.
+
+There is a [default](#default-sla) SLA guarantee for
+[preferred](../features/multitenancy.md#configuration-tiers) tier jobs and it 
is also possible to
+specify [custom](#custom-sla) SLA requirements.
+
+## Default SLA
+
+Aurora guarantees a default SLA requirement for tasks in the
+[preferred](../features/multitenancy.md#configuration-tiers) tier:
+
+> 95% of tasks in a job will be `active` in every 30-minute window.
+
+
+## Custom SLA
+
+For jobs that require different SLA guarantees, Aurora allows them to specify their own
+SLA requirements via `SlaPolicy` objects. There are 3 different ways to express SLA requirements.
+
+### [Count-based](../reference/configuration.md#countslapolicy-objects)
+
+For jobs that need a minimum `number` of instances to be running all the time,
+[`CountSlaPolicy`](../reference/configuration.md#countslapolicy-objects)
+provides the ability to express the minimum number of required active 
instances (i.e. number of
+tasks that are `RUNNING` for at least `duration_secs`). For instance, if we 
have a
+`replicated-service` that has 3 instances and needs at least 2 instances running every
+30 minutes to be treated as healthy, the SLA requirement can be expressed with a
+[`CountSlaPolicy`](../reference/configuration.md#countslapolicy-objects) like below:
+
+```python
+Job(
+  name = 'replicated-service',
+  role = 'www-data',
+  instances = 3,
+  sla_policy = CountSlaPolicy(
+    count = 2,
+    duration_secs = 1800
+  )
+  ...
+)
+```
+
+### 
[Percentage-based](../reference/configuration.md#percentageslapolicy-objects)
+
+For jobs that need a minimum `percentage` of instances to be running all the 
time,
+[`PercentageSlaPolicy`](../reference/configuration.md#percentageslapolicy-objects)
 provides the
+ability to express the minimum percentage of required active instances (i.e. 
percentage of tasks
+that are `RUNNING` for at least `duration_secs`). For instance, if we have a `frontend-service`
+that has 10000 instances for handling peak load and cannot have more than 0.1% of the instances
+down in any 1-hour window, the SLA requirement can be expressed with a
+[`PercentageSlaPolicy`](../reference/configuration.md#percentageslapolicy-objects) like below:
+
+```python
+Job(
+  name = 'frontend-service',
+  role = 'www-data',
+  instances = 10000,
+  sla_policy = PercentageSlaPolicy(
+    percentage = 99.9,
+    duration_secs = 3600
+  )
+  ...
+)
+```
+
+### 
[Coordinator-based](../reference/configuration.md#coordinatorslapolicy-objects)
+
+When none of the above methods is enough to describe the SLA requirements for a job, the SLA
+calculation can be off-loaded to a custom service called the `Coordinator`. The `Coordinator` needs
+to expose an endpoint that will be called to check if removal of a task will affect the SLA
+requirements for the job. This is useful for controlling the number of tasks that undergo
+maintenance at a time, without affecting the SLA for the application.
+
+Consider an example where we have a `storage-service` that stores 2 replicas of each object. The
+replicas are distributed across the instances such that they are stored on different hosts. In
+addition, consistent hashing is used for distributing the data across the instances.
+
+When an instance needs to be drained (say for host-maintenance), we have to 
make sure that at least 1 of
+the 2 replicas remains available. In such a case, a `Coordinator` service can 
be used to maintain
+the SLA guarantees required for the job.
+
+The job can be configured with a
+[`CoordinatorSlaPolicy`](../reference/configuration.md#coordinatorslapolicy-objects)
 to specify the
+coordinator endpoint and the field in the response JSON that indicates whether the SLA will be
+affected when the task is removed.
+
+```python
+Job(
+  name = 'storage-service',
+  role = 'www-data',
+  sla_policy = CoordinatorSlaPolicy(
+    coordinator_url = 'http://coordinator.example.com',
+    status_key = 'drain'
+  )
+  ...
+)
+```
+
+
+#### Coordinator Interface [Experimental]
+
+When a 
[`CoordinatorSlaPolicy`](../reference/configuration.md#coordinatorslapolicy-objects)
 is
+specified for a job, any action that requires removing a task
+(such as a drain) must get approval from the `Coordinator` before proceeding. The
+coordinator service needs to expose an HTTP endpoint that takes a `task-key` param
+(`<cluster>/<role>/<env>/<name>/<instance>`) and a JSON body describing the task
+details, and returns a JSON response containing the boolean status for allowing or disallowing
+the task's removal.
+
+##### Request:
+```javascript
+POST /
+  ?task=<cluster>/<role>/<env>/<name>/<instance>
+
+{
+  "assignedTask": {
+    "taskId": "taskA",
+    "slaveHost": "a",
+    "task": {
+      "job": {
+        "role": "role",
+        "environment": "devel",
+        "name": "job"
+      },
+      ...
+    },
+    "assignedPorts": {
+      "http": 1000
+    },
+    "instanceId": 1
+    ...
+  },
+  ...
+}
+```
+
+##### Response:
+```json
+{
+  "drain": true
+}
+```
+
+If the Coordinator allows removal of the task, then the task's
+[termination lifecycle](../reference/configuration.md#httplifecycleconfig-objects)
+is triggered. If the Coordinator does not allow removal, then the request will be retried
+later.
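
Below is a minimal sketch of a coordinator that implements the request/response contract above,
using only the Python 3 standard library. The port, the drain budget and the bookkeeping are purely
illustrative; a real coordinator would consult its own view of replica placement or service health
(this change ships its own `sla_coordinator.py` for the end-to-end tests):

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

DRAIN_BUDGET = 1          # illustrative: approve at most one draining task at a time
draining_tasks = set()    # task-keys that have already been approved for draining


class CoordinatorHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # The task-key has the form <cluster>/<role>/<env>/<name>/<instance>.
        task_key = parse_qs(urlparse(self.path).query).get('task', [''])[0]

        # The JSON body carries the task details (assignedTask, instanceId, ...).
        length = int(self.headers.get('Content-Length', 0))
        task_details = json.loads(self.rfile.read(length) or b'{}')
        host = task_details.get('assignedTask', {}).get('slaveHost')

        # Approve the drain only while the budget is not exhausted. Re-approving a
        # previously approved task keeps duplicate requests idempotent.
        allow = task_key in draining_tasks or len(draining_tasks) < DRAIN_BUDGET
        if allow:
            draining_tasks.add(task_key)
        self.log_message('drain %s on %s -> %s', task_key, host, allow)

        # The key in the response must match the job's status_key (default: 'drain').
        body = json.dumps({'drain': allow}).encode('utf-8')
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)


if __name__ == '__main__':
    HTTPServer(('0.0.0.0', 8080), CoordinatorHandler).serve_forever()
```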
+
+#### Coordinator Actions
+
+Each Coordinator endpoint gets its own lock, which is used to serialize calls to the Coordinator.
+This guarantees that only one concurrent request is sent to a coordinator endpoint, allowing
+coordinators to simply look at the current state of the tasks to determine the SLA (without having
+to worry about in-flight and pending requests). However, if there are multiple coordinators,
+maintenance can be done in parallel across all the coordinators.
+
+_Note: A single concurrent request to a coordinator endpoint does not translate into an exactly-once
+guarantee. The coordinator must be able to handle duplicate drain
+requests for the same task._
+
+
+

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/docs/operations/configuration.md
----------------------------------------------------------------------
diff --git a/docs/operations/configuration.md b/docs/operations/configuration.md
index 85a6fab..4890c09 100644
--- a/docs/operations/configuration.md
+++ b/docs/operations/configuration.md
@@ -312,19 +312,19 @@ increased).
 
 To enable this in the Scheduler, you can set the following options:
 
-    --enable_update_affinity=true
-    --update_affinity_reservation_hold_time=3mins
+    -enable_update_affinity=true
+    -update_affinity_reservation_hold_time=3mins
 
 You will need to tune the hold time to match the behavior you see in your 
cluster. If you have extremely
 high update throughput, you might have to extend it as processing updates 
could easily add significant
 delays between scheduling attempts. You may also have to tune scheduling 
parameters to achieve the
 throughput you need in your cluster. Some relevant settings (with defaults) 
are:
 
-    --max_schedule_attempts_per_sec=40
-    --initial_schedule_penalty=1secs
-    --max_schedule_penalty=1mins
-    --scheduling_max_batch_size=3
-    --max_tasks_per_schedule_attempt=5
+    -max_schedule_attempts_per_sec=40
+    -initial_schedule_penalty=1secs
+    -max_schedule_penalty=1mins
+    -scheduling_max_batch_size=3
+    -max_tasks_per_schedule_attempt=5
 
 There are metrics exposed by the Scheduler which can provide guidance on where 
the bottleneck is.
 Example metrics to look at:
@@ -337,3 +337,44 @@ Example metrics to look at:
 Most likely you'll run into limits with the number of update instances that 
can be processed per minute
 before you run into any other limits. So if your total work done per minute 
starts to exceed 2k instances,
 you may need to extend the update_affinity_reservation_hold_time.
+
+## Cluster Maintenance
+
+Aurora performs maintenance-related task drains. How often the scheduler polls for pending
+maintenance work can be controlled via:
+
+    -host_maintenance_polling_interval=1min
+
+## Enforcing SLA limitations
+
+Since tasks can specify their own `SlaPolicy`, the cluster needs to limit these SLA requirements.
+Too aggressive a requirement can permanently block any type of maintenance work
+(e.g. OS/kernel/security upgrades) on a host and hold it hostage.
+
+An operator can control the limits for SLA requirements via these scheduler 
configuration options:
+
+    -max_sla_duration_secs=2hrs
+    -min_required_instances_for_sla_check=20
+
+_Note: These limits apply only to `CountSlaPolicy` and `PercentageSlaPolicy`._
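
In addition to these operator limits, the validation added to `ConfigurationManager` in this change
rejects any count- or percentage-based policy that would not leave at least one instance free to
drain. A small sketch of that arithmetic (illustrative Python, not part of the scheduler):

```python
# A policy is accepted only if it leaves at least one instance free to drain.
def count_policy_ok(instance_count, count):
    return instance_count - count >= 1

def percentage_policy_ok(instance_count, percentage):
    return instance_count - instance_count * percentage / 100.0 >= 1

print(count_policy_ok(20, 19))         # True: one instance may be drained
print(count_policy_ok(20, 20))         # False: no instance may be drained
print(percentage_policy_ok(20, 95.0))  # True: 95% of 20 leaves one instance free
print(percentage_policy_ok(20, 99.0))  # False: 99% of 20 leaves only 0.2 instances
```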
+
+### Limiting Coordinator SLA
+
+With `CoordinatorSlaPolicy` the SLA calculation is off-loaded to an external HTTP service. Some
+relevant scheduler configuration options are:
+
+    -sla_coordinator_timeout=1min
+    -max_parallel_coordinated_maintenance=10
+
+Handing off the SLA calculation to an external service can potentially block maintenance
+on hosts for an indefinite amount of time (either due to a mis-configured coordinator or due to
+a legitimately degraded service). In those situations the following metrics will be helpful to
+identify the offending tasks:
+
+    sla_coordinator_user_errors_*     (counter tracking number of times the 
coordinator for the task
+                                       returned a bad response.)
+    sla_coordinator_errors_*          (counter tracking number of times the 
scheduler was not able
+                                       to communicate with the coordinator of 
the task.)
+    sla_coordinator_lock_starvation_* (counter tracking number of times the 
scheduler was not able to
+                                       get the lock for the coordinator of the 
task.)
+

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/docs/reference/configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md
index d4b869b..acab4c5 100644
--- a/docs/reference/configuration.md
+++ b/docs/reference/configuration.md
@@ -23,6 +23,7 @@ configuration design.
     - [Announcer Objects](#announcer-objects)
     - [Container Objects](#container)
     - [LifecycleConfig Objects](#lifecycleconfig-objects)
+    - [SlaPolicy Objects](#slapolicy-objects)
 - [Specifying Scheduling Constraints](#specifying-scheduling-constraints)
 - [Template Namespaces](#template-namespaces)
     - [mesos Namespace](#mesos-namespace)
@@ -343,7 +344,7 @@ Job Schema
   ```contact``` | String | Best email address to reach the owner of the job. 
For production jobs, this is usually a team mailing list.
   ```instances```| Integer | Number of instances (sometimes referred to as 
replicas or shards) of the task to create. (Default: 1)
   ```cron_schedule``` | String | Cron schedule in cron format. May only be 
used with non-service jobs. See [Cron Jobs](../features/cron-jobs.md) for more 
information. Default: None (not a cron job.)
-  ```cron_collision_policy``` | String | Policy to use when a cron job is 
triggered while a previous run is still active. KILL_EXISTING Kill the previous 
run, and schedule the new run CANCEL_NEW Let the previous run continue, and 
cancel the new run. (Default: KILL_EXISTING)
+  ```cron_collision_policy``` | String | Policy to use when a cron job is 
triggered while a previous run is still active. KILL\_EXISTING Kill the 
previous run, and schedule the new run CANCEL\_NEW Let the previous run 
continue, and cancel the new run. (Default: KILL_EXISTING)
   ```update_config``` | ```UpdateConfig``` object | Parameters for controlling 
the rate and policy of rolling updates.
   ```constraints``` | dict | Scheduling constraints for the tasks. See the 
section on the [constraint specification 
language](#specifying-scheduling-constraints)
   ```service``` | Boolean | If True, restart tasks regardless of success or 
failure. (Default: False)
@@ -359,6 +360,7 @@ Job Schema
   ```partition_policy``` | ```PartitionPolicy``` object | An optional 
partition policy that allows job owners to define how to handle partitions for 
running tasks (in partition-aware Aurora clusters)
   ```metadata``` | list of ```Metadata``` objects | list of ```Metadata``` 
objects for user's customized metadata information.
   ```executor_config``` | ```ExecutorConfig``` object | Allows choosing an 
alternative executor defined in `custom_executor_config` to be used instead of 
Thermos. Tasks will be launched with Thermos as the executor by default. See 
[Custom Executors](../features/custom-executors.md) for more info.
+  ```sla_policy``` |  Choice of ```CountSlaPolicy```, 
```PercentageSlaPolicy``` or ```CoordinatorSlaPolicy``` object | An optional 
SLA policy that allows job owners to describe the SLA requirements for the job. 
See [SlaPolicy Objects](#slapolicy-objects) for more information.
 
 
 ### UpdateConfig Objects
@@ -564,7 +566,7 @@ See [Docker Command Line 
Reference](https://docs.docker.com/reference/commandlin
   ```graceful_shutdown_wait_secs``` | Integer | The amount of time (in 
seconds) to wait after hitting the ```graceful_shutdown_endpoint``` before 
proceeding with the [task termination 
lifecycle](https://aurora.apache.org/documentation/latest/reference/task-lifecycle/#forceful-termination-killing-restarting).
 (Default: 5)
   ```shutdown_wait_secs```          | Integer | The amount of time (in 
seconds) to wait after hitting the ```shutdown_endpoint``` before proceeding 
with the [task termination 
lifecycle](https://aurora.apache.org/documentation/latest/reference/task-lifecycle/#forceful-termination-killing-restarting).
 (Default: 5)
 
-#### graceful_shutdown_endpoint
+#### graceful\_shutdown\_endpoint
 
 If the Job is listening on the port as specified by the HttpLifecycleConfig
 (default: `health`), a HTTP POST request will be sent over localhost to this
@@ -581,6 +583,34 @@ does not shut down on its own after `shutdown_wait_secs` 
seconds, it will be
 forcefully killed.
 
 
+### SlaPolicy Objects
+
+Configuration for specifying custom [SLA requirements](../features/sla-requirements.md) for a job.
+There are 3 supported SLA policies, namely [`CountSlaPolicy`](#countslapolicy-objects),
+[`PercentageSlaPolicy`](#percentageslapolicy-objects) and [`CoordinatorSlaPolicy`](#coordinatorslapolicy-objects).
+
+
+### CountSlaPolicy Objects
+
+  param                             | type    | description
+  -----                             | :----:  | -----------
+  ```count```                       | Integer | The number of active instances required in every `duration_secs` window.
+  ```duration_secs```               | Integer | Minimum time duration a task 
needs to be `RUNNING` to be treated as active.
+
+### PercentageSlaPolicy Objects
+
+  param                             | type    | description
+  -----                             | :----:  | -----------
+  ```percentage```                  | Float   | The percentage of active instances required in every `duration_secs` window.
+  ```duration_secs```               | Integer | Minimum time duration a task 
needs to be `RUNNING` to be treated as active.
+
+### CoordinatorSlaPolicy Objects
+
+  param                             | type    | description
+  -----                             | :----:  | -----------
+  ```coordinator_url```             | String  | The URL to the 
[Coordinator](../features/sla-requirements.md#coordinator) service to be 
contacted before performing SLA affecting actions (job updates, host drains 
etc).
+  ```status_key```                  | String  | The field in the Coordinator 
response that indicates the SLA status for working on the task. (Default: 
`drain`)
+
+
 Specifying Scheduling Constraints
 =================================
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/docs/reference/scheduler-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/scheduler-configuration.md 
b/docs/reference/scheduler-configuration.md
index a659cfa..805e516 100644
--- a/docs/reference/scheduler-configuration.md
+++ b/docs/reference/scheduler-configuration.md
@@ -106,6 +106,8 @@ Optional flags:
        Minimum guaranteed time for task history retention before any pruning 
is attempted.
 -history_prune_threshold (default (2, days))
        Time after which the scheduler will prune terminated task history.
+-host_maintenance_polling_interval (default (1, minute))
+       Interval between polling for pending host maintenance requests.
 -hostname
        The hostname to advertise in ZooKeeper instead of the locally-resolved 
hostname.
 -http_authentication_mechanism (default NONE)
@@ -134,6 +136,8 @@ Optional flags:
        Maximum delay between attempts to schedule a flapping task.
 -max_leading_duration (default (1, days))
        After leading for this duration, the scheduler should commit suicide.
+-max_parallel_coordinated_maintenance (default 10)
+       Maximum number of coordinators that can be contacted in parallel.
 -max_registration_delay (default (1, mins))
        Max allowable delay to allow the driver to register before aborting
 -max_reschedule_task_delay_on_startup (default (30, secs))
@@ -144,6 +148,8 @@ Optional flags:
        Maximum number of scheduling attempts to make per second.
 -max_schedule_penalty (default (1, mins))
        Maximum delay between attempts to schedule a PENDING tasks.
+-max_sla_duration_secs (default (2, hrs))
+       Maximum duration window for which SLA requirements are to be satisfied. 
This does not apply to jobs that have a CoordinatorSlaPolicy.
 -max_status_update_batch_size (default 1000) [must be > 0]
        The maximum number of status updates that can be processed in a batch.
 -max_task_event_batch_size (default 300) [must be > 0]
@@ -156,6 +162,8 @@ Optional flags:
        Upper limit on the number of failures allowed during a job update. This 
helps cap potentially unbounded entries into storage.
 -min_offer_hold_time (default (5, mins))
        Minimum amount of time to hold a resource offer before declining.
+-min_required_instances_for_sla_check (default 20)
+       Minimum number of instances required for a job to be eligible for SLA 
check. This does not apply to jobs that have a CoordinatorSlaPolicy.
 -native_log_election_retries (default 20)
        The maximum number of attempts to obtain a new log writer.
 -native_log_election_timeout (default (15, secs))
@@ -214,6 +222,8 @@ Optional flags:
        Path to shiro.ini for authentication and authorization configuration.
 -shiro_realm_modules (default [class 
org.apache.aurora.scheduler.http.api.security.IniShiroRealmModule])
        Guice modules for configuring Shiro Realms.
+-sla_coordinator_timeout (default (1, min)) [must be > 0]
+       Timeout interval for communicating with Coordinator.
 -sla_non_prod_metrics (default [])
        Metric categories collected for non production tasks.
 -sla_prod_metrics (default [JOB_UPTIMES, PLATFORM_UPTIME, MEDIANS])

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/examples/vagrant/systemd/aurora-scheduler.service
----------------------------------------------------------------------
diff --git a/examples/vagrant/systemd/aurora-scheduler.service 
b/examples/vagrant/systemd/aurora-scheduler.service
index 57e4bba..fa76ad0 100644
--- a/examples/vagrant/systemd/aurora-scheduler.service
+++ b/examples/vagrant/systemd/aurora-scheduler.service
@@ -31,6 +31,8 @@ Environment="JAVA_OPTS='-Djava.library.path=/usr/lib \
   -Xdebug \
   -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005'"
 WorkingDirectory=/home/vagrant/aurora/dist/install/aurora-scheduler
+
+# min_offer_hold_time is set to 1mins to shorten time to update attributes on 
offers. AURORA-1985.
 
ExecStart=/home/vagrant/aurora/dist/install/aurora-scheduler/bin/aurora-scheduler
 \
   -cluster_name=devcluster \
   -hostname=aurora.local \
@@ -58,7 +60,9 @@ 
ExecStart=/home/vagrant/aurora/dist/install/aurora-scheduler/bin/aurora-schedule
   -allow_container_volumes=true \
   -offer_filter_duration=0secs \
   -mesos_driver=V1_DRIVER \
-  -unavailability_threshold=1mins
+  -unavailability_threshold=1mins \
+  -min_required_instances_for_sla_check=2 \
+  -min_offer_hold_time=1mins
 User=root
 Group=root
 Restart=always

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java 
b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index ffc0744..edc13d4 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -26,6 +26,7 @@ import com.google.inject.AbstractModule;
 
 import org.apache.aurora.GuiceUtils;
 import org.apache.aurora.common.inject.TimedInterceptor;
+import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.Stats;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.Clock;
@@ -46,6 +47,7 @@ import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.http.JettyServerModule;
+import org.apache.aurora.scheduler.maintenance.MaintenanceModule;
 import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
 import org.apache.aurora.scheduler.metadata.MetadataModule;
 import org.apache.aurora.scheduler.offers.OfferManagerModule;
@@ -146,6 +148,8 @@ public class AppModule extends AbstractModule {
             opts.main.allowGpuResource,
             opts.app.enableMesosFetcher,
             opts.app.allowContainerVolumes,
+            opts.sla.minRequiredInstances,
+            opts.sla.maxSlaDuration.as(Time.SECONDS),
             opts.app.allowedJobEnvironments),
         opts.main.driverImpl,
         opts);
@@ -190,6 +194,7 @@ public class AppModule extends AbstractModule {
     install(new StateModule(options));
     install(new SlaModule(options.sla));
     install(new UpdaterModule(options.updater));
+    install(new MaintenanceModule(options.maintenance));
     bind(StatsProvider.class).toInstance(Stats.STATS_PROVIDER);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java 
b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
index a2fb039..2dbd535 100644
--- a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
+++ b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
@@ -35,6 +35,7 @@ import 
org.apache.aurora.scheduler.http.api.security.HttpSecurityModule;
 import org.apache.aurora.scheduler.http.api.security.IniShiroRealmModule;
 import org.apache.aurora.scheduler.http.api.security.Kerberos5ShiroRealmModule;
 import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
+import org.apache.aurora.scheduler.maintenance.MaintenanceModule;
 import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule;
 import org.apache.aurora.scheduler.offers.OfferManagerModule;
 import org.apache.aurora.scheduler.preemptor.PreemptorModule;
@@ -86,6 +87,7 @@ public class CliOptions {
   public final StatsModule.Options stats = new StatsModule.Options();
   public final CronModule.Options cron = new CronModule.Options();
   public final ResourceSettings resourceSettings = new ResourceSettings();
+  public final MaintenanceModule.Options maintenance = new 
MaintenanceModule.Options();
   final List<Object> custom;
 
   public CliOptions() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
 
b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index 4073229..2fd1746 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -17,7 +17,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 
@@ -30,8 +29,11 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
 import org.apache.aurora.gen.Container;
+import org.apache.aurora.gen.CountSlaPolicy;
 import org.apache.aurora.gen.DockerParameter;
 import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.PercentageSlaPolicy;
+import org.apache.aurora.gen.SlaPolicy;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.scheduler.TierManager;
@@ -99,6 +101,8 @@ public class ConfigurationManager {
     private final boolean enableMesosFetcher;
     private final boolean allowContainerVolumes;
     private final Pattern allowedJobEnvironments;
+    private final int minRequiredInstances;
+    private final long maxSlaDurationSecs;
 
     public ConfigurationManagerSettings(
         ImmutableSet<Container._Fields> allowedContainerTypes,
@@ -108,6 +112,8 @@ public class ConfigurationManager {
         boolean allowGpuResource,
         boolean enableMesosFetcher,
         boolean allowContainerVolumes,
+        int minRequiredInstances,
+        long maxSlaDurationSecs,
         String allowedJobEnvironment) {
 
       this.allowedContainerTypes = requireNonNull(allowedContainerTypes);
@@ -118,6 +124,8 @@ public class ConfigurationManager {
       this.enableMesosFetcher = enableMesosFetcher;
       this.allowContainerVolumes = allowContainerVolumes;
       this.allowedJobEnvironments = 
Pattern.compile(requireNonNull(allowedJobEnvironment));
+      this.minRequiredInstances = minRequiredInstances;
+      this.maxSlaDurationSecs = maxSlaDurationSecs;
     }
   }
 
@@ -196,7 +204,9 @@ public class ConfigurationManager {
     }
 
     builder.setTaskConfig(
-        
validateAndPopulate(ITaskConfig.build(builder.getTaskConfig())).newBuilder());
+        validateAndPopulate(
+            ITaskConfig.build(builder.getTaskConfig()),
+            builder.getInstanceCount()).newBuilder());
 
     // Only one of [service=true, cron_schedule] may be set.
     if (!Strings.isNullOrEmpty(job.getCronSchedule()) && 
builder.getTaskConfig().isIsService()) {
@@ -207,6 +217,89 @@ public class ConfigurationManager {
     return IJobConfiguration.build(builder);
   }
 
+  /**
+   * Validates the {@link TaskConfig}'s {@link SlaPolicy}, if any.
+   *
+   * A valid {@link SlaPolicy} is one that allows at least 1 instance to be 
drained for a job.
+   *
+   * @param config {@link TaskConfig} to be validated.
+   * @param instanceCount number of instances in the job.
+   * @throws TaskDescriptionException thrown when {@link SlaPolicy} is not 
valid.
+   */
+  private void validateSlaPolicy(
+      TaskConfig config,
+      int instanceCount) throws TaskDescriptionException {
+
+    if (config.isSetSlaPolicy()) {
+      if (tierManager.getTier(ITaskConfig.build(config)).isRevocable()
+          || tierManager.getTier(ITaskConfig.build(config)).isPreemptible()) {
+        throw new TaskDescriptionException(String.format(
+            "Tier '%s' does not support SlaPolicy.",
+            config.getTier()));
+      }
+
+      SlaPolicy slaPolicy = config.getSlaPolicy();
+      if (!slaPolicy.isSetCoordinatorSlaPolicy()
+          && instanceCount < settings.minRequiredInstances) {
+        throw new TaskDescriptionException(String.format(
+            "Job with fewer than %d instances cannot have Percentage/Count 
SlaPolicy.",
+            settings.minRequiredInstances));
+      }
+
+      if (slaPolicy.isSetCountSlaPolicy()) {
+        validateCountSlaPolicy(instanceCount, slaPolicy.getCountSlaPolicy());
+      }
+
+      if (slaPolicy.isSetPercentageSlaPolicy()) {
+        validatePercentageSlaPolicy(instanceCount, 
slaPolicy.getPercentageSlaPolicy());
+      }
+    }
+  }
+
+  private void validatePercentageSlaPolicy(
+      int instanceCount,
+      PercentageSlaPolicy slaPolicy) throws TaskDescriptionException {
+    if (slaPolicy.isSetPercentage()) {
+      double percentage = slaPolicy.getPercentage() / 100.0;
+      if (instanceCount - instanceCount * percentage < 1) {
+        throw new TaskDescriptionException(String.format(
+            "Current PercentageSlaPolicy: percentage=%f will not allow any 
instances to be killed. "
+                + "Must be less than %f.",
+            slaPolicy.getPercentage(),
+            ((double) (instanceCount - 1)) / instanceCount * 100.0));
+      }
+    }
+
+    if (slaPolicy.isSetDurationSecs()
+        && slaPolicy.getDurationSecs() > settings.maxSlaDurationSecs) {
+      throw new TaskDescriptionException(String.format(
+          "PercentageSlaPolicy: durationSecs=%d must be less than cluster-wide 
maximum of %d secs.",
+          slaPolicy.getDurationSecs(),
+          settings.maxSlaDurationSecs));
+    }
+  }
+
+  private void validateCountSlaPolicy(
+      int instanceCount,
+      CountSlaPolicy slaPolicy) throws TaskDescriptionException {
+    if (slaPolicy.isSetCount()
+        && instanceCount - slaPolicy.getCount() < 1) {
+      throw new TaskDescriptionException(String.format(
+          "Current CountSlaPolicy: count=%d will not allow any instances to be 
killed. "
+              + "Must be less than instanceCount=%d.",
+          slaPolicy.getCount(),
+          instanceCount));
+    }
+
+    if (slaPolicy.isSetDurationSecs()
+        && slaPolicy.getDurationSecs() > settings.maxSlaDurationSecs) {
+      throw new TaskDescriptionException(String.format(
+          "CountSlaPolicy: durationSecs=%d must be less than cluster-wide 
maximum of %d secs.",
+          slaPolicy.getDurationSecs(),
+          settings.maxSlaDurationSecs));
+    }
+  }
+
   @VisibleForTesting
   static final String NO_DOCKER_PARAMETERS =
       "This scheduler is configured to disallow Docker parameters.";
@@ -236,10 +329,14 @@ public class ConfigurationManager {
    *
    *
    * @param config Task config to validate and populate.
+   * @param instanceCount The number of instances in the job.
    * @return A reference to the modified {@code config} (for chaining).
    * @throws TaskDescriptionException If the task is invalid.
    */
-  public ITaskConfig validateAndPopulate(ITaskConfig config) throws 
TaskDescriptionException {
+  public ITaskConfig validateAndPopulate(
+      ITaskConfig config,
+      int instanceCount) throws TaskDescriptionException {
+
     TaskConfig builder = config.newBuilder();
 
     if (config.isSetTier() && 
!UserProvidedStrings.isGoodIdentifier(config.getTier())) {
@@ -374,6 +471,8 @@ public class ConfigurationManager {
       }
     }
 
+    validateSlaPolicy(builder, instanceCount);
+
     maybeFillLinks(builder);
 
     return ITaskConfig.build(builder);

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
 
b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
new file mode 100644
index 0000000..dd2462d
--- /dev/null
+++ 
b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
@@ -0,0 +1,470 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.maintenance;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractScheduledService;
+
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.gen.HostMaintenanceRequest;
+import org.apache.aurora.gen.HostStatus;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.PercentageSlaPolicy;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.SlaPolicy;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.sla.SlaManager;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IHostMaintenanceRequest;
+import org.apache.aurora.scheduler.storage.entities.IHostStatus;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ISlaPolicy;
+import org.apache.mesos.v1.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
+import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+
+/**
+ * Logic that puts hosts into maintenance mode, and triggers draining of hosts 
upon request.
+ * All state-changing functions return their results.  Additionally, all 
state-changing functions
+ * will ignore requests to change state of unknown hosts and subsequently omit 
these hosts from
+ * return values.
+ */
+public interface MaintenanceController {
+
+  /**
+   * Places hosts in maintenance mode.
+   * Hosts in maintenance mode are less-preferred for scheduling.
+   * No change will be made for hosts that are not recognized, and 
unrecognized hosts will not be
+   * included in the result.
+   *
+   * @param hosts Hosts to put into maintenance mode.
+   * @return The adjusted state of the hosts.
+   */
+  Set<IHostStatus> startMaintenance(Set<String> hosts);
+
+  /**
+   * Initiate a drain of all active tasks on {@code hosts}.
+   *
+   * @param hosts Hosts to drain.
+   * @return The adjusted state of the hosts.  Hosts without any active tasks 
will be immediately
+   *         moved to DRAINED.
+   */
+  Set<IHostStatus> drain(Set<String> hosts);
+
+  /**
+   * Initiate an SLA-aware drain of all active tasks on {@code hosts}.
+   *
+   * @param hosts Hosts to drain.
+   * @param defaultSlaPolicy SlaPolicy to use if a task does not have an 
SlaPolicy.
+   * @param timeoutSecs Interval after which tasks will be forcefully drained 
without checking SLA.
+   * @return The adjusted state of the hosts. Hosts without any active tasks 
will be immediately
+   *         moved to DRAINED.
+   */
+  Set<IHostStatus> slaDrain(Set<String> hosts, SlaPolicy defaultSlaPolicy, 
long timeoutSecs);
+
+  /**
+   * Drain tasks defined by the inverse offer.
+   * This method doesn't set any host attributes.
+   *
+   * @param inverseOffer the inverse offer to use.
+   */
+  void drainForInverseOffer(Protos.InverseOffer inverseOffer);
+
+  /**
+   * Fetches the current maintenance mode of {@code host}.
+   *
+   * @param host Host to fetch state for.
+   * @return Maintenance mode of host, {@link MaintenanceMode#NONE} if the 
host is not known.
+   */
+  MaintenanceMode getMode(String host);
+
+  /**
+   * Fetches the current state of {@code hosts}.
+   *
+   * @param hosts Hosts to fetch state for.
+   * @return The state of the hosts.
+   */
+  Set<IHostStatus> getStatus(Set<String> hosts);
+
+  /**
+   * Moves {@code hosts} out of maintenance mode, returning them to mode NONE.
+   *
+   * @param hosts Hosts to move out of maintenance mode.
+   * @return The adjusted state of the hosts.
+   */
+  Set<IHostStatus> endMaintenance(Set<String> hosts);
+
+  /**
+   * Records the maintenance requests for hosts and drains any active tasks 
from the host
+   * asynchronously.
+   *
+   * A task is drained only if doing so satisfies its required SLA. A task's SLA is either the
+   * {@link SlaPolicy} configured as part of the TaskConfig or the default {@link SlaPolicy}
+   * specified as part of the maintenance request. If neither is present, the task is drained
+   * immediately.
+   *
+   * In order to avoid tasks blocking maintenance perpetually, each maintenance request has a
+   * timeout after which all tasks are forcefully drained.
+   */
+  class MaintenanceControllerImpl
+      extends AbstractScheduledService implements MaintenanceController, 
EventSubscriber {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MaintenanceControllerImpl.class);
+
+    @VisibleForTesting
+    @Qualifier
+    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+    public @interface PollingInterval { }
+
+    @VisibleForTesting
+    static final String DRAINING_MESSAGE = "Draining machine for maintenance.";
+
+    private static final String MISSING_MAINTENANCE_REQUEST = 
"missing_maintenance_request";
+    private static final SlaPolicy ZERO_PERCENT_SLA = 
SlaPolicy.percentageSlaPolicy(
+        new PercentageSlaPolicy()
+            .setPercentage(0)
+            .setDurationSecs(0));
+
+    private final Storage storage;
+    private final Amount<Long, Time> pollingInterval;
+    private final TaskEventBatchWorker batchWorker;
+    private final SlaManager slaManager;
+    private final StateManager stateManager;
+
+    private final AtomicLong missingMaintenanceCounter;
+
+    @Inject
+    public MaintenanceControllerImpl(
+        Storage storage,
+        @PollingInterval Amount<Long, Time> pollingInterval,
+        TaskEventBatchWorker batchWorker,
+        SlaManager slaManager,
+        StateManager stateManager,
+        StatsProvider statsProvider) {
+
+      this.storage = requireNonNull(storage);
+      this.pollingInterval = checkNotNull(pollingInterval);
+      this.batchWorker = requireNonNull(batchWorker);
+      this.slaManager = requireNonNull(slaManager);
+      this.stateManager = requireNonNull(stateManager);
+      this.missingMaintenanceCounter = 
statsProvider.makeCounter(MISSING_MAINTENANCE_REQUEST);
+    }
+
+    private Set<String> drainTasksOnHost(String host, StoreProvider store) {
+      Query.Builder query = Query.slaveScoped(host).active();
+
+      List<IScheduledTask> candidates = new 
ArrayList<>(store.getTaskStore().fetchTasks(query));
+
+      if (candidates.isEmpty()) {
+        LOG.info("No tasks to drain on host: {}", host);
+        return Collections.emptySet();
+      }
+
+      // shuffle the candidates to avoid head-of-line blocking
+      Collections.shuffle(candidates);
+      candidates.forEach(task -> drainTask(task, store));
+
+      return candidates.stream().map(Tasks::id).collect(Collectors.toSet());
+    }
+
+    private Set<IHostStatus> watchDrainingTasks(MutableStoreProvider store, 
Set<String> hosts) {
+      LOG.info("Hosts to drain: " + hosts);
+      Set<String> emptyHosts = Sets.newHashSet();
+      for (String host : hosts) {
+        Set<String> drainedTasks = drainTasksOnHost(host, store);
+        // If there are no tasks on the host, immediately transition to 
DRAINED.
+        if (drainedTasks.isEmpty()) {
+          emptyHosts.add(host);
+        }
+      }
+
+      return ImmutableSet.<IHostStatus>builder()
+          .addAll(setMaintenanceMode(store, emptyHosts, DRAINED))
+          .addAll(setMaintenanceMode(store, Sets.difference(hosts, 
emptyHosts), DRAINING))
+          .build();
+    }
+
+    /**
+     * Notifies the MaintenanceController that a task has changed state.
+     *
+     * @param change Event
+     */
+    @Subscribe
+    public void taskChangedState(final TaskStateChange change) {
+      if (Tasks.isTerminated(change.getNewState())) {
+        final String host = change.getTask().getAssignedTask().getSlaveHost();
+        batchWorker.execute(store -> {
+          // If the task _was_ associated with a draining host and it was the last active task
+          // on the host, move the host to DRAINED.
+          Optional<IHostAttributes> attributes =
+              store.getAttributeStore().getHostAttributes(host);
+          if (attributes.isPresent() && attributes.get().getMode() == 
DRAINING) {
+            Query.Builder builder = Query.slaveScoped(host).active();
+            Iterable<IScheduledTask> activeTasks = 
store.getTaskStore().fetchTasks(builder);
+            if (Iterables.isEmpty(activeTasks)) {
+              LOG.info("Moving host {} into DRAINED", host);
+              setMaintenanceMode(store, ImmutableSet.of(host), DRAINED);
+            } else {
+              LOG.info("Host {} is DRAINING with active tasks: {}", host, 
Tasks.ids(activeTasks));
+            }
+          }
+          return BatchWorker.NO_RESULT;
+        });
+      }
+    }
+
+    @Override
+    public Set<IHostStatus> startMaintenance(Set<String> hosts) {
+      return storage.write(
+          storeProvider -> setMaintenanceMode(storeProvider, hosts, 
MaintenanceMode.SCHEDULED));
+    }
+
+    private void recordMaintenanceRequests(
+        MutableStoreProvider store,
+        Set<String> hosts,
+        SlaPolicy defaultSlaPolicy,
+        long timeoutSecs) {
+
+      hosts.forEach(
+          host -> store.getHostMaintenanceStore().saveHostMaintenanceRequest(
+              IHostMaintenanceRequest.build(
+                  new HostMaintenanceRequest()
+                      .setHost(host)
+                      .setDefaultSlaPolicy(defaultSlaPolicy)
+                      .setTimeoutSecs(timeoutSecs)
+                      .setCreatedTimestampMs(System.currentTimeMillis()))));
+    }
+
+    @Override
+    public Set<IHostStatus> drain(Set<String> hosts) {
+      return storage.write(store -> {
+        // Create a dummy maintenance request with a zero-percent SLA and zero timeout to force
+        // an immediate drain.
+        recordMaintenanceRequests(store, hosts, ZERO_PERCENT_SLA, 0);
+        return watchDrainingTasks(store, hosts);
+      });
+    }
+
+    @Override
+    public Set<IHostStatus> slaDrain(
+        Set<String> hosts,
+        SlaPolicy defaultSlaPolicy,
+        long timeoutSecs) {
+
+      // We can have only one maintenance request per host at any time.
+      // So we will simply overwrite any existing request. If the current one is being actively
+      // handled during the write, the new one will just be a no-op, since the host is already
+      // being drained. If the host is in DRAINED it will be moved back into DRAINING and then
+      // back into DRAINED without having to perform any work.
+      return storage.write(store -> {
+        recordMaintenanceRequests(store, hosts, defaultSlaPolicy, timeoutSecs);
+        return setMaintenanceMode(store, hosts, DRAINING);
+      });
+    }
+
+    private Optional<String> getHostname(Protos.InverseOffer offer) {
+      if (offer.getUrl().getAddress().hasHostname()) {
+        return Optional.of(offer.getUrl().getAddress().getHostname());
+      } else {
+        return Optional.empty();
+      }
+    }
+
+    @Override
+    public void drainForInverseOffer(Protos.InverseOffer offer) {
+      // TaskStore does not allow for querying by agent id.
+      Optional<String> hostname = getHostname(offer);
+
+      if (hostname.isPresent()) {
+        String host = hostname.get();
+        storage.write(storeProvider -> {
+          // Create a dummy maintenance request with a zero-percent SLA and zero timeout to
+          // force an immediate drain.
+          recordMaintenanceRequests(storeProvider, ImmutableSet.of(host), 
ZERO_PERCENT_SLA, 0);
+          return drainTasksOnHost(host, storeProvider);
+        });
+      } else {
+        LOG.error("Unable to drain tasks on agent {} because "
+            + "no hostname attached to inverse offer {}.", offer.getAgentId(), 
offer.getId());
+      }
+    }
+
+    private static final Function<IHostAttributes, String> HOST_NAME =
+        IHostAttributes::getHost;
+
+    private static final Function<IHostAttributes, IHostStatus> 
ATTRS_TO_STATUS =
+        attributes -> IHostStatus.build(
+            new 
HostStatus().setHost(attributes.getHost()).setMode(attributes.getMode()));
+
+    private static final Function<IHostStatus, MaintenanceMode> GET_MODE = 
IHostStatus::getMode;
+
+    @Override
+    public MaintenanceMode getMode(final String host) {
+      return storage.read(storeProvider -> 
storeProvider.getAttributeStore().getHostAttributes(host)
+          .map(ATTRS_TO_STATUS)
+          .map(GET_MODE)
+          .orElse(MaintenanceMode.NONE));
+    }
+
+    @Override
+    public Set<IHostStatus> getStatus(final Set<String> hosts) {
+      return storage.read(storeProvider -> {
+        // Warning - this is filtering _all_ host attributes.  If using this 
to frequently query
+        // for a small set of hosts, a getHostAttributes variant should be 
added.
+        return storeProvider.getAttributeStore().getHostAttributes().stream()
+            .filter(Predicates.compose(Predicates.in(hosts), HOST_NAME))
+            .map(ATTRS_TO_STATUS)
+            .collect(Collectors.toSet());
+      });
+    }
+
+    @Override
+    public Set<IHostStatus> endMaintenance(final Set<String> hosts) {
+      return storage.write(
+          storeProvider -> {
+            hosts.forEach(
+                h -> storeProvider.getHostMaintenanceStore().removeHostMaintenanceRequest(h));
+            return setMaintenanceMode(storeProvider, hosts, MaintenanceMode.NONE);
+          });
+    }
+
+    private Set<IHostStatus> setMaintenanceMode(
+        MutableStoreProvider storeProvider,
+        Set<String> hosts,
+        MaintenanceMode mode) {
+
+      AttributeStore.Mutable store = storeProvider.getAttributeStore();
+      ImmutableSet.Builder<IHostStatus> statuses = ImmutableSet.builder();
+      for (String host : hosts) {
+        LOG.info("Setting maintenance mode to {} for host {}", mode, host);
+        Optional<IHostAttributes> toSave = AttributeStore.Util.mergeMode(store, host, mode);
+        if (toSave.isPresent()) {
+          store.saveHostAttributes(toSave.get());
+          LOG.info("Updated host attributes: " + toSave.get());
+          statuses.add(IHostStatus.build(new HostStatus().setHost(host).setMode(mode)));
+        }
+      }
+      return statuses.build();
+    }
+
+    @VisibleForTesting
+    void runForTest() {
+      runOneIteration();
+    }
+
+    @Timed
+    @Override
+    protected void runOneIteration() {
+      LOG.info("Looking for hosts in DRAINING state");
+      storage.read(store -> {
+        store.getAttributeStore()
+            .getHostAttributes()
+            .stream()
+            .filter(h -> h.getMode() == DRAINING)
+            .forEach(h -> {
+              if (drainTasksOnHost(h.getHost(), store).isEmpty()) {
+                storage.write(mutable -> setMaintenanceMode(
+                    mutable,
+                    ImmutableSet.of(h.getHost()),
+                    DRAINED));
+              }
+            });
+        return null;
+      });
+    }
+
+    @Override
+    protected Scheduler scheduler() {
+      return Scheduler.newFixedDelaySchedule(
+          pollingInterval.getValue(),
+          pollingInterval.getValue(),
+          pollingInterval.getUnit().getTimeUnit());
+    }
+
+    private void drainTask(IScheduledTask task, StoreProvider store) {
+      String host = task.getAssignedTask().getSlaveHost();
+      Optional<IHostMaintenanceRequest> hostMaintenanceRequest =
+          store.getHostMaintenanceStore().getHostMaintenanceRequest(host);
+      if (!hostMaintenanceRequest.isPresent()) {
+        LOG.error("No maintenance request found for host: {}. Assuming SLA not 
satisfied.", host);
+        missingMaintenanceCounter.incrementAndGet();
+        return;
+      }
+
+      boolean force = false;
+      long expireMs =
+          System.currentTimeMillis() - hostMaintenanceRequest.get().getCreatedTimestampMs();
+      if (hostMaintenanceRequest.get().getTimeoutSecs()
+            < TimeAmount.of(expireMs, Time.MILLISECONDS).as(Time.SECONDS)) {
+        LOG.warn("Maintenance request timed out for host: {} after {} secs. Forcing drain of {}.",
+            host, hostMaintenanceRequest.get().getTimeoutSecs(), Tasks.id(task));
+        force = true;
+      }
+
+      final ISlaPolicy slaPolicy = task.getAssignedTask().getTask().isSetSlaPolicy()
+          ? task.getAssignedTask().getTask().getSlaPolicy()
+          : hostMaintenanceRequest.get().getDefaultSlaPolicy();
+
+      slaManager.checkSlaThenAct(
+          task,
+          slaPolicy,
+          storeProvider -> stateManager.changeState(
+              storeProvider,
+              Tasks.id(task),
+              Optional.empty(),
+              ScheduleStatus.DRAINING,
+              Optional.of(DRAINING_MESSAGE)),
+          force);
+    }
+  }
+}
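
For illustration only (not part of this commit): a minimal sketch of driving the new
SLA-aware drain from calling code. The thrift setter names below are assumed from the
field names in api.thrift, "maintenanceController" stands in for an injected
MaintenanceController, and the hostnames are placeholders; treat it as a sketch rather
than the exact API.

    // Drain two agents. Tasks without their own SlaPolicy fall back to a
    // "95% of instances RUNNING for at least 30 minutes" policy, and the
    // drain is forced once the one-hour timeout expires.
    static Set<IHostStatus> drainWithDefaultSla(MaintenanceController maintenanceController) {
      PercentageSlaPolicy percentage = new PercentageSlaPolicy()
          .setPercentage(95)
          .setDurationSecs(1800);
      SlaPolicy defaultPolicy = new SlaPolicy();
      defaultPolicy.setPercentageSlaPolicy(percentage);
      return maintenanceController.slaDrain(
          ImmutableSet.of("agent-1", "agent-2"), defaultPolicy, 3600);
    }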

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceModule.java b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceModule.java
new file mode 100644
index 0000000..f4f4ec3
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceModule.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.maintenance;
+
+import java.util.concurrent.ScheduledExecutorService;
+import javax.inject.Singleton;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.config.validators.PositiveAmount;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Binding module for maintenance related logic.
+ */
+public class MaintenanceModule extends AbstractModule {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MaintenanceModule.class);
+
+  @Parameters(separators = "=")
+  public static class Options {
+    @Parameter(names = "-host_maintenance_polling_interval",
+        validateValueWith = PositiveAmount.class,
+        description = "Interval between polling for pending host maintenance 
requests.")
+    public TimeAmount hostMaintenancePollingInterval = new TimeAmount(1, 
Time.MINUTES);
+  }
+
+  private final Options options;
+
+  public MaintenanceModule(final Options options) {
+    this.options = options;
+  }
+
+  @Override
+  protected void configure() {
+    bind(new TypeLiteral<Amount<Long, Time>>() { })
+        .annotatedWith(
+            MaintenanceController.MaintenanceControllerImpl.PollingInterval.class)
+        .toInstance(options.hostMaintenancePollingInterval);
+    bind(MaintenanceController.class).to(MaintenanceController.MaintenanceControllerImpl.class);
+    bind(MaintenanceController.MaintenanceControllerImpl.class).in(Singleton.class);
+    PubsubEventModule.bindSubscriber(
+        binder(),
+        MaintenanceController.MaintenanceControllerImpl.class);
+
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        bind(ScheduledExecutorService.class).toInstance(
+            AsyncUtil.singleThreadLoggingScheduledExecutor("MaintenanceController-%d", LOG));
+      }
+    });
+    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
+        .to(MaintenanceController.MaintenanceControllerImpl.class);
+  }
+}
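
Operator note (not part of the diff): MaintenanceControllerImpl is bound here as a
scheduler-active service that periodically re-checks hosts stuck in DRAINING, so the only
tunable is the polling interval above. Assuming the scheduler's usual TimeAmount flag
syntax, a faster poll would look something like:

    -host_maintenance_polling_interval=30secs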

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java 
b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
index 3b4df55..53b14e9 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
@@ -39,10 +39,10 @@ import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.maintenance.MaintenanceController;
 import org.apache.aurora.scheduler.offers.HostOffer;
 import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.offers.OfferManagerModule;
-import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
new file mode 100644
index 0000000..98bec48
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+import javax.inject.Qualifier;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Striped;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.entities.ICoordinatorSlaPolicy;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.IServerInfo;
+import org.apache.aurora.scheduler.storage.entities.ISlaPolicy;
+import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TSimpleJSONProtocol;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.Param;
+import org.asynchttpclient.Response;
+import org.asynchttpclient.util.HttpConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Provides methods for performing SLA-safe work. It is used for maintenance and job update
+ * operations to guarantee that a job's SLA requirements are always satisfied.
+ */
+public class SlaManager {
+  private static final Logger LOG = LoggerFactory.getLogger(SlaManager.class);
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  @interface SlaManagerExecutor { }
+
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  @interface MaxParallelCoordinators { }
+
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  @interface HttpClient { }
+
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  @interface MinRequiredInstances { }
+
+  @VisibleForTesting
+  static final String TASK_PARAM = "task";
+
+  private static final String ATTEMPTS_STAT_NAME = "sla_coordinator_attempts";
+  private static final String SUCCESS_STAT_NAME = "sla_coordinator_success";
+  private static final String ERRORS_STAT_NAME = "sla_coordinator_errors";
+  private static final String USER_ERRORS_STAT_NAME = "sla_coordinator_user_errors";
+  private static final String LOCK_STARVATION_STAT_NAME = "sla_coordinator_lock_starvation";
+
+  private final ScheduledExecutorService executor;
+  private final Storage storage;
+  private final IServerInfo serverInfo;
+  private final AsyncHttpClient httpClient;
+  private final Striped<Lock> lock;
+  private final int minRequiredInstances;
+  private final TierManager tierManager;
+
+  private final AtomicLong attemptsCounter;
+  private final AtomicLong successCounter;
+  private final AtomicLong errorsCounter;
+  private final AtomicLong userErrorsCounter;
+  private final AtomicLong lockStarvationCounter;
+  private final LoadingCache<String, AtomicLong> errorsByTaskCounter;
+  private final LoadingCache<String, AtomicLong> userErrorsByTaskCounter;
+  private final LoadingCache<String, AtomicLong> lockStarvationByTaskCounter;
+
+  @Inject
+  SlaManager(@SlaManagerExecutor ScheduledExecutorService executor,
+             @MaxParallelCoordinators Integer maxCoordinatorLocks,
+             @MinRequiredInstances Integer minRequiredInstances,
+             Storage storage,
+             IServerInfo serverInfo,
+             @HttpClient AsyncHttpClient httpClient,
+             TierManager tierManager,
+             StatsProvider statsProvider) {
+
+    this.executor = requireNonNull(executor);
+    this.storage = requireNonNull(storage);
+    this.serverInfo = requireNonNull(serverInfo);
+    this.httpClient = requireNonNull(httpClient);
+    this.tierManager = requireNonNull(tierManager);
+    this.minRequiredInstances = requireNonNull(minRequiredInstances);
+    this.attemptsCounter = statsProvider.makeCounter(ATTEMPTS_STAT_NAME);
+    this.successCounter = statsProvider.makeCounter(SUCCESS_STAT_NAME);
+    this.errorsCounter = statsProvider.makeCounter(ERRORS_STAT_NAME);
+    this.userErrorsCounter = statsProvider.makeCounter(USER_ERRORS_STAT_NAME);
+    this.lockStarvationCounter = statsProvider.makeCounter(LOCK_STARVATION_STAT_NAME);
+    this.lock = Striped.lazyWeakLock(requireNonNull(maxCoordinatorLocks));
+    this.errorsByTaskCounter = CacheBuilder.newBuilder().build(
+        new CacheLoader<String, AtomicLong>() {
+          @Override
+          public AtomicLong load(String key) {
+            return statsProvider.makeCounter(key);
+          }
+        }
+    );
+    this.userErrorsByTaskCounter = CacheBuilder.newBuilder().build(
+        new CacheLoader<String, AtomicLong>() {
+          @Override
+          public AtomicLong load(String key) {
+            return statsProvider.makeCounter(key);
+          }
+        }
+    );
+    this.lockStarvationByTaskCounter = CacheBuilder.newBuilder().build(
+        new CacheLoader<String, AtomicLong>() {
+          @Override
+          public AtomicLong load(String key) {
+            return statsProvider.makeCounter(key);
+          }
+        }
+    );
+  }
+
+  private long getSlaDuration(ISlaPolicy slaPolicy) {
+    if (slaPolicy.isSetPercentageSlaPolicy()) {
+      return slaPolicy.getPercentageSlaPolicy().getDurationSecs();
+    } else if (slaPolicy.isSetCountSlaPolicy()) {
+      return slaPolicy.getCountSlaPolicy().getDurationSecs();
+    }
+
+    throw new IllegalArgumentException("Expected a percentage/count sla policy.");
+  }
+
+  private boolean meetsSLAInstances(ISlaPolicy slaPolicy, long running, long total) {
+    if (slaPolicy.isSetPercentageSlaPolicy()) {
+      double percentageRunning = ((double) running / total) * 100.0;
+      double percentageRequired = slaPolicy.getPercentageSlaPolicy().getPercentage();
+      long numMaintenance = (long) ((1 - percentageRequired / 100.0) * total);
+      if (numMaintenance < 1) {
+        double updated = ((total - 1) / (double) total) * 100;
+        LOG.warn("Invalid PercentageSlaPolicy(percentage={}) for task with {} 
instances "
+                + "that allows {} instances to be in maintenance."
+                + "Using percentage={} which allows 1 instance to be under 
maintenance.",
+            percentageRequired, total, numMaintenance, updated);
+        percentageRequired = updated;
+      }
+      return percentageRunning >= percentageRequired;
+    } else if (slaPolicy.isSetCountSlaPolicy()) {
+      long numRequired = slaPolicy.getCountSlaPolicy().getCount();
+      long numMaintenance = total - numRequired;
+      if (numMaintenance < 1) {
+        long updated = total - 1;
+        LOG.warn("Invalid CountSlaPolicy(count={}) for task with {} instances "
+                + "that allows {} instances to be in maintenance."
+                + "Using count={} which allows 1 instance to be under 
maintenance.",
+            numRequired, total, numMaintenance, updated);
+        numRequired = updated;
+      }
+      return running >= numRequired;
+    }
+
+    throw new IllegalArgumentException("Expected a percentage/count sla policy.");
+  }
+
+  private boolean meetsSLADuration(IScheduledTask task, Amount<Long, Time> slaDuration) {
+    return task.getTaskEvents()
+        .stream()
+        .filter(te -> te.getStatus() == ScheduleStatus.RUNNING)
+        .map(ITaskEvent::getTimestamp)
+        .max(Long::compare)
+        .map(t -> System.currentTimeMillis() - t > slaDuration.as(Time.MILLISECONDS))
+        .orElse(false);
+  }
+
+  private boolean checkSla(IScheduledTask task, ISlaPolicy slaPolicy, StoreProvider store) {
+    // Find the number of active tasks for the job; this is the set of tasks against which
+    // the percentage/count-based SLA is calculated.
+    final long numActive = store.getTaskStore().fetchTasks(
+        Query.jobScoped(task.getAssignedTask().getTask().getJob()).active()
+    ).size();
+
+    if (skipSla(task, numActive)) {
+      LOG.info("Skip SLA for {} because it is not production or does not have 
enough instances.",
+          Tasks.id(task));
+      return true;
+    }
+
+    // Find tasks which have been RUNNING for the required SLA duration.
+    Amount<Long, Time> slaDuration = new TimeAmount(getSlaDuration(slaPolicy), Time.SECONDS);
+    final Set<IScheduledTask> running = store.getTaskStore().fetchTasks(
+        Query.jobScoped(task.getAssignedTask().getTask().getJob())
+            .byStatus(ScheduleStatus.RUNNING))
+        .stream()
+        .filter(t -> !Tasks.id(t).equals(Tasks.id(task))) // exclude the task to be removed
+        .filter(t -> meetsSLADuration(t, slaDuration)) // task is running for sla duration
+        .collect(Collectors.toSet());
+
+    // Check if we satisfy the required number of tasks RUNNING for the SLA duration.
+    boolean meetsSla = meetsSLAInstances(slaPolicy, running.size(), numActive);
+
+    LOG.info("SlaCheck: {}, {} tasks unaffected after updating state for {}.",
+        meetsSla,
+        running.size(),
+        Tasks.id(task));
+
+    return meetsSla;
+  }
+
+  /**
+   * Performs the supplied {@link Storage.MutateWork} after checking with the configured
+   * coordinator endpoint to make sure it is safe to perform the work.
+   *
+   * NOTE: Both the SLA check and the {@link Storage.MutateWork} are performed while holding a
+   * {@link Lock} that is keyed by the coordinator URL. This ensures mutations to the SLA are
+   * performed atomically, so the coordinator does not have to track concurrent requests and
+   * simulate SLA changes.
+   *
+   * @param task Task whose SLA is to be checked.
+   * @param slaPolicy {@link ICoordinatorSlaPolicy} to use for checking SLA.
+   * @param work {@link Storage.MutateWork} to perform if the SLA is satisfied.
+   * @param <T> The type of result the {@link Storage.MutateWork} produces.
+   * @param <E> The type of exception the {@link Storage.MutateWork} may throw.
+   */
+  private <T, E extends Exception> void askCoordinatorThenAct(
+      IScheduledTask task,
+      ICoordinatorSlaPolicy slaPolicy,
+      Storage.MutateWork<T, E> work) {
+
+    String taskKey = getTaskKey(task);
+
+    LOG.debug("Awaiting lock on coordinator: {} for task: {}",
+        slaPolicy.getCoordinatorUrl(),
+        taskKey);
+    Lock l = lock.get(slaPolicy.getCoordinatorUrl());
+    if (l.tryLock()) {
+      try {
+        LOG.info("Acquired lock on coordinator: {} for task: {}",
+            slaPolicy.getCoordinatorUrl(),
+            taskKey);
+        attemptsCounter.incrementAndGet();
+
+        if (coordinatorAllows(task, taskKey, slaPolicy)) {
+          LOG.info("Performing work after coordinator: {} approval for task: 
{}",
+              slaPolicy.getCoordinatorUrl(),
+              taskKey);
+          storage.write(work);
+        }
+      } catch (RuntimeException e) {
+        LOG.error("Unexpected failure during coordinator sla check against: {} 
for task: {}",
+            slaPolicy.getCoordinatorUrl(),
+            taskKey,
+            e);
+        throw e;
+      } catch (Exception e) {
+        incrementErrorCount(ERRORS_STAT_NAME, taskKey);
+        LOG.error("Failed to talk to coordinator: {} for task: {}",
+            slaPolicy.getCoordinatorUrl(),
+            taskKey,
+            e);
+      } finally {
+        LOG.info("Releasing lock for coordinator: {} and task: {}",
+            slaPolicy.getCoordinatorUrl(),
+            taskKey);
+        l.unlock();
+      }
+    } else {
+      incrementErrorCount(LOCK_STARVATION_STAT_NAME, taskKey);
+      LOG.info("Failed to acquire lock on coordinator: {} for task: {}",
+          slaPolicy.getCoordinatorUrl(),
+          taskKey);
+    }
+  }
+
+  private boolean coordinatorAllows(
+      IScheduledTask task,
+      String taskKey,
+      ICoordinatorSlaPolicy slaPolicy)
+      throws InterruptedException, ExecutionException, TException {
+
+    LOG.info("Checking coordinator: {} for task: {}", 
slaPolicy.getCoordinatorUrl(), taskKey);
+
+    Response response = httpClient.preparePost(slaPolicy.getCoordinatorUrl())
+        .setQueryParams(ImmutableList.of(new Param(TASK_PARAM, taskKey)))
+        .setBody(new TSerializer(new TSimpleJSONProtocol.Factory()).toString(task.newBuilder()))
+        .execute()
+        .get();
+
+    if (response.getStatusCode() != HttpConstants.ResponseStatusCodes.OK_200) {
+      LOG.error("Request failed to coordinator: {} for task: {}. Response: {}",
+          slaPolicy.getCoordinatorUrl(),
+          taskKey,
+          response.getStatusCode());
+      incrementErrorCount(USER_ERRORS_STAT_NAME, taskKey);
+      return false;
+    }
+
+    successCounter.incrementAndGet();
+    String json = response.getResponseBody();
+    LOG.info("Got response: {} from {} for task: {}",
+        json,
+        slaPolicy.getCoordinatorUrl(),
+        taskKey);
+
+    Map<String, Boolean> result = new Gson().fromJson(
+        json,
+        new TypeReference<Map<String, Boolean>>() { }.getType());
+
+    return result.get(slaPolicy.isSetStatusKey() ? slaPolicy.getStatusKey() : "drain");
+  }
+
+  @VisibleForTesting
+  String getTaskKey(IScheduledTask task) {
+    IJobKey jobKey = Tasks.getJob(task);
+    int instanceId = Tasks.getInstanceId(task);
+    return String.format("%s/%s/%s/%s/%s", serverInfo.getClusterName(),
+        jobKey.getRole(), jobKey.getEnvironment(), jobKey.getName(), instanceId);
+  }
+
+  private void incrementErrorCount(String prefix, String taskKey) {
+    try {
+      if (prefix.equals(ERRORS_STAT_NAME)) {
+        errorsCounter.incrementAndGet();
+        errorsByTaskCounter.get(prefix + "_" + taskKey).incrementAndGet();
+      }
+
+      if (prefix.equals(USER_ERRORS_STAT_NAME)) {
+        userErrorsCounter.incrementAndGet();
+        userErrorsByTaskCounter.get(prefix + "_" + taskKey).incrementAndGet();
+      }
+
+      if (prefix.equals(LOCK_STARVATION_STAT_NAME)) {
+        lockStarvationCounter.incrementAndGet();
+        lockStarvationByTaskCounter.get(prefix + "_" + taskKey).incrementAndGet();
+      }
+    } catch (ExecutionException e) {
+      LOG.error("Failed increment failure metrics for task: {}", taskKey, e);
+    }
+  }
+
+  /**
+   * Checks the SLA for the given task using the {@link ISlaPolicy} supplied.
+   *
+   * @param task Task whose SLA is to be checked.
+   * @param slaPolicy {@link ISlaPolicy} to use.
+   * @param work {@link Storage.MutateWork} to perform, if SLA is satisfied.
+   * @param force boolean to indicate if work should be performed without checking SLA.
+   * @param <T> The type of result the {@link Storage.MutateWork} produces.
+   * @param <E> The type of exception the {@link Storage.MutateWork} may throw.
+   * @throws E raises exception of type E if the {@link Storage.MutateWork} throws.
+   */
+  @Timed
+  public <T, E extends Exception> void checkSlaThenAct(
+      IScheduledTask task,
+      ISlaPolicy slaPolicy,
+      Storage.MutateWork<T, E> work,
+      boolean force) throws E {
+
+    if (force) {
+      LOG.info("Forcing work without applying SlaPolicy: {} for {}", 
slaPolicy, Tasks.id(task));
+      storage.write(work);
+      return;
+    }
+
+    LOG.info("Using SlaPolicy: {} for {}", slaPolicy, Tasks.id(task));
+
+    // has custom coordinator sla policy
+    if (slaPolicy.isSetCoordinatorSlaPolicy()) {
+      // schedule work to perform coordinated transition
+      executor.execute(() -> askCoordinatorThenAct(
+          task,
+          slaPolicy.getCoordinatorSlaPolicy(),
+          work));
+    } else {
+      // verify sla and perform work if satisfied
+      storage.write(store -> {
+        if (checkSla(task, slaPolicy, store)) {
+          work.apply(store);
+        }
+        return null; // TODO(sshanmugham) we need to satisfy the interface, refactor later
+      });
+    }
+  }
+
+  private boolean skipSla(IScheduledTask task, long numActive) {
+    if (!tierManager.getTier(task.getAssignedTask().getTask()).isPreemptible()
+        && !tierManager.getTier(task.getAssignedTask().getTask()).isRevocable()) {
+      return numActive < minRequiredInstances;
+    }
+    return true;
+  }
+}
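
To make the coordinator contract concrete: the scheduler POSTs to the configured
coordinator URL with a "task=<cluster>/<role>/<env>/<job>/<instance>" query parameter and
the ScheduledTask serialized as JSON in the body. It expects an HTTP 200 response whose
JSON body maps the policy's statusKey (or "drain" by default) to a boolean; any non-200
response is counted as a user error and treated as a refusal. Below is a minimal,
always-ACKing sketch using the JDK's built-in HTTP server; the port and path are
arbitrary, and a real coordinator would inspect the task payload before answering.

    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.InetSocketAddress;
    import java.nio.charset.StandardCharsets;
    import com.sun.net.httpserver.HttpServer;

    // Sketch of a coordinator endpoint that ACKs every maintenance request.
    public final class AlwaysAckCoordinator {
      public static void main(String[] args) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/coordinator", exchange -> {
          // The task key arrives via exchange.getRequestURI().getQuery() and the
          // task JSON via the request body; this sketch ignores both and ACKs.
          byte[] body = "{\"drain\": true}".getBytes(StandardCharsets.UTF_8);
          exchange.getResponseHeaders().set("Content-Type", "application/json");
          exchange.sendResponseHeaders(200, body.length);
          try (OutputStream out = exchange.getResponseBody()) {
            out.write(body);
          }
        });
        server.start();
      }
    }

Returning {"drain": false} (or the configured statusKey set to false) NACKs the request,
and the scheduler simply retries on a later maintenance poll.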

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
index 25ed474..07082a9 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
@@ -18,7 +18,6 @@ import java.lang.annotation.Target;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
@@ -30,6 +29,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.AbstractModule;
 import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
 
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
@@ -39,6 +39,9 @@ import org.apache.aurora.scheduler.config.types.TimeAmount;
 import org.apache.aurora.scheduler.config.validators.PositiveAmount;
 import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCalculatorSettings;
 import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +54,7 @@ import static java.util.Objects.requireNonNull;
 import static org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory.JOB_UPTIMES;
 import static org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory.MEDIANS;
 import static org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory.PLATFORM_UPTIME;
+import static org.asynchttpclient.Dsl.asyncHttpClient;
 
 /**
  * Binding module for the sla processor.
@@ -76,6 +80,27 @@ public class SlaModule extends AbstractModule {
         description = "Metric categories collected for non production tasks.",
         splitter = CommaSplitter.class)
     public List<MetricCategory> slaNonProdMetrics = ImmutableList.of();
+
+    @Parameter(names = "-sla_coordinator_timeout",
+        validateValueWith = PositiveAmount.class,
+        description = "Timeout interval for communicating with Coordinator.")
+    public TimeAmount slaCoordinatorTimeout = new TimeAmount(1, Time.MINUTES);
+
+    @Parameter(names = "-max_parallel_coordinated_maintenance",
+        description = "Maximum number of coordinators that can be contacted in 
parallel.")
+    public Integer maxParallelCoordinators = 10;
+
+    @Parameter(names = "-min_required_instances_for_sla_check",
+        description = "Minimum number of instances required for a job to be 
eligible for SLA "
+            + "check. This does not apply to jobs that have a 
CoordinatorSlaPolicy.")
+    public Integer minRequiredInstances = 20;
+
+    @Parameter(names = "-max_sla_duration_secs",
+        validateValueWith = PositiveAmount.class,
+        description = "Maximum duration window for which SLA requirements are 
to be satisfied."
+            + "This does not apply to jobs that have a CoordinatorSlaPolicy."
+    )
+    public TimeAmount maxSlaDuration = new TimeAmount(2, Time.HOURS);
   }
 
   @VisibleForTesting
@@ -104,6 +129,36 @@ public class SlaModule extends AbstractModule {
 
     bind(SlaUpdater.class).in(Singleton.class);
     SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(SlaUpdater.class);
+
+    DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder()
+        .setConnectTimeout(options.slaCoordinatorTimeout.as(Time.MILLISECONDS).intValue())
+        .setHandshakeTimeout(options.slaCoordinatorTimeout.as(Time.MILLISECONDS).intValue())
+        .setSslSessionTimeout(options.slaCoordinatorTimeout.as(Time.MILLISECONDS).intValue())
+        .setReadTimeout(options.slaCoordinatorTimeout.as(Time.MILLISECONDS).intValue())
+        .setRequestTimeout(options.slaCoordinatorTimeout.as(Time.MILLISECONDS).intValue())
+        .setKeepAliveStrategy(new DefaultKeepAliveStrategy())
+        .build();
+    AsyncHttpClient httpClient = asyncHttpClient(config);
+
+    bind(AsyncHttpClient.class)
+        .annotatedWith(SlaManager.HttpClient.class)
+        .toInstance(httpClient);
+
+    bind(new TypeLiteral<Integer>() { })
+        .annotatedWith(SlaManager.MinRequiredInstances.class)
+        .toInstance(options.minRequiredInstances);
+
+    bind(new TypeLiteral<Integer>() { })
+        .annotatedWith(SlaManager.MaxParallelCoordinators.class)
+        .toInstance(options.maxParallelCoordinators);
+
+    bind(ScheduledExecutorService.class)
+        .annotatedWith(SlaManager.SlaManagerExecutor.class)
+        .toInstance(AsyncUtil.loggingScheduledExecutor(
+            options.maxParallelCoordinators,
+            "SlaManager-%d", LOG));
+
+    bind(SlaManager.class).in(javax.inject.Singleton.class);
   }
 
   // TODO(ksweeney): This should use AbstractScheduledService.
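
For completeness, the new SLA-related scheduler flags introduced in this module, shown
with the defaults from the diff (the time-amount values are assumptions about the
scheduler's TimeAmount flag syntax):

    -sla_coordinator_timeout=1mins
    -max_parallel_coordinated_maintenance=10
    -min_required_instances_for_sla_check=20
    -max_sla_duration_secs=2hrs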
