http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java 
b/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
new file mode 100644
index 0000000..759a1bc
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
@@ -0,0 +1,1239 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.TypeLiteral;
+
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.CoordinatorSlaPolicy;
+import org.apache.aurora.gen.CountSlaPolicy;
+import org.apache.aurora.gen.PercentageSlaPolicy;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.ServerInfo;
+import org.apache.aurora.gen.SlaPolicy;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.IServerInfo;
+import org.apache.aurora.scheduler.storage.entities.ISlaPolicy;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TSimpleJSONProtocol;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.TIER_MANAGER;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+public class SlaManagerTest extends EasyMockTest {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SlaManagerTest.class);
+  private static final String CLUSTER_NAME = "test_cluster";
+  private static final String STATS_URL_PREFIX = "fake_url";
+  private static final String HOST_A = "a"; // slave host assigned to every task built by makeTask
+
+  private static final ISlaPolicy PERCENTAGE_SLA_POLICY = ISlaPolicy.build( // 66% for 1800 secs
+      SlaPolicy.percentageSlaPolicy(
+          new PercentageSlaPolicy()
+              .setPercentage(66)
+              .setDurationSecs(1800)));
+
+  private static final ISlaPolicy COUNT_SLA_POLICY = ISlaPolicy.build( // count 2 for 1800 secs
+      SlaPolicy.countSlaPolicy(
+          new CountSlaPolicy()
+              .setCount(2)
+              .setDurationSecs(1800)));
+
+  private AsyncHttpClient httpClient;
+  private SlaManager slaManager;
+  private StorageTestUtil storageUtil;
+  private StateManager stateManager;
+  private IServerInfo serverInfo;
+  private Server jettyServer; // created on an ephemeral port in setUp, stopped at tear down
+  private CountDownLatch coordinatorResponded; // count 1; awaited by the coordinator tests
+
+  /**
+   * Creates the fixtures shared by every test and wires a {@link SlaManager} through Guice.
+   *
+   * <p>The injector binds the mocked storage and state manager, a fake stats provider, a real
+   * {@link AsyncHttpClient}, a minimum of 2 required instances, a maximum of 10 parallel
+   * coordinators and a 10-thread scheduled executor. The HTTP client, the executor and the Jetty
+   * server are all registered for tear down so that no test leaks threads or sockets into the
+   * next one.
+   */
+  @Before
+  public void setUp() {
+    jettyServer = new Server(0); // Start Jetty server with ephemeral port
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    stateManager = createMock(StateManager.class);
+    httpClient = new DefaultAsyncHttpClient();
+    coordinatorResponded = new CountDownLatch(1);
+
+    // Keep a handle on the executor so that it can be shut down when the test finishes.
+    ScheduledExecutorService executor =
+        AsyncUtil.loggingScheduledExecutor(10, "SlaManagerTest-%d", LOG);
+
+    // A fresh client and executor are created for every test; release them at tear down.
+    addTearDown(() -> httpClient.close());
+    addTearDown(() -> executor.shutdownNow());
+
+    serverInfo = IServerInfo.build(
+        new ServerInfo()
+            .setClusterName(CLUSTER_NAME)
+            .setStatsUrlPrefix(STATS_URL_PREFIX));
+
+    Injector injector = Guice.createInjector(
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(Storage.class).toInstance(storageUtil.storage);
+            bind(StateManager.class).toInstance(stateManager);
+            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+            bind(TierManager.class).toInstance(TIER_MANAGER);
+            bind(AsyncHttpClient.class)
+                .annotatedWith(SlaManager.HttpClient.class)
+                .toInstance(httpClient);
+
+            bind(new TypeLiteral<Integer>() { })
+                .annotatedWith(SlaManager.MinRequiredInstances.class)
+                .toInstance(2);
+
+            bind(new TypeLiteral<Integer>() { })
+                .annotatedWith(SlaManager.MaxParallelCoordinators.class)
+                .toInstance(10);
+
+            bind(ScheduledExecutorService.class)
+                .annotatedWith(SlaManager.SlaManagerExecutor.class)
+                .toInstance(executor);
+
+            bind(IServerInfo.class).toInstance(serverInfo);
+          }
+        }
+    );
+    slaManager = injector.getInstance(SlaManager.class);
+
+    addTearDown(() -> jettyServer.stop());
+  }
+
+  /**
+   * Builds a production task in the given {@code status}, delegating to the five-argument
+   * overload with a timestamp of 1000.
+   */
+  private static IScheduledTask makeTask(String taskId, int instanceId, ScheduleStatus status) {
+    long transitionTimestamp = 1000;
+    boolean production = true;
+    return makeTask(taskId, instanceId, status, transitionTimestamp, production);
+  }
+
+  /**
+   * Builds a production task in RUNNING, delegating to the five-argument overload with
+   * {@code runningSince} as the timestamp.
+   */
+  private static IScheduledTask makeTask(String taskId, int instanceId, long runningSince) {
+    boolean production = true;
+    return makeTask(taskId, instanceId, RUNNING, runningSince, production);
+  }
+
+  /**
+   * Builds a task for {@link TaskTestUtil#JOB}, adds a state transition to {@code status} with
+   * {@code runningSince} as its timestamp, and assigns the task to {@link #HOST_A}.
+   */
+  private static IScheduledTask makeTask(
+      String taskId,
+      int instanceId,
+      ScheduleStatus status,
+      long runningSince,
+      boolean prod) {
+
+    IScheduledTask transitioned = TaskTestUtil.addStateTransition(
+        TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB, instanceId, prod),
+        status,
+        runningSince);
+
+    // The immutable wrapper has no setter for the host, so go through the thrift builder.
+    ScheduledTask mutable = transitioned.newBuilder();
+    mutable.getAssignedTask().setSlaveHost(HOST_A);
+    return IScheduledTask.build(mutable);
+  }
+
+  /**
+   * Count policy, happy path: with {@link CountSlaPolicy#count} set to 2 and three tasks that
+   * have been RUNNING for the required {@link CountSlaPolicy#durationSecs}, the SLA check passes
+   * and the supplied {@link Storage.MutateWork} is executed.
+   */
+  @Test
+  public void testCheckCountSlaPassesThenActs() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING);
+    ImmutableSet<IScheduledTask> jobTasks = ImmutableSet.of(
+        target,
+        makeTask("taskB", 2, RUNNING),
+        makeTask("taskC", 3, RUNNING));
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The SLA calculation reads all active tasks of the job ...
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(jobTasks);
+
+    // ... and all RUNNING tasks of the job.
+    expect(storageUtil.taskStore.fetchTasks(
+        Query.jobScoped(Tasks.getJob(target)).byStatus(RUNNING)))
+        .andReturn(jobTasks);
+
+    // Once the check passes, the work below performs this lookup.
+    expect(storageUtil.taskStore.fetchTask(targetId)).andReturn(Optional.of(target));
+
+    control.replay();
+
+    slaManager.checkSlaThenAct(
+        target,
+        COUNT_SLA_POLICY,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Count policy with an aggressive {@link CountSlaPolicy#count}: the policy asks for 3 of the
+   * job's 3 instances. The SLA check is expected to be overridden and to pass, so the supplied
+   * {@link Storage.MutateWork} is executed.
+   */
+  @Test
+  public void testCheckCountSlaOverridesAggressiveCountPassesThenActs() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING);
+    ImmutableSet<IScheduledTask> jobTasks = ImmutableSet.of(
+        target,
+        makeTask("taskB", 2, RUNNING),
+        makeTask("taskC", 3, RUNNING));
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The SLA calculation reads all active tasks of the job ...
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(jobTasks);
+
+    // ... and all RUNNING tasks of the job.
+    expect(storageUtil.taskStore.fetchTasks(
+        Query.jobScoped(Tasks.getJob(target)).byStatus(RUNNING)))
+        .andReturn(jobTasks);
+
+    // Once the check passes, the work below performs this lookup.
+    expect(storageUtil.taskStore.fetchTask(targetId)).andReturn(Optional.of(target));
+
+    control.replay();
+
+    ISlaPolicy aggressivePolicy = ISlaPolicy.build(
+        SlaPolicy.countSlaPolicy(new CountSlaPolicy().setCount(3).setDurationSecs(1800)));
+
+    slaManager.checkSlaThenAct(
+        target,
+        aggressivePolicy,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Count policy with an aggressive {@link CountSlaPolicy#count} (3 of 3 instances) while one
+   * task is PENDING: the SLA check is expected to be overridden but still fail, so the supplied
+   * {@link Storage.MutateWork} is not executed. No {@code fetchTask} expectation is recorded.
+   */
+  @Test
+  public void testCheckCountSlaOverridesAggressiveCountFailsAct() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING);
+    ImmutableSet<IScheduledTask> jobTasks = ImmutableSet.of(
+        target,
+        makeTask("taskB", 2, RUNNING),
+        makeTask("taskC", 3, PENDING));
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The SLA calculation reads all active tasks of the job ...
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(jobTasks);
+
+    // ... and all RUNNING tasks of the job.
+    expect(storageUtil.taskStore.fetchTasks(
+        Query.jobScoped(Tasks.getJob(target)).byStatus(RUNNING)))
+        .andReturn(jobTasks);
+
+    control.replay();
+
+    ISlaPolicy aggressivePolicy = ISlaPolicy.build(
+        SlaPolicy.countSlaPolicy(new CountSlaPolicy().setCount(3).setDurationSecs(1800)));
+
+    slaManager.checkSlaThenAct(
+        target,
+        aggressivePolicy,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Count policy, failing path: with {@link CountSlaPolicy#count} set to 2, two tasks RUNNING
+   * for the required {@link CountSlaPolicy#durationSecs} and one task PENDING, the SLA check
+   * fails and the supplied {@link Storage.MutateWork} is not executed.
+   */
+  @Test
+  public void testCheckCountSlaFailsDoesNotAct() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING);
+    ImmutableSet<IScheduledTask> jobTasks = ImmutableSet.of(
+        target,
+        makeTask("taskB", 2, RUNNING),
+        makeTask("taskC", 3, PENDING));
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The SLA calculation reads all active tasks of the job ...
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(jobTasks);
+
+    // ... and all RUNNING tasks of the job.
+    expect(storageUtil.taskStore.fetchTasks(
+        Query.jobScoped(Tasks.getJob(target)).byStatus(RUNNING)))
+        .andReturn(jobTasks);
+
+    control.replay();
+
+    slaManager.checkSlaThenAct(
+        target,
+        COUNT_SLA_POLICY,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Count policy on a job with a single active instance, which is below the minimum of 2
+   * required instances bound in {@code setUp}: the SLA check is skipped and the supplied
+   * {@link Storage.MutateWork} is executed.
+   */
+  @Test
+  public void testCheckCountSlaFailsDueToMinRequiredInstancesActs() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING);
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The job's active tasks are read to find out how many instances it has.
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(ImmutableSet.of(target));
+
+    // The work below performs this lookup.
+    expect(storageUtil.taskStore.fetchTask(targetId)).andReturn(Optional.of(target));
+
+    control.replay();
+
+    ISlaPolicy zeroPolicy = ISlaPolicy.build(
+        SlaPolicy.countSlaPolicy(new CountSlaPolicy().setCount(0).setDurationSecs(0)));
+
+    slaManager.checkSlaThenAct(
+        target,
+        zeroPolicy,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Count policy on a task built with {@code prod == false}: the SLA check is skipped because
+   * the job is not in the correct tier, and the supplied {@link Storage.MutateWork} is executed.
+   */
+  @Test
+  public void testCheckCountSlaFailsDueToTierActs() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING, 1000, false);
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The job's active tasks are still read for the SLA calculation.
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(ImmutableSet.of(target));
+
+    // The work below performs this lookup.
+    expect(storageUtil.taskStore.fetchTask(targetId)).andReturn(Optional.of(target));
+
+    control.replay();
+
+    ISlaPolicy zeroPolicy = ISlaPolicy.build(
+        SlaPolicy.countSlaPolicy(new CountSlaPolicy().setCount(0).setDurationSecs(0)));
+
+    slaManager.checkSlaThenAct(
+        target,
+        zeroPolicy,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Count policy where running time is the problem: with {@link CountSlaPolicy#count} set to 2,
+   * two tasks have been RUNNING for the required {@link CountSlaPolicy#durationSecs} and a third
+   * only entered RUNNING at the current time. The SLA check fails and the supplied
+   * {@link Storage.MutateWork} is not executed.
+   */
+  @Test
+  public void testCheckCountSlaFailsDueToRunningTimeDoesNotAct() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING);
+    ImmutableSet<IScheduledTask> jobTasks = ImmutableSet.of(
+        target,
+        makeTask("taskB", 2, RUNNING),
+        makeTask("taskC", 3, System.currentTimeMillis()));
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The SLA calculation reads all active tasks of the job ...
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(jobTasks);
+
+    // ... and all RUNNING tasks of the job.
+    expect(storageUtil.taskStore.fetchTasks(
+        Query.jobScoped(Tasks.getJob(target)).byStatus(RUNNING)))
+        .andReturn(jobTasks);
+
+    control.replay();
+
+    slaManager.checkSlaThenAct(
+        target,
+        COUNT_SLA_POLICY,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Count policy with {@code force} set to {@code true}: the supplied {@link Storage.MutateWork}
+   * is executed without checking the SLA, for a job whose only task entered RUNNING at the
+   * current time and {@link CountSlaPolicy#count} set to 2. No {@code fetchTasks} expectation
+   * is recorded.
+   */
+  @Test
+  public void testCheckCountSlaCheckForceAct() {
+    IScheduledTask target = makeTask("taskA", 1, System.currentTimeMillis());
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // Only the lookup performed by the forced work is expected.
+    expect(storageUtil.taskStore.fetchTask(targetId)).andReturn(Optional.of(target));
+
+    control.replay();
+
+    slaManager.checkSlaThenAct(
+        target,
+        COUNT_SLA_POLICY,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        true);
+  }
+
+  /**
+   * Percentage policy, happy path: with {@link PercentageSlaPolicy#percentage} set to 66 and
+   * three tasks that have been RUNNING for the required {@link PercentageSlaPolicy#durationSecs},
+   * the SLA check passes and the supplied {@link Storage.MutateWork} is executed.
+   */
+  @Test
+  public void testCheckPercentageSlaPassesThenActs() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING);
+    ImmutableSet<IScheduledTask> jobTasks = ImmutableSet.of(
+        target,
+        makeTask("taskB", 2, RUNNING),
+        makeTask("taskC", 3, RUNNING));
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The SLA calculation reads all active tasks of the job ...
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(jobTasks);
+
+    // ... and all RUNNING tasks of the job.
+    expect(storageUtil.taskStore.fetchTasks(
+        Query.jobScoped(Tasks.getJob(target)).byStatus(RUNNING)))
+        .andReturn(jobTasks);
+
+    // Once the check passes, the work below performs this lookup.
+    expect(storageUtil.taskStore.fetchTask(targetId)).andReturn(Optional.of(target));
+
+    control.replay();
+
+    slaManager.checkSlaThenAct(
+        target,
+        PERCENTAGE_SLA_POLICY,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Percentage policy with an aggressive {@link PercentageSlaPolicy#percentage} of 67 on a job
+   * with three tasks RUNNING for the required {@link PercentageSlaPolicy#durationSecs}: the SLA
+   * check is overridden and the supplied {@link Storage.MutateWork} is executed.
+   */
+  @Test
+  public void testCheckPercentageSlaWithAggressivePercentagePassesThenActs() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING);
+    ImmutableSet<IScheduledTask> jobTasks = ImmutableSet.of(
+        target,
+        makeTask("taskB", 2, RUNNING),
+        makeTask("taskC", 3, RUNNING));
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The SLA calculation reads all active tasks of the job ...
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(jobTasks);
+
+    // ... and all RUNNING tasks of the job.
+    expect(storageUtil.taskStore.fetchTasks(
+        Query.jobScoped(Tasks.getJob(target)).byStatus(RUNNING)))
+        .andReturn(jobTasks);
+
+    // Once the check passes, the work below performs this lookup.
+    expect(storageUtil.taskStore.fetchTask(targetId)).andReturn(Optional.of(target));
+
+    control.replay();
+
+    // A required running percentage of 67 would allow 0 of the 3 instances to be in
+    // maintenance; this is overridden to allow exactly 1 instance to be in maintenance.
+    ISlaPolicy aggressivePolicy = ISlaPolicy.build(
+        SlaPolicy.percentageSlaPolicy(
+            new PercentageSlaPolicy().setPercentage(67).setDurationSecs(1800)));
+
+    slaManager.checkSlaThenAct(
+        target,
+        aggressivePolicy,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Percentage policy with an aggressive {@link PercentageSlaPolicy#percentage} of 67 on a job
+   * with two tasks RUNNING for the required {@link PercentageSlaPolicy#durationSecs} and one
+   * task PENDING: the supplied {@link Storage.MutateWork} is not expected to run, so no
+   * {@code fetchTask} expectation is recorded.
+   */
+  @Test
+  public void testCheckPercentageSlaWithAggressivePercentageFailsAct() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING);
+    ImmutableSet<IScheduledTask> jobTasks = ImmutableSet.of(
+        target,
+        makeTask("taskB", 2, RUNNING),
+        makeTask("taskC", 3, PENDING));
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The SLA calculation reads all active tasks of the job ...
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(jobTasks);
+
+    // ... and all RUNNING tasks of the job.
+    expect(storageUtil.taskStore.fetchTasks(
+        Query.jobScoped(Tasks.getJob(target)).byStatus(RUNNING)))
+        .andReturn(jobTasks);
+
+    control.replay();
+
+    // A required running percentage of 67 would allow 0 of the 3 instances to be in
+    // maintenance. Presumably the override to 1 instance is already used up by the PENDING
+    // task, so the check does not pass here.
+    ISlaPolicy aggressivePolicy = ISlaPolicy.build(
+        SlaPolicy.percentageSlaPolicy(
+            new PercentageSlaPolicy().setPercentage(67).setDurationSecs(1800)));
+
+    slaManager.checkSlaThenAct(
+        target,
+        aggressivePolicy,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Percentage policy, failing path: with {@link PercentageSlaPolicy#percentage} set to 66, two
+   * tasks RUNNING for the required {@link PercentageSlaPolicy#durationSecs} and one task
+   * PENDING, the SLA check fails and the supplied {@link Storage.MutateWork} is not executed.
+   */
+  @Test
+  public void testCheckPercentageSlaFailsDoesNotAct() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING);
+    ImmutableSet<IScheduledTask> jobTasks = ImmutableSet.of(
+        target,
+        makeTask("taskB", 2, RUNNING),
+        makeTask("taskC", 3, PENDING));
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The SLA calculation reads all active tasks of the job ...
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(jobTasks);
+
+    // ... and all RUNNING tasks of the job.
+    expect(storageUtil.taskStore.fetchTasks(
+        Query.jobScoped(Tasks.getJob(target)).byStatus(RUNNING)))
+        .andReturn(jobTasks);
+
+    control.replay();
+
+    slaManager.checkSlaThenAct(
+        target,
+        PERCENTAGE_SLA_POLICY,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Percentage policy on a job with a single active instance, which is below the minimum of 2
+   * required instances bound in {@code setUp}: the SLA check is skipped and the supplied
+   * {@link Storage.MutateWork} is executed.
+   */
+  @Test
+  public void testCheckPercentageSlaFailsDueToMinRequiredInstancesActs() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING);
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The job's active tasks are read to find out how many instances it has.
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(ImmutableSet.of(target));
+
+    // The work below performs this lookup.
+    expect(storageUtil.taskStore.fetchTask(targetId)).andReturn(Optional.of(target));
+
+    control.replay();
+
+    ISlaPolicy zeroPolicy = ISlaPolicy.build(
+        SlaPolicy.percentageSlaPolicy(
+            new PercentageSlaPolicy().setPercentage(0).setDurationSecs(0)));
+
+    slaManager.checkSlaThenAct(
+        target,
+        zeroPolicy,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Percentage policy on a task built with {@code prod == false}: the SLA check is skipped
+   * because the job is not in the correct tier, and the supplied {@link Storage.MutateWork} is
+   * executed.
+   */
+  @Test
+  public void testCheckPercentageSlaFailsDueToTierActs() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING, 1000, false);
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The job's active tasks are still read for the SLA calculation.
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(ImmutableSet.of(target));
+
+    // The work below performs this lookup.
+    expect(storageUtil.taskStore.fetchTask(targetId)).andReturn(Optional.of(target));
+
+    control.replay();
+
+    ISlaPolicy zeroPolicy = ISlaPolicy.build(
+        SlaPolicy.percentageSlaPolicy(
+            new PercentageSlaPolicy().setPercentage(0).setDurationSecs(0)));
+
+    slaManager.checkSlaThenAct(
+        target,
+        zeroPolicy,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Percentage policy where running time is the problem: with
+   * {@link PercentageSlaPolicy#percentage} set to 66, two tasks have been RUNNING for the
+   * required {@link PercentageSlaPolicy#durationSecs} and a third only entered RUNNING at the
+   * current time. The SLA check fails and the supplied {@link Storage.MutateWork} is not
+   * executed.
+   */
+  @Test
+  public void testCheckPercentageSlaFailsDueToRunningTimeDoesNotAct() {
+    IScheduledTask target = makeTask("taskA", 1, RUNNING);
+    ImmutableSet<IScheduledTask> jobTasks = ImmutableSet.of(
+        target,
+        makeTask("taskB", 2, RUNNING),
+        makeTask("taskC", 3, System.currentTimeMillis()));
+    String targetId = target.getAssignedTask().getTaskId();
+
+    // The SLA calculation reads all active tasks of the job ...
+    expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(target)).active()))
+        .andReturn(jobTasks);
+
+    // ... and all RUNNING tasks of the job.
+    expect(storageUtil.taskStore.fetchTasks(
+        Query.jobScoped(Tasks.getJob(target)).byStatus(RUNNING)))
+        .andReturn(jobTasks);
+
+    control.replay();
+
+    slaManager.checkSlaThenAct(
+        target,
+        PERCENTAGE_SLA_POLICY,
+        stores -> stores.getUnsafeTaskStore().fetchTask(targetId),
+        false);
+  }
+
+  /**
+   * Verifies that when {@code force} is {@code true} the supplied {@link Storage.MutateWork}
+   * gets executed, without checking SLA, for a job that has 1 task in RUNNING for less than
+   * {@link PercentageSlaPolicy#durationSecs} with {@link PercentageSlaPolicy#percentage} set
+   * to 66.
+   */
+  @Test
+  public void testCheckPercentageSlaCheckForceAct() {
+    // The task enters RUNNING "now", so it has not been running for the required duration.
+    // This mirrors the fixture used by testCheckCountSlaCheckForceAct.
+    IScheduledTask task1 = makeTask("taskA", 1, System.currentTimeMillis());
+    String taskId = task1.getAssignedTask().getTaskId();
+
+    // Only the lookup performed by the forced work is expected; no fetchTasks expectation is
+    // recorded for an SLA calculation.
+    expect(storageUtil.taskStore.fetchTask(taskId)).andReturn(Optional.of(task1));
+
+    control.replay();
+
+    slaManager.checkSlaThenAct(
+        task1,
+        PERCENTAGE_SLA_POLICY,
+        storeProvider -> storeProvider.getUnsafeTaskStore().fetchTask(taskId),
+        true);
+  }
+
+  /**
+   * Verifies that SLA check passes and the supplied {@link 
Storage.MutateWork} gets executed
+   * for a job when {@link CoordinatorSlaPolicy#coordinatorUrl} responds
+   * with {@code {"drain": true}}.
+   */
+  @Test
+  public void testCheckCoordinatorSlaPassesThenActs() throws Exception {
+    IScheduledTask task1 = makeTask("taskA", 1, RUNNING);
+    CountDownLatch workCalled = new CountDownLatch(1);
+
+    jettyServer.setHandler(mockCoordinatorResponse(task1, "{\"drain\": 
true}"));
+    jettyServer.start();
+
+    // expect that the fetchTask in the work is called, after sla check passes
+    
expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId()))
+        .andReturn(Optional.of(task1));
+
+    control.replay();
+
+    while (!coordinatorResponded.await(100, TimeUnit.MILLISECONDS)) {
+      slaManager.checkSlaThenAct(
+          task1,
+          createCoordinatorSlaPolicy(),
+          storeProvider -> {
+            // set the marker to indicate that we performed the work
+            workCalled.countDown();
+            storeProvider
+                .getUnsafeTaskStore()
+                .fetchTask(task1.getAssignedTask().getTaskId());
+            return null;
+          },
+          false);
+    }
+
+    // wait until we are sure that the server has responded
+    coordinatorResponded.await();
+    workCalled.await();
+
+    assertEquals(0, coordinatorResponded.getCount());
+    // check the work was called
+    assertEquals(0, workCalled.getCount());
+  }
+
+  /**
+   * Verifies that SLA check passes and the supplied {@link 
Storage.MutateWork} gets executed
+   * for a job when {@link CoordinatorSlaPolicy#coordinatorUrl} responds
+   * with {@code {"drain": true}}.
+   */
+  @Test
+  public void testCheckCoordinatorSlaPassesThenActsCustomStatusKey() throws 
Exception {
+    IScheduledTask task1 = makeTask("taskA", 1, RUNNING);
+    CountDownLatch workCalled = new CountDownLatch(1);
+
+    jettyServer.setHandler(mockCoordinatorResponse(task1, "{\"custom-key\": 
true}"));
+    jettyServer.start();
+
+    // expect that the fetchTask in the work is called, after sla check passes
+    
expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId()))
+        .andReturn(Optional.of(task1));
+
+    control.replay();
+
+    while (!coordinatorResponded.await(100, TimeUnit.MILLISECONDS)) {
+      slaManager.checkSlaThenAct(
+          task1,
+          createCoordinatorSlaPolicy("custom-key"),
+          storeProvider -> {
+            // set the marker to indicate that we performed the work
+            workCalled.countDown();
+            storeProvider
+                .getUnsafeTaskStore()
+                .fetchTask(task1.getAssignedTask().getTaskId());
+            return null;
+          },
+          false);
+    }
+
+    workCalled.await();
+
+    assertEquals(0, coordinatorResponded.getCount());
+    // check the work was called
+    assertEquals(0, workCalled.getCount());
+  }
+
+  /**
+   * Verifies that SLA check fails and the supplied {@link Storage.MutateWork} does not get
+   * executed for a job when {@link CoordinatorSlaPolicy#coordinatorUrl} responds
+   * with {@code {"drain": false}}.
+   */
+  @Test
+  public void testCheckCoordinatorSlaFailsDoesNotAct() throws Exception {
+    IScheduledTask task1 = makeTask("taskA", 1, RUNNING);
+    CountDownLatch workCalled = new CountDownLatch(1);
+
+    jettyServer.setHandler(mockCoordinatorResponse(task1, "{\"drain\": false}"));
+    jettyServer.start();
+
+    control.replay();
+
+    // keep issuing the check until the mock coordinator has handled a request
+    while (!coordinatorResponded.await(100, TimeUnit.MILLISECONDS)) {
+      slaManager.checkSlaThenAct(
+          task1,
+          createCoordinatorSlaPolicy(),
+          storeProvider -> {
+            // set the marker to indicate that we performed the work
+            workCalled.countDown();
+            storeProvider
+                .getUnsafeTaskStore()
+                .fetchTask(task1.getAssignedTask().getTaskId());
+            return null;
+          },
+          false);
+    }
+
+    // Give the work a bounded window in which it could (wrongly) run. A timeout is reported by
+    // the boolean result, not by an exception, so an InterruptedException here is a genuine
+    // failure and is left to propagate through the method's throws clause.
+    workCalled.await(5, TimeUnit.SECONDS);
+
+    assertEquals(0, coordinatorResponded.getCount());
+    // check the work was not called
+    assertEquals(1, workCalled.getCount());
+  }
+
+  /**
+   * Verifies that when {@code force} is {@code true} the supplied {@link Storage.MutateWork}
+   * gets executed without checking the SLA, i.e. without the coordinator handler being invoked.
+   */
+  @Test
+  public void testCheckCoordinatorSlaCheckForceAct() throws Exception {
+    IScheduledTask task1 = makeTask("taskA", 1, RUNNING);
+    CountDownLatch workCalled = new CountDownLatch(1);
+
+    jettyServer.setHandler(mockCoordinatorResponse(task1, "{\"drain\": 
false}"));
+    jettyServer.start();
+
+    // expect that the fetchTask in the work is called after force
+    
expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId()))
+        .andReturn(Optional.of(task1));
+
+    control.replay();
+
+    slaManager.checkSlaThenAct(
+        task1,
+        createCoordinatorSlaPolicy(),
+        storeProvider -> {
+          // set the marker to indicate that we performed the work
+          workCalled.countDown();
+          storeProvider
+              .getUnsafeTaskStore()
+              .fetchTask(task1.getAssignedTask().getTaskId());
+          return null;
+        },
+        true);
+
+    workCalled.await();
+
+    // coordinator is not contacted, so its latch has never been counted down
+    assertEquals(1, coordinatorResponded.getCount());
+    // check the work was called
+    assertEquals(0, workCalled.getCount());
+  }
+
+  /**
+   * Verifies that SLA check fails and the supplied {@link Storage.MutateWork} does not get
+   * executed for a job when {@link CoordinatorSlaPolicy#coordinatorUrl} responds
+   * with error.
+   */
+  @Test
+  public void testCheckCoordinatorSlaErrorDoesNotAct() throws Exception {
+    IScheduledTask task1 = makeTask("taskA", 1, RUNNING);
+    CountDownLatch workCalled = new CountDownLatch(1);
+
+    jettyServer.setHandler(mockCoordinatorError());
+    jettyServer.start();
+
+    control.replay();
+
+    // keep issuing the check until the mock coordinator has handled a request
+    while (!coordinatorResponded.await(100, TimeUnit.MILLISECONDS)) {
+      slaManager.checkSlaThenAct(
+          task1,
+          createCoordinatorSlaPolicy(),
+          storeProvider -> {
+            // set the marker to indicate that we performed the work
+            workCalled.countDown();
+            storeProvider
+                .getUnsafeTaskStore()
+                .fetchTask(task1.getAssignedTask().getTaskId());
+            return null;
+          },
+          false);
+    }
+
+    // Give the work a bounded window in which it could (wrongly) run. A timeout is reported by
+    // the boolean result, not by an exception, so an InterruptedException here is a genuine
+    // failure and is left to propagate through the method's throws clause.
+    workCalled.await(1, TimeUnit.SECONDS);
+
+    assertEquals(0, coordinatorResponded.getCount());
+    // check the work was not called
+    assertEquals(1, workCalled.getCount());
+  }
+
+  /**
+   * Verifies that SLA check fails and the supplied {@link Storage.MutateWork} does not get
+   * executed for a job when contacting {@link CoordinatorSlaPolicy#coordinatorUrl} throws.
+   */
+  @Test
+  public void testCheckCoordinatorSlaThrowsDoesNotAct() throws Exception {
+    IScheduledTask task1 = makeTask("taskA", 1, RUNNING);
+    CountDownLatch workCalled = new CountDownLatch(1);
+
+    // the local server is started, but the policy below points at a different host
+    jettyServer.setHandler(mockCoordinatorError());
+    jettyServer.start();
+
+    control.replay();
+
+    slaManager.checkSlaThenAct(
+        task1,
+        ISlaPolicy.build(SlaPolicy.coordinatorSlaPolicy(
+            new CoordinatorSlaPolicy()
+                .setCoordinatorUrl("http://does.not.exist.com"))),
+        storeProvider -> {
+          // set the marker to indicate that we performed the work
+          workCalled.countDown();
+          storeProvider
+              .getUnsafeTaskStore()
+              .fetchTask(task1.getAssignedTask().getTaskId());
+          return null;
+        },
+        false);
+
+    // Give the work a bounded window in which it could (wrongly) run. A timeout is reported by
+    // the boolean result, not by an exception, so an InterruptedException here is a genuine
+    // failure and is left to propagate through the method's throws clause.
+    workCalled.await(1, TimeUnit.SECONDS);
+
+    // check the work was not called
+    assertEquals(1, workCalled.getCount());
+  }
+
+  /**
+   * Start 2 actions against the same coordinator url and make the first one to enter the
+   * coordinator lock block on a latch. Only when that latch is released will the next action
+   * be able to acquire the lock. At the end both actions should have completed.
+   */
+  @Test
+  public void testCheckCoordinatorSlaNoLockDoesNotAct() throws Exception {
+    IScheduledTask task1 = makeTask("taskA", 1, RUNNING);
+    IScheduledTask task2 = makeTask("taskB", 2, RUNNING);
+
+    CountDownLatch blocked1 = new CountDownLatch(1);
+    CountDownLatch blocked2 = new CountDownLatch(1);
+    CountDownLatch action1 = new CountDownLatch(1);
+    CountDownLatch action2 = new CountDownLatch(1);
+    CountDownLatch finishAction1 = new CountDownLatch(1);
+    CountDownLatch finishAction2 = new CountDownLatch(1);
+
+    jettyServer.setHandler(mockCoordinatorResponses("{\"drain\": true}"));
+    jettyServer.start();
+
+    
expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId()))
+        .andReturn(Optional.of(task1));
+
+    
expect(storageUtil.taskStore.fetchTask(task2.getAssignedTask().getTaskId()))
+        .andReturn(Optional.of(task2));
+
+    control.replay();
+
+    // kick off a new thread to attempt starting action1
+    new Thread(() -> {
+      try {
+        // wait until action1 enters the lock
+        while (!action1.await(100, TimeUnit.MILLISECONDS)) {
+          // start action1
+          slaManager.checkSlaThenAct(
+              task1,
+              createCoordinatorSlaPolicy(),
+              storeProvider -> {
+                LOG.info("Starting action1 for task:{}", 
slaManager.getTaskKey(task1));
+                // set the marker to indicate that we started the work
+                action1.countDown();
+                storeProvider
+                    .getUnsafeTaskStore()
+                    .fetchTask(task1.getAssignedTask().getTaskId());
+                // we will block here to make sure that the lock is held
+                blocked1.await();
+                finishAction1.countDown();
+                LOG.info("Finished action1 for task:{}", 
slaManager.getTaskKey(task1));
+                return null;
+              },
+              false);
+        }
+      } catch (InterruptedException e) {
+        fail();
+      }
+    }).start();
+
+    // wait for action1 to get the lock
+    action1.await();
+
+    // kick off a new thread to attempt starting action2
+    new Thread(() -> {
+      try {
+        // wait until action2 enters the lock
+        while (!action2.await(100, TimeUnit.MILLISECONDS)) {
+          // start action2
+          slaManager.checkSlaThenAct(
+              task2,
+              createCoordinatorSlaPolicy(),
+              storeProvider -> {
+                LOG.info("Starting action2 for task:{}", 
slaManager.getTaskKey(task2));
+                // set the marker to indicate that we started the work
+                action2.countDown();
+                storeProvider
+                    .getUnsafeTaskStore()
+                    .fetchTask(task2.getAssignedTask().getTaskId());
+                // we will block here to make sure that the lock is held
+                blocked2.await();
+                finishAction2.countDown();
+                LOG.info("Finished action2 for task:{}", 
slaManager.getTaskKey(task2));
+                return null;
+              },
+              false);
+        }
+      } catch (InterruptedException e) {
+        fail();
+      }
+    }).start();
+
+    // the two actions must not both be inside the critical section at the same time
+    assertNotEquals(action1.getCount(), action2.getCount());
+    // action1 should have entered the lock
+    assertEquals(0, action1.getCount());
+    // action2 should not have entered the lock
+    assertEquals(1, action2.getCount());
+
+    // unblock blocked action1
+    blocked1.countDown();
+    // wait for action1 to finish
+    finishAction1.await();
+
+    // wait for the second action to enter the lock
+    action2.await();
+
+    // action2 should have entered the lock
+    assertEquals(0, action2.getCount());
+
+    // unblock blocked action2
+    blocked2.countDown();
+
+    finishAction2.await();
+  }
+
+  /**
+   * Start 2 actions simultaneously with different coordinator urls, so they can both enter the
+   * appropriate coordinator locks and try to acquire the semaphore. Both the actions must be able
+   * to enter the critical area and start the work simultaneously.
+   * While the single semaphore permit is held only one action can fully complete the work; once
+   * the permit is released the other action completes as well.
+   */
+  @Test
+  public void testCheckCoordinatorSlaParallelAct() throws Exception {
+    IScheduledTask task1 = makeTask("taskA", 1, RUNNING);
+    IScheduledTask task2 = makeTask("taskB", 2, RUNNING);
+
+    Semaphore blocked = new Semaphore(1);
+    CountDownLatch action1 = new CountDownLatch(1);
+    CountDownLatch action2 = new CountDownLatch(1);
+    CountDownLatch finished = new CountDownLatch(2);
+    // Opened by whichever action completes first, so that the "exactly one finished" assertion
+    // below does not race with that action's finished.countDown().
+    CountDownLatch firstFinished = new CountDownLatch(1);
+
+    jettyServer.setHandler(mockCoordinatorResponses("{\"drain\": true}"));
+    jettyServer.start();
+
+    expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId()))
+        .andReturn(Optional.of(task1));
+
+    expect(storageUtil.taskStore.fetchTask(task2.getAssignedTask().getTaskId()))
+        .andReturn(Optional.of(task2));
+
+    control.replay();
+
+    // kick off a new thread to attempt starting action1
+    new Thread(() -> {
+      try {
+        // wait until action1 enters the lock
+        while (!action1.await(100, TimeUnit.MILLISECONDS)) {
+          // start action1
+          slaManager.checkSlaThenAct(
+              task1,
+              ISlaPolicy.build(SlaPolicy.coordinatorSlaPolicy(
+                  new CoordinatorSlaPolicy()
+                      .setCoordinatorUrl(
+                          // Note that the url is different although referring to the same server
+                          String.format("http://localhost:%d", jettyServer.getURI().getPort())))),
+              storeProvider -> {
+                LOG.info("Starting action for task:{}", slaManager.getTaskKey(task1));
+                // set the marker to indicate that we started the work
+                action1.countDown();
+                storeProvider
+                    .getUnsafeTaskStore()
+                    .fetchTask(task1.getAssignedTask().getTaskId());
+                // only the first action to get here obtains the single permit
+                blocked.acquire();
+                finished.countDown();
+                firstFinished.countDown();
+                LOG.info("Finished action for task:{}", slaManager.getTaskKey(task1));
+                return null;
+              },
+              false);
+        }
+      } catch (InterruptedException e) {
+        fail();
+      }
+    }).start();
+
+    // kick off a new thread to attempt starting action2
+    new Thread(() -> {
+      try {
+        // wait until action2 enters the lock
+        while (!action2.await(100, TimeUnit.MILLISECONDS)) {
+          // start action2
+          slaManager.checkSlaThenAct(
+              task2,
+              ISlaPolicy.build(SlaPolicy.coordinatorSlaPolicy(
+                  new CoordinatorSlaPolicy()
+                      .setCoordinatorUrl(
+                          // Note that the url is different although referring to the same server
+                          String.format("http://0.0.0.0:%d", jettyServer.getURI().getPort())))),
+              storeProvider -> {
+                LOG.info("Starting action for task:{}", slaManager.getTaskKey(task2));
+                // set the marker to indicate that we started the work
+                action2.countDown();
+                storeProvider
+                    .getUnsafeTaskStore()
+                    .fetchTask(task2.getAssignedTask().getTaskId());
+                // only the first action to get here obtains the single permit
+                blocked.acquire();
+                finished.countDown();
+                firstFinished.countDown();
+                LOG.info("Finished action for task:{}", slaManager.getTaskKey(task2));
+                return null;
+              },
+              false);
+        }
+      } catch (InterruptedException e) {
+        fail();
+      }
+    }).start();
+
+    // wait for both of the actions to get the lock
+    action1.await();
+    action2.await();
+
+    assertEquals(0, action1.getCount());
+    assertEquals(0, action2.getCount());
+
+    // Wait for the action that obtained the permit to complete before inspecting the count; the
+    // other action stays parked on the semaphore until the permit is released below.
+    firstFinished.await();
+
+    // only 1 action has fully completed
+    assertEquals(1, finished.getCount());
+
+    // unblock the blocked action
+    blocked.release();
+
+    finished.await();
+
+    // both actions have fully completed
+    assertEquals(0, finished.getCount());
+  }
+
+  /**
+   * Builds a coordinator SLA policy that targets the local test server and uses the
+   * {@code drain} status key.
+   */
+  private ISlaPolicy createCoordinatorSlaPolicy() {
+    return createCoordinatorSlaPolicy("drain");
+  }
+
+  /**
+   * Builds a coordinator SLA policy that targets the local test server on whatever port
+   * {@code jettyServer} reports.
+   *
+   * @param statusKey status key to set on the policy.
+   * @return the policy wrapped as an {@link ISlaPolicy}.
+   */
+  private ISlaPolicy createCoordinatorSlaPolicy(String statusKey) {
+    return ISlaPolicy.build(SlaPolicy.coordinatorSlaPolicy(
+        new CoordinatorSlaPolicy()
+            .setCoordinatorUrl(String.format("http://localhost:%d", jettyServer.getURI().getPort()))
+            .setStatusKey(statusKey)
+    ));
+  }
+
+  /**
+   * Creates a handler that answers with {@code pollResponse} only when the incoming request
+   * carries the expected task query parameter and the JSON-serialized {@code task} as its body.
+   * The {@code coordinatorResponded} latch is counted down for every request that is inspected
+   * without a serialization failure, whether or not it matched.
+   */
+  private AbstractHandler mockCoordinatorResponse(IScheduledTask task, String pollResponse) {
+    AbstractHandler handler = new AbstractHandler() {
+      @Override
+      public void handle(String target, Request baseRequest, HttpServletRequest request,
+                         HttpServletResponse response) throws IOException {
+        try {
+          String encodedKey = URLEncoder.encode(slaManager.getTaskKey(task), "UTF-8");
+          String expectedQuery = Joiner.on("=").join(SlaManager.TASK_PARAM, encodedKey);
+          TSerializer serializer = new TSerializer(new TSimpleJSONProtocol.Factory());
+          String expectedBody = serializer.toString(task.newBuilder());
+          // the request body is only read once the query string has matched
+          if (request.getQueryString().equals(expectedQuery)) {
+            String actualBody = request.getReader().lines().collect(Collectors.joining());
+            if (actualBody.equals(expectedBody)) {
+              createResponse(baseRequest, response, pollResponse);
+            }
+          }
+          coordinatorResponded.countDown();
+        } catch (TException e) {
+          fail();
+        }
+      }
+    };
+    return handler;
+  }
+
+  /**
+   * Creates a handler that answers every request with {@code mockResponse}, without inspecting
+   * the request.
+   */
+  private AbstractHandler mockCoordinatorResponses(String mockResponse) {
+    AbstractHandler alwaysRespond = new AbstractHandler() {
+      @Override
+      public void handle(
+          String target,
+          Request baseRequest,
+          HttpServletRequest request,
+          HttpServletResponse response) throws IOException {
+        createResponse(baseRequest, response, mockResponse);
+      }
+    };
+    return alwaysRespond;
+  }
+
+  /**
+   * Writes {@code mockResponse} as a 200 response body, flushes it and marks the request as
+   * handled.
+   */
+  private void createResponse(Request baseRequest, HttpServletResponse response,
+                              String mockResponse) throws IOException {
+    response.setStatus(HttpServletResponse.SC_OK);
+    java.io.PrintWriter out = response.getWriter();
+    out.write(mockResponse);
+    out.flush();
+    baseRequest.setHandled(true);
+  }
+
+  /**
+   * Creates a handler that answers every request with a 500 status and then counts down the
+   * {@code coordinatorResponded} latch.
+   */
+  private AbstractHandler mockCoordinatorError() {
+    AbstractHandler alwaysFail = new AbstractHandler() {
+      @Override
+      public void handle(
+          String target,
+          Request baseRequest,
+          HttpServletRequest request,
+          HttpServletResponse response) {
+        response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+        baseRequest.setHandled(true);
+        coordinatorResponded.countDown();
+      }
+    };
+    return alwaysFail;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java 
b/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
index d37e7a0..b38a1e3 100644
--- a/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
@@ -30,17 +30,21 @@ import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.common.util.testing.FakeClock;
+import org.apache.aurora.gen.ServerInfo;
+import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.app.LifecycleModule;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.config.types.TimeAmount;
 import org.apache.aurora.scheduler.sla.SlaModule.SlaUpdater;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IServerInfo;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.TIER_MANAGER;
 import static 
org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory.JOB_UPTIMES;
 import static 
org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory.MEDIANS;
 import static 
org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory.PLATFORM_UPTIME;
@@ -50,6 +54,9 @@ import static org.junit.Assert.assertNotNull;
 
 public class SlaModuleTest extends EasyMockTest {
 
+  private static final String CLUSTER_NAME = "test_cluster";
+  private static final String STATS_URL_PREFIX = "fake_url";
+
   private Injector injector;
   private FakeClock clock;
   private StorageTestUtil storageUtil;
@@ -57,7 +64,7 @@ public class SlaModuleTest extends EasyMockTest {
   private SlaModule module;
 
   @Before
-  public void setUp() throws Exception {
+  public void setUp() {
     storageUtil = new StorageTestUtil(this);
     clock = new FakeClock();
     statsProvider = createMock(StatsProvider.class);
@@ -76,6 +83,12 @@ public class SlaModuleTest extends EasyMockTest {
                 bind(Clock.class).toInstance(clock);
                 bind(Storage.class).toInstance(storageUtil.storage);
                 bind(StatsProvider.class).toInstance(statsProvider);
+                bind(TierManager.class).toInstance(TIER_MANAGER);
+                bind(IServerInfo.class).toInstance(
+                    IServerInfo.build(
+                        new ServerInfo()
+                            .setClusterName(CLUSTER_NAME)
+                            .setStatsUrlPrefix(STATS_URL_PREFIX)));
               }
             }).build()
     );

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
 
b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
deleted file mode 100644
index 770846e..0000000
--- 
a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Executor;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.HostStatus;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
-import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IHostStatus;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.mesos.v1.Protos;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
-import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
-import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static 
org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
-import static 
org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class MaintenanceControllerImplTest extends EasyMockTest {
-
-  private static final String HOST_A = "a";
-  private static final Set<String> A = ImmutableSet.of(HOST_A);
-  private static final Protos.OfferID OFFER_ID = Protos.OfferID.newBuilder()
-      .setValue("offer-id")
-      .build();
-  private static final Protos.AgentID AGENT_ID = Protos.AgentID.newBuilder()
-      .setValue("agent-id")
-      .build();
-  private static final Protos.FrameworkID FRAMEWORK_ID = 
Protos.FrameworkID.newBuilder()
-      .setValue("framework-id")
-      .build();
-  private static final Protos.URL AGENT_URL = Protos.URL.newBuilder()
-      .setAddress(Protos.Address.newBuilder()
-          .setHostname(HOST_A)
-          .setPort(5051))
-      .setScheme("http")
-      .build();
-  private static final Protos.Unavailability UNAVAILABILITY = 
Protos.Unavailability.newBuilder()
-      .setStart(Protos.TimeInfo.newBuilder()
-          .setNanoseconds(Amount.of(1L, Time.MINUTES).as(Time.NANOSECONDS)))
-      .build();
-
-  private static final Protos.InverseOffer INVERSE_OFFER = 
Protos.InverseOffer.newBuilder()
-      .setId(OFFER_ID)
-      .setAgentId(AGENT_ID)
-      .setUrl(AGENT_URL)
-      .setFrameworkId(FRAMEWORK_ID)
-      .setUnavailability(UNAVAILABILITY)
-      .build();
-
-  private StorageTestUtil storageUtil;
-  private StateManager stateManager;
-  private MaintenanceController maintenance;
-  private EventSink eventSink;
-
-  @Before
-  public void setUp() throws Exception {
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-    stateManager = createMock(StateManager.class);
-    TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
-    expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes();
-
-    Injector injector = Guice.createInjector(
-        new PubsubEventModule(),
-        new AbstractModule() {
-          @Override
-          protected void configure() {
-            StateModule.bindMaintenanceController(binder());
-            bind(Storage.class).toInstance(storageUtil.storage);
-            bind(StateManager.class).toInstance(stateManager);
-            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
-            bind(Executor.class).annotatedWith(AsyncExecutor.class)
-                .toInstance(MoreExecutors.directExecutor());
-            bind(TaskEventBatchWorker.class).toInstance(batchWorker);
-          }
-        });
-    maintenance = injector.getInstance(MaintenanceController.class);
-    eventSink = PubsubTestUtil.startPubsub(injector);
-  }
-
-  private static IScheduledTask makeTask(String host, String taskId) {
-    ScheduledTask builder = TaskTestUtil.addStateTransition(
-        TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB),
-        RUNNING,
-        1000).newBuilder();
-    builder.getAssignedTask().setSlaveHost(host);
-    return IScheduledTask.build(builder);
-  }
-
-  @Test
-  public void testMaintenanceCycle() {
-    IScheduledTask task1 = makeTask(HOST_A, "taskA");
-    IScheduledTask task2 = makeTask(HOST_A, "taskB");
-
-    expectMaintenanceModeChange(HOST_A, SCHEDULED);
-    expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2));
-    expectTaskDraining(task1);
-    expectTaskDraining(task2);
-    expectMaintenanceModeChange(HOST_A, DRAINING);
-    IHostAttributes attributes =
-        IHostAttributes.build(new 
HostAttributes().setHost(HOST_A).setMode(DRAINING));
-
-    expect(storageUtil.attributeStore.getHostAttributes(HOST_A))
-        .andReturn(Optional.of(attributes)).times(2);
-
-    
expect(storageUtil.attributeStore.getHostAttributes()).andReturn(ImmutableSet.of(attributes));
-    expectFetchTasksByHost(HOST_A, ImmutableSet.of(task2));
-    // TaskA is KILLED and therefore no longer active
-    expectFetchTasksByHost(HOST_A, ImmutableSet.of());
-    expectMaintenanceModeChange(HOST_A, DRAINED);
-    expectMaintenanceModeChange(HOST_A, NONE);
-
-    control.replay();
-
-    assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A));
-    assertStatus(HOST_A, DRAINING, maintenance.drain(A));
-    assertStatus(HOST_A, DRAINING, maintenance.getStatus(A));
-    eventSink.post(
-        TaskStateChange.transition(
-            IScheduledTask.build(task1.newBuilder().setStatus(KILLED)), 
RUNNING));
-    eventSink.post(
-        TaskStateChange.transition(
-            IScheduledTask.build(task2.newBuilder().setStatus(KILLED)), 
RUNNING));
-    assertStatus(HOST_A, NONE, maintenance.endMaintenance(A));
-  }
-
-  @Test
-  public void testUnknownHost() {
-    expect(storageUtil.attributeStore.getHostAttributes("b"))
-        .andReturn(Optional.empty());
-
-    control.replay();
-
-    assertEquals(ImmutableSet.of(),
-        maintenance.startMaintenance(ImmutableSet.of("b")));
-  }
-
-  @Test
-  public void testDrainEmptyHost() {
-    expectMaintenanceModeChange(HOST_A, SCHEDULED);
-    expectFetchTasksByHost(HOST_A, ImmutableSet.of());
-    expectMaintenanceModeChange(HOST_A, DRAINED);
-
-    control.replay();
-
-    assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A));
-    assertStatus(HOST_A, DRAINED, maintenance.drain(A));
-  }
-
-  @Test
-  public void testEndEarly() {
-    expectMaintenanceModeChange(HOST_A, SCHEDULED);
-    expectMaintenanceModeChange(HOST_A, NONE);
-    
expect(storageUtil.attributeStore.getHostAttributes(HOST_A)).andReturn(Optional.of(
-        IHostAttributes.build(new 
HostAttributes().setHost(HOST_A).setMode(NONE))));
-
-    control.replay();
-
-    assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A));
-
-    // End maintenance without DRAINING.
-    assertStatus(HOST_A, NONE, maintenance.endMaintenance(A));
-
-    // Make sure a later transition on the host does not cause any ill effects 
that could surface
-    // from stale internal state.
-    eventSink.post(TaskStateChange.transition(
-        IScheduledTask.build(makeTask(HOST_A, 
"taskA").newBuilder().setStatus(KILLED)), RUNNING));
-  }
-
-  @Test
-  public void testGetMode() {
-    
expect(storageUtil.attributeStore.getHostAttributes(HOST_A)).andReturn(Optional.of(
-        IHostAttributes.build(new 
HostAttributes().setHost(HOST_A).setMode(DRAINING))));
-    
expect(storageUtil.attributeStore.getHostAttributes("unknown")).andReturn(Optional.empty());
-
-    control.replay();
-
-    assertEquals(DRAINING, maintenance.getMode(HOST_A));
-    assertEquals(NONE, maintenance.getMode("unknown"));
-  }
-
-  @Test
-  public void testInverseOfferDrain() {
-    IScheduledTask task1 = makeTask(HOST_A, "taskA");
-    expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1));
-    expectTaskDraining(task1);
-
-    control.replay();
-    maintenance.drainForInverseOffer(INVERSE_OFFER);
-  }
-
-  private void expectTaskDraining(IScheduledTask task) {
-    expect(stateManager.changeState(
-        storageUtil.mutableStoreProvider,
-        Tasks.id(task),
-        Optional.empty(),
-        ScheduleStatus.DRAINING,
-        MaintenanceControllerImpl.DRAINING_MESSAGE))
-        .andReturn(StateChangeResult.SUCCESS);
-  }
-
-  private void expectFetchTasksByHost(String hostName, Set<IScheduledTask> 
tasks) {
-    
expect(storageUtil.taskStore.fetchTasks(Query.slaveScoped(hostName).active())).andReturn(tasks);
-  }
-
-  private void expectMaintenanceModeChange(String hostName, MaintenanceMode 
mode) {
-    IHostAttributes attributes = IHostAttributes.build(new 
HostAttributes().setHost(hostName));
-
-    expect(storageUtil.attributeStore.getHostAttributes(hostName))
-        .andReturn(Optional.of(attributes));
-    IHostAttributes updated = 
IHostAttributes.build(attributes.newBuilder().setMode(mode));
-    
expect(storageUtil.attributeStore.saveHostAttributes(updated)).andReturn(true);
-  }
-
-  private void assertStatus(String host, MaintenanceMode mode, 
Set<IHostStatus> statuses) {
-    assertEquals(ImmutableSet.of(IHostStatus.build(new HostStatus(host, 
mode))), statuses);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
 
b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 2cf66d8..0fc3673 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -50,6 +50,7 @@ import org.apache.aurora.gen.ListBackupsResult;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.MesosContainer;
 import org.apache.aurora.gen.Metadata;
+import org.apache.aurora.gen.PercentageSlaPolicy;
 import org.apache.aurora.gen.PulseJobUpdateResult;
 import org.apache.aurora.gen.QueryRecoveryResult;
 import org.apache.aurora.gen.Range;
@@ -62,6 +63,7 @@ import org.apache.aurora.gen.ResponseDetail;
 import org.apache.aurora.gen.Result;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.SlaPolicy;
 import org.apache.aurora.gen.StartJobUpdateResult;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskConstraint;
@@ -77,10 +79,10 @@ import 
org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
 import org.apache.aurora.scheduler.cron.CronException;
 import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.maintenance.MaintenanceController;
 import org.apache.aurora.scheduler.quota.QuotaCheckResult;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.reconciliation.TaskReconciler;
-import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.UUIDGenerator;
@@ -153,6 +155,7 @@ import static 
org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NOOP_J
 import static 
org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NO_CRON;
 import static 
org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.PRUNE_TASKS;
 import static 
org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.RESTART_SHARDS;
+import static 
org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.SLA_DRAIN_HOSTS;
 import static 
org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.START_JOB_UPDATE;
 import static 
org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.START_MAINTENANCE;
 import static 
org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.jobAlreadyExistsMessage;
@@ -1097,6 +1100,59 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
     assertEquals(1L, statsProvider.getLongValue(END_MAINTENANCE));
   }
 
+  @Test
+  public void testSLAHostMaintenance() throws Exception {
+    Set<String> hostnames = ImmutableSet.of("a");
+    SlaPolicy defaultSlaPolicy = SlaPolicy.percentageSlaPolicy(
+        new PercentageSlaPolicy().setPercentage(95));
+    Set<IHostStatus> none = status("a", NONE);
+    Set<IHostStatus> scheduled = status("a", SCHEDULED);
+    Set<IHostStatus> draining = status("a", DRAINING);
+    Set<IHostStatus> drained = status("a", DRAINING);
+    expect(maintenance.getStatus(hostnames)).andReturn(none);
+    expect(maintenance.startMaintenance(hostnames)).andReturn(scheduled);
+    expect(maintenance.slaDrain(hostnames, defaultSlaPolicy, 10)).andReturn(draining);
+    expect(maintenance.getStatus(hostnames)).andReturn(draining);
+    expect(maintenance.getStatus(hostnames)).andReturn(drained);
+    expect(maintenance.endMaintenance(hostnames)).andReturn(none);
+
+    control.replay();
+
+    Hosts hosts = new Hosts(hostnames);
+
+    assertEquals(
+        IHostStatus.toBuildersSet(none),
+        thrift.maintenanceStatus(hosts).getResult().getMaintenanceStatusResult()
+            .getStatuses());
+    assertEquals(1L, statsProvider.getLongValue(MAINTENANCE_STATUS));
+    assertEquals(
+        IHostStatus.toBuildersSet(scheduled),
+        thrift.startMaintenance(hosts).getResult().getStartMaintenanceResult()
+            .getStatuses());
+    assertEquals(1L, statsProvider.getLongValue(START_MAINTENANCE));
+    assertEquals(
+        IHostStatus.toBuildersSet(draining),
+        thrift.slaDrainHosts(hosts, defaultSlaPolicy, 10)
+            .getResult()
+            .getDrainHostsResult()
+            .getStatuses());
+    assertEquals(1L, statsProvider.getLongValue(SLA_DRAIN_HOSTS));
+    assertEquals(
+        IHostStatus.toBuildersSet(draining),
+        thrift.maintenanceStatus(hosts).getResult().getMaintenanceStatusResult()
+            .getStatuses());
+    assertEquals(2L, statsProvider.getLongValue(MAINTENANCE_STATUS));
+    assertEquals(
+        IHostStatus.toBuildersSet(drained),
+        thrift.maintenanceStatus(hosts).getResult().getMaintenanceStatusResult()
+            .getStatuses());
+    assertEquals(3L, statsProvider.getLongValue(MAINTENANCE_STATUS));
+    assertEquals(
+        IHostStatus.toBuildersSet(none),
+        thrift.endMaintenance(hosts).getResult().getEndMaintenanceResult().getStatuses());
+    assertEquals(1L, statsProvider.getLongValue(END_MAINTENANCE));
+  }
+
   private static Response okEmptyResponse() {
     return response(OK, Optional.empty());
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java 
b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index 40851c4..f0199a4 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -25,9 +25,12 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
 
 import org.apache.aurora.common.application.ShutdownStage;
 import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.AuroraAdmin;
 import org.apache.aurora.gen.Container;
@@ -48,10 +51,12 @@ import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.aurora.scheduler.app.local.FakeNonVolatileStorage;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.config.CliOptions;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import 
org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.cron.quartz.CronModule;
+import org.apache.aurora.scheduler.maintenance.MaintenanceController;
 import org.apache.aurora.scheduler.mesos.DriverFactory;
 import org.apache.aurora.scheduler.mesos.DriverSettings;
 import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory;
@@ -138,6 +143,10 @@ public class ThriftIT extends EasyMockTest {
             bindMock(StorageBackup.class);
             bindMock(SnapshotStore.class);
             bind(IServerInfo.class).toInstance(SERVER_INFO);
+            bind(new TypeLiteral<Amount<Long, Time>>() { })
+                .annotatedWith(
+                    MaintenanceController.MaintenanceControllerImpl.PollingInterval.class)
+                .toInstance(new TimeAmount(1, Time.MINUTES));
           }
 
           @Provides
@@ -181,6 +190,8 @@ public class ThriftIT extends EasyMockTest {
         true,
         true,
         false,
+        20,
+        1800,
         ConfigurationManager.DEFAULT_ALLOWED_JOB_ENVIRONMENTS);
 
     createThrift(configurationManagerSettings);
@@ -189,6 +200,7 @@ public class ThriftIT extends EasyMockTest {
 
     TaskConfig task = TaskTestUtil.makeConfig(TaskTestUtil.JOB).newBuilder();
     task.unsetExecutorConfig();
+    task.unsetSlaPolicy();
     task.setProduction(false)
         .setTier(TaskTestUtil.DEV_TIER_NAME)
         .setContainer(Container.docker(new DockerContainer()

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java 
b/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java
index d412090..9903811 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java
@@ -36,6 +36,7 @@ import org.apache.aurora.gen.JobUpdateRequest;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.gen.Response;
 import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.SlaPolicy;
 import org.apache.aurora.gen.TaskQuery;
 import org.apache.aurora.scheduler.http.api.security.AuthorizingParam;
 import org.apache.aurora.scheduler.thrift.auth.DecoratedThrift;
@@ -200,6 +201,12 @@ public class MockDecoratedThrift implements 
AnnotatedAuroraAdmin {
   }
 
   @Override
+  public Response slaDrainHosts(Hosts hosts, SlaPolicy defaultSlaPolicy, long timeoutSecs)
+      throws TException {
+    return this.annotatedAuroraAdmin.slaDrainHosts(hosts, defaultSlaPolicy, timeoutSecs);
+  }
+
+  @Override
   public Response snapshot() throws TException {
     return this.annotatedAuroraAdmin.snapshot();
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/python/apache/aurora/admin/test_maintenance.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/admin/test_maintenance.py 
b/src/test/python/apache/aurora/admin/test_maintenance.py
index ca0239b..d0f7956 100644
--- a/src/test/python/apache/aurora/admin/test_maintenance.py
+++ b/src/test/python/apache/aurora/admin/test_maintenance.py
@@ -18,7 +18,13 @@ from mock import Mock, patch
 from twitter.common.contextutil import temporary_file
 
 from apache.aurora.admin.host_maintenance import HostMaintenance
-from apache.aurora.admin.maintenance import host_activate, host_deactivate, 
host_drain, host_status
+from apache.aurora.admin.maintenance import (
+    host_activate,
+    host_deactivate,
+    host_drain,
+    host_status,
+    sla_host_drain
+)
 
 from .util import AuroraClientCommandTest
 
@@ -29,12 +35,15 @@ from gen.apache.aurora.api.ttypes import (
     HostStatus,
     MaintenanceMode,
     MaintenanceStatusResult,
+    PercentageSlaPolicy,
+    SlaPolicy,
     StartMaintenanceResult
 )
 
 
 class TestMaintenanceCommands(AuroraClientCommandTest):
   HOSTNAMES = ['us-grf-20', 'us-jim-47', 'us-suz-01']
+  DEFAULT_SLA_PERCENTAGE = 95
 
   def make_mock_options(self):
     mock_options = Mock()
@@ -46,6 +55,9 @@ class TestMaintenanceCommands(AuroraClientCommandTest):
     mock_options.percentage = None
     mock_options.duration = None
     mock_options.reason = None
+    mock_options.default_duration = '30m'
+    mock_options.default_percentage = self.DEFAULT_SLA_PERCENTAGE
+    mock_options.timeout = '2h'
     return mock_options
 
   def create_host_statuses(self, maintenance_mode, skip_hosts=None):
@@ -127,9 +139,9 @@ class TestMaintenanceCommands(AuroraClientCommandTest):
     mock_scheduler_proxy.startMaintenance.return_value = 
self.create_start_maintenance_result()
     mock_scheduler_proxy.drainHosts.return_value = 
self.create_start_maintenance_result()
     mock_vector = self.create_mock_probe_hosts_vector([
-        self.create_probe_hosts(self.HOSTNAMES[0], 95, True, None),
-        self.create_probe_hosts(self.HOSTNAMES[1], 95, True, None),
-        self.create_probe_hosts(self.HOSTNAMES[2], 95, True, None)
+        self.create_probe_hosts(self.HOSTNAMES[0], 
self.DEFAULT_SLA_PERCENTAGE, True, None),
+        self.create_probe_hosts(self.HOSTNAMES[1], 
self.DEFAULT_SLA_PERCENTAGE, True, None),
+        self.create_probe_hosts(self.HOSTNAMES[2], 
self.DEFAULT_SLA_PERCENTAGE, True, None)
     ])
 
     with contextlib.nested(
@@ -163,9 +175,9 @@ class TestMaintenanceCommands(AuroraClientCommandTest):
         skip_hosts=['us-grf-20'])
     mock_scheduler_proxy.drainHosts.return_value = 
self.create_start_maintenance_result()
     mock_vector = self.create_mock_probe_hosts_vector([
-      self.create_probe_hosts(self.HOSTNAMES[0], 95, True, None),
-      self.create_probe_hosts(self.HOSTNAMES[1], 95, True, None),
-      self.create_probe_hosts(self.HOSTNAMES[2], 95, True, None)
+      self.create_probe_hosts(self.HOSTNAMES[0], self.DEFAULT_SLA_PERCENTAGE, 
True, None),
+      self.create_probe_hosts(self.HOSTNAMES[1], self.DEFAULT_SLA_PERCENTAGE, 
True, None),
+      self.create_probe_hosts(self.HOSTNAMES[2], self.DEFAULT_SLA_PERCENTAGE, 
True, None)
     ])
 
     with contextlib.nested(
@@ -192,9 +204,9 @@ class TestMaintenanceCommands(AuroraClientCommandTest):
       mock_scheduler_proxy.startMaintenance.return_value = 
self.create_start_maintenance_result()
       mock_scheduler_proxy.drainHosts.return_value = 
self.create_start_maintenance_result()
       mock_vector = self.create_mock_probe_hosts_vector([
-          self.create_probe_hosts(self.HOSTNAMES[0], 95, False, None),
-          self.create_probe_hosts(self.HOSTNAMES[1], 95, False, None),
-          self.create_probe_hosts(self.HOSTNAMES[2], 95, False, None)
+          self.create_probe_hosts(self.HOSTNAMES[0], 
self.DEFAULT_SLA_PERCENTAGE, False, None),
+          self.create_probe_hosts(self.HOSTNAMES[1], 
self.DEFAULT_SLA_PERCENTAGE, False, None),
+          self.create_probe_hosts(self.HOSTNAMES[2], 
self.DEFAULT_SLA_PERCENTAGE, False, None)
       ])
 
       with contextlib.nested(
@@ -222,9 +234,9 @@ class TestMaintenanceCommands(AuroraClientCommandTest):
       mock_scheduler_proxy.startMaintenance.return_value = 
self.create_start_maintenance_result()
       mock_scheduler_proxy.drainHosts.return_value = 
self.create_start_maintenance_result()
       mock_vector = self.create_mock_probe_hosts_vector([
-          self.create_probe_hosts(self.HOSTNAMES[0], 95, False, None),
-          self.create_probe_hosts(self.HOSTNAMES[1], 95, False, None),
-          self.create_probe_hosts(self.HOSTNAMES[2], 95, False, None)
+          self.create_probe_hosts(self.HOSTNAMES[0], 
self.DEFAULT_SLA_PERCENTAGE, False, None),
+          self.create_probe_hosts(self.HOSTNAMES[1], 
self.DEFAULT_SLA_PERCENTAGE, False, None),
+          self.create_probe_hosts(self.HOSTNAMES[2], 
self.DEFAULT_SLA_PERCENTAGE, False, None)
       ])
       mock_wait = Mock()
 
@@ -326,3 +338,46 @@ class TestMaintenanceCommands(AuroraClientCommandTest):
       mock_scheduler_proxy.maintenanceStatus.assert_called_with(
         Hosts(set(self.HOSTNAMES)),
         retry=True)
+
+  def test_perform_sla_maintenance_hosts(self):
+    mock_options = self.make_mock_options()
+
+    mock_api, mock_scheduler_proxy = self.create_mock_api()
+    mock_scheduler_proxy.slaDrainHosts.return_value = self.create_start_maintenance_result()
+
+    with contextlib.nested(
+      patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
+      patch('apache.aurora.admin.maintenance.CLUSTERS', new=self.TEST_CLUSTERS),
+      patch('twitter.common.app.get_options', return_value=mock_options)):
+      sla_host_drain([self.TEST_CLUSTER])
+
+      mock_scheduler_proxy.slaDrainHosts.assert_called_with(
+        Hosts(set(self.HOSTNAMES)),
+        SlaPolicy(percentageSlaPolicy=PercentageSlaPolicy(
+          percentage=self.DEFAULT_SLA_PERCENTAGE,
+          durationSecs=1800)),
+        7200)
+      assert mock_scheduler_proxy.slaDrainHosts.call_count == 1
+
+  def test_perform_sla_maintenance_hosts_defaults(self):
+    mock_options = self.make_mock_options()
+    mock_options.default_percentage = self.DEFAULT_SLA_PERCENTAGE
+    mock_options.default_duration = '2m'
+    mock_options.timeout = '10m'
+
+    mock_api, mock_scheduler_proxy = self.create_mock_api()
+    mock_scheduler_proxy.slaDrainHosts.return_value = self.create_start_maintenance_result()
+
+    with contextlib.nested(
+      patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
+      patch('apache.aurora.admin.maintenance.CLUSTERS', new=self.TEST_CLUSTERS),
+      patch('twitter.common.app.get_options', return_value=mock_options)):
+      sla_host_drain([self.TEST_CLUSTER])
+
+      mock_scheduler_proxy.slaDrainHosts.assert_called_with(
+        Hosts(set(self.HOSTNAMES)),
+        SlaPolicy(percentageSlaPolicy=PercentageSlaPolicy(
+          percentage=self.DEFAULT_SLA_PERCENTAGE,
+          durationSecs=120)),
+        600)
+      assert mock_scheduler_proxy.slaDrainHosts.call_count == 1

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/python/apache/aurora/api_util.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/api_util.py 
b/src/test/python/apache/aurora/api_util.py
index 3fc9b47..2022db5 100644
--- a/src/test/python/apache/aurora/api_util.py
+++ b/src/test/python/apache/aurora/api_util.py
@@ -58,6 +58,9 @@ class SchedulerThriftApiSpec(ReadOnlyScheduler.Iface):
   def drainHosts(self, hosts):
     pass
 
+  def slaDrainHosts(self, hosts, defaultSlaPolicy, timeoutSecs):
+    pass
+
   def maintenanceStatus(self, hosts):
     pass
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py 
b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index f2a2eae..d1e1e43 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -45,6 +45,7 @@ from gen.apache.aurora.api.ttypes import (
     ResponseCode,
     ResponseDetail,
     ScheduleStatus,
+    SlaPolicy,
     TaskQuery
 )
 
@@ -211,6 +212,14 @@ class 
TestSchedulerProxyAdminInjection(TestSchedulerProxyInjection):
     self.mox.ReplayAll()
     self.make_scheduler_proxy().drainHosts(Hosts())
 
+  def test_slaDrainHosts(self):
+    self.mock_thrift_client.slaDrainHosts(
+      IsA(Hosts),
+      IsA(SlaPolicy),
+      10).AndReturn(DEFAULT_RESPONSE)
+    self.mox.ReplayAll()
+    self.make_scheduler_proxy().slaDrainHosts(Hosts(), SlaPolicy(), 10)
+
   def test_maintenanceStatus(self):
     self.mock_thrift_client.maintenanceStatus(
       IsA(Hosts)).AndReturn(DEFAULT_RESPONSE)

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/python/apache/aurora/client/cli/test_add.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_add.py 
b/src/test/python/apache/aurora/client/cli/test_add.py
index b22b9f7..ff4ccf6 100644
--- a/src/test/python/apache/aurora/client/cli/test_add.py
+++ b/src/test/python/apache/aurora/client/cli/test_add.py
@@ -20,7 +20,8 @@ from apache.aurora.client.cli.options import TaskInstanceKey
 
 from .util import AuroraClientCommandTest, FakeAuroraCommandContext, 
mock_verb_options
 
-from gen.apache.aurora.api.ttypes import ScheduleStatus
+from gen.apache.aurora.api.constants import ACTIVE_STATES
+from gen.apache.aurora.api.ttypes import ScheduleStatus, TaskQuery
 
 
 class TestAddCommand(AuroraClientCommandTest):
@@ -37,6 +38,8 @@ class TestAddCommand(AuroraClientCommandTest):
     self._mock_options.open_browser = True
     self._fake_context.add_expected_query_result(self.create_query_call_result(
         self.create_scheduled_task(1, ScheduleStatus.RUNNING)))
+    self._fake_context.add_expected_query_result(
+      self.create_query_call_result(), job_key=self.TEST_JOBKEY)
 
     self._mock_api.add_instances.return_value = 
self.create_simple_success_response()
 
@@ -50,11 +53,17 @@ class TestAddCommand(AuroraClientCommandTest):
     assert mock_webbrowser.mock_calls == [
         call("http://something_or_other/scheduler/bozo/test/hello")
     ]
+    assert self._mock_api.query_no_configs.mock_calls == [
+      call(TaskQuery(jobKeys=[self.TEST_JOBKEY.to_thrift()], 
statuses=ACTIVE_STATES)),
+      call(TaskQuery(jobKeys=[self.TEST_JOBKEY.to_thrift()], 
statuses=ACTIVE_STATES))
+    ]
 
   def test_wait_added_instances(self):
     self._mock_options.wait_until = 'RUNNING'
     self._fake_context.add_expected_query_result(self.create_query_call_result(
         self.create_scheduled_task(1, ScheduleStatus.PENDING)))
+    self._fake_context.add_expected_query_result(
+      self.create_query_call_result(), job_key=self.TEST_JOBKEY)
 
     self._mock_api.add_instances.return_value = 
self.create_simple_success_response()
 
@@ -70,6 +79,10 @@ class TestAddCommand(AuroraClientCommandTest):
         self.TEST_JOBKEY,
         self._mock_api,
         [2, 3, 4])]
+    assert self._mock_api.query_no_configs.mock_calls == [
+      call(TaskQuery(jobKeys=[self.TEST_JOBKEY.to_thrift()], 
statuses=ACTIVE_STATES)),
+      call(TaskQuery(jobKeys=[self.TEST_JOBKEY.to_thrift()], 
statuses=ACTIVE_STATES))
+    ]
 
   def test_no_active_instance(self):
     
self._fake_context.add_expected_query_result(self.create_empty_task_result())
@@ -79,8 +92,15 @@ class TestAddCommand(AuroraClientCommandTest):
   def test_add_instances_raises(self):
     self._fake_context.add_expected_query_result(self.create_query_call_result(
         self.create_scheduled_task(1, ScheduleStatus.PENDING)))
+    self._fake_context.add_expected_query_result(
+      self.create_query_call_result(), job_key=self.TEST_JOBKEY)
 
     self._mock_api.add_instances.return_value = self.create_error_response()
 
     with pytest.raises(Context.CommandError):
       self._command.execute(self._fake_context)
+
+    assert self._mock_api.query_no_configs.mock_calls == [
+      call(TaskQuery(jobKeys=[self.TEST_JOBKEY.to_thrift()], 
statuses=ACTIVE_STATES)),
+      call(TaskQuery(jobKeys=[self.TEST_JOBKEY.to_thrift()], 
statuses=ACTIVE_STATES))
+    ]

Reply via email to