Repository: aurora
Updated Branches:
  refs/heads/master f054e9b10 -> 4e28e73bb


http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java 
b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
index 2c27ec7..efe0c06 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
@@ -14,47 +14,101 @@
 package org.apache.aurora.scheduler.updater;
 
 import java.util.Optional;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.InstanceKey;
+import org.apache.aurora.gen.InstanceTaskConfig;
 import org.apache.aurora.gen.JobUpdateInstructions;
 import org.apache.aurora.gen.JobUpdateKey;
 import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.PercentageSlaPolicy;
+import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.SlaPolicy;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.updater.InstanceActionHandler.KillTask;
+import org.easymock.Capture;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class KillTaskTest extends EasyMockTest {
+  private static final String TASK_ID = "task_id";
+  private static final IJobKey JOB = JobKeys.from("role", "env", "job");
+  private static final IInstanceKey INSTANCE =
+      IInstanceKey.build(new InstanceKey(JOB.newBuilder(), 0));
+  private static final IJobUpdateKey UPDATE_ID =
+      IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "update_id"));
+  private static final SlaPolicy TEST_SLA_POLICY_OLD = 
SlaPolicy.percentageSlaPolicy(
+      new PercentageSlaPolicy()
+          .setPercentage(95)
+          .setDurationSecs(3600));
+  private static final SlaPolicy TEST_SLA_POLICY_NEW = 
SlaPolicy.percentageSlaPolicy(
+      new PercentageSlaPolicy()
+          .setPercentage(0)
+          .setDurationSecs(0));
+  private static final ITaskConfig CONFIG_NO_SLA = ITaskConfig.build(
+      TaskTestUtil.makeConfig(JOB, true, Optional.empty()).newBuilder());
+  private static final ITaskConfig OLD_CONFIG_WITH_SLA =
+      TaskTestUtil.makeConfig(JOB, true, Optional.of(TEST_SLA_POLICY_OLD));
+  private static final ITaskConfig NEW_CONFIG_WITH_SLA =
+      TaskTestUtil.makeConfig(JOB, true, Optional.of(TEST_SLA_POLICY_NEW));
   private static final IJobUpdateInstructions INSTRUCTIONS = 
IJobUpdateInstructions.build(
       new JobUpdateInstructions()
           .setSettings(
               new JobUpdateSettings()
                   .setMinWaitInInstanceRunningMs(1000)));
-  private static final IJobKey JOB = JobKeys.from("role", "env", "job");
-  private static final IInstanceKey INSTANCE =
-      IInstanceKey.build(new InstanceKey(JOB.newBuilder(), 0));
-  private static final IJobUpdateKey UPDATE_ID =
-          IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "update_id"));
+  private static final IJobUpdateInstructions INSTRUCTIONS_SLA_AWARE = 
IJobUpdateInstructions.build(
+      new JobUpdateInstructions()
+          .setSettings(
+              new JobUpdateSettings()
+                  .setMinWaitInInstanceRunningMs(1000)
+                  .setSlaAware(true))
+          .setInitialState(ImmutableSet.of(new InstanceTaskConfig(
+              OLD_CONFIG_WITH_SLA.newBuilder(),
+              ImmutableSet.of(new Range(0, 0)))))
+          .setDesiredState(new InstanceTaskConfig(
+              NEW_CONFIG_WITH_SLA.newBuilder(),
+              ImmutableSet.of(new Range(0, 0)))));
+  private static final IJobUpdateInstructions INSTRUCTIONS_SLA_AWARE_NO_POLICY
+      = IJobUpdateInstructions.build(
+          new JobUpdateInstructions()
+              .setSettings(
+                  new JobUpdateSettings()
+                      .setMinWaitInInstanceRunningMs(1000)
+                      .setSlaAware(true))
+              .setDesiredState(new InstanceTaskConfig(
+                  CONFIG_NO_SLA.newBuilder(),
+                  ImmutableSet.of(new Range(0, 0)))));
 
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private InstanceActionHandler handler;
   private UpdateAgentReserver updateAgentReserver;
+  private SlaKillController slaKillController;
 
   @Before
   public void setUp() {
@@ -62,19 +116,19 @@ public class KillTaskTest extends EasyMockTest {
     storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
     updateAgentReserver = createMock(UpdateAgentReserver.class);
-    handler = new InstanceActionHandler.KillTask(false);
+    handler = new KillTask(false);
+    slaKillController = createMock(SlaKillController.class);
   }
 
   @Test
   public void testInstanceKill() throws Exception {
-    String id = "task_id";
     storageUtil.expectTaskFetch(
         Query.instanceScoped(INSTANCE).active(),
-        TaskTestUtil.makeTask(id, INSTANCE.getJobKey()));
+        TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey()));
 
     expect(stateManager.changeState(
         storageUtil.mutableStoreProvider,
-        id,
+        TASK_ID,
         Optional.empty(),
         ScheduleStatus.KILLING,
         Optional.of("Killed for job update " + UPDATE_ID.getId())))
@@ -89,36 +143,36 @@ public class KillTaskTest extends EasyMockTest {
         stateManager,
         updateAgentReserver,
         JobUpdateStatus.ROLLING_BACK,
-        UPDATE_ID);
+        UPDATE_ID,
+        slaKillController);
   }
 
   @Test
   public void testKillForUpdateReservesAgentForInstance() throws Exception {
-    String id = "task_id";
-    IScheduledTask task = TaskTestUtil.makeTask(id, INSTANCE.getJobKey(), 1, 
"agent01");
+    IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 
1, "agent01");
     storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
 
     expect(stateManager.changeState(
         storageUtil.mutableStoreProvider,
-        id,
+        TASK_ID,
         Optional.empty(),
         ScheduleStatus.KILLING,
         Optional.of("Killed for job update " + UPDATE_ID.getId())))
         .andReturn(StateChangeResult.SUCCESS);
-
     updateAgentReserver.reserve(task.getAssignedTask().getSlaveId(), INSTANCE);
     expectLastCall();
 
     control.replay();
 
-    new InstanceActionHandler.KillTask(true).getReevaluationDelay(
+    new KillTask(true).getReevaluationDelay(
         INSTANCE,
         INSTRUCTIONS,
         storageUtil.mutableStoreProvider,
         stateManager,
         updateAgentReserver,
         JobUpdateStatus.ROLLING_BACK,
-        UPDATE_ID);
+        UPDATE_ID,
+        slaKillController);
   }
 
   @Test
@@ -134,6 +188,177 @@ public class KillTaskTest extends EasyMockTest {
         stateManager,
         updateAgentReserver,
         JobUpdateStatus.ROLLING_BACK,
-        UPDATE_ID);
+        UPDATE_ID,
+        slaKillController);
+  }
+
+  /**
+   * Ensures that if an instance is killed with the {@code slaAware} option 
in the instructions,
+   * then the kill is sent to be handled by the {@link SlaKillController}.
+   */
+  @Test
+  public void testInstanceKillSlaAware() throws Exception {
+    IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey());
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
+
+    slaKillController.slaKill(
+        eq(storageUtil.mutableStoreProvider),
+        eq(INSTANCE),
+        eq(task),
+        eq(UPDATE_ID),
+        eq(INSTRUCTIONS_SLA_AWARE.getDesiredState().getTask().getSlaPolicy()),
+        eq(JobUpdateStatus.ROLLING_FORWARD),
+        anyObject());
+    expectLastCall();
+
+    control.replay();
+
+    handler.getReevaluationDelay(
+        INSTANCE,
+        INSTRUCTIONS_SLA_AWARE,
+        storageUtil.mutableStoreProvider,
+        stateManager,
+        updateAgentReserver,
+        JobUpdateStatus.ROLLING_FORWARD,
+        UPDATE_ID,
+        slaKillController);
+  }
+
+  /**
+   * Ensures that if an instance is killed while {@link 
JobUpdateStatus#ROLLING_BACK}, it uses the old
+   * configuration.
+   */
+  @Test
+  public void testInstanceKillSlaAwareRollback() throws Exception {
+    IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 
1, "agent01");
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
+
+    Capture<Consumer<Storage.MutableStoreProvider>> killCommandCapture = 
createCapture();
+    slaKillController.slaKill(
+        eq(storageUtil.mutableStoreProvider),
+        eq(INSTANCE),
+        eq(task),
+        eq(UPDATE_ID),
+        
eq(INSTRUCTIONS_SLA_AWARE.getInitialState().asList().get(0).getTask().getSlaPolicy()),
+        eq(JobUpdateStatus.ROLLING_BACK),
+        capture(killCommandCapture));
+    expectLastCall();
+
+    control.replay();
+
+    handler.getReevaluationDelay(
+        INSTANCE,
+        INSTRUCTIONS_SLA_AWARE,
+        storageUtil.mutableStoreProvider,
+        stateManager,
+        updateAgentReserver,
+        JobUpdateStatus.ROLLING_BACK,
+        UPDATE_ID,
+        slaKillController);
+  }
+
+  /**
+   * Ensure the correct behavior of the consumer passed into
+   * {@link SlaKillController#slaKill}. It should behave as
+   * {@link KillTask#killAndMaybeReserve}.
+   */
+  @Test
+  public void testInstanceKillSlaAwareKillCommand() throws Exception {
+    IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 
1, "agent01");
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
+
+    Capture<Consumer<Storage.MutableStoreProvider>> killCommandCapture = 
createCapture();
+    slaKillController.slaKill(
+        eq(storageUtil.mutableStoreProvider),
+        eq(INSTANCE),
+        eq(task),
+        eq(UPDATE_ID),
+        eq(INSTRUCTIONS_SLA_AWARE.getDesiredState().getTask().getSlaPolicy()),
+        eq(JobUpdateStatus.ROLLING_FORWARD),
+        capture(killCommandCapture));
+    expectLastCall();
+
+    // Ensure the correct kill command consumer has been passed in and 
expected behavior occurs
+    // when executed.
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        TASK_ID,
+        Optional.empty(),
+        ScheduleStatus.KILLING,
+        Optional.of("Killed for job update " + UPDATE_ID.getId())))
+        .andReturn(StateChangeResult.SUCCESS);
+    updateAgentReserver.reserve(task.getAssignedTask().getSlaveId(), INSTANCE);
+    expectLastCall();
+
+    control.replay();
+
+    new KillTask(true).getReevaluationDelay(
+        INSTANCE,
+        INSTRUCTIONS_SLA_AWARE,
+        storageUtil.mutableStoreProvider,
+        stateManager,
+        updateAgentReserver,
+        JobUpdateStatus.ROLLING_FORWARD,
+        UPDATE_ID,
+        slaKillController);
+
+    assertTrue(killCommandCapture.hasCaptured());
+    killCommandCapture.getValue().accept(storageUtil.mutableStoreProvider);
+  }
+
+  /**
+   * Ensures that if an instance is killed with the {@code slaAware} option 
in the instructions
+   * but no {@link org.apache.aurora.gen.SlaPolicy} with the task, then the 
kill does not fail but
+   * instead continues as a non-sla-aware kill.
+   */
+  @Test
+  public void testInstanceKillSlaAwareMissingSlaPolicy() throws Exception {
+    IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 
1, "agent01");
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        TASK_ID,
+        Optional.empty(),
+        ScheduleStatus.KILLING,
+        Optional.of("Killed for job update " + UPDATE_ID.getId())))
+        .andReturn(StateChangeResult.SUCCESS);
+
+    control.replay();
+
+    handler.getReevaluationDelay(
+        INSTANCE,
+        INSTRUCTIONS_SLA_AWARE_NO_POLICY,
+        storageUtil.mutableStoreProvider,
+        stateManager,
+        updateAgentReserver,
+        JobUpdateStatus.ROLLING_FORWARD,
+        UPDATE_ID,
+        slaKillController);
+  }
+
+  /**
+   * Ensures that if SLA-aware kill is called while not in an active state we 
throw an exception.
+   */
+  @Test
+  public void testInstanceKillSlaAwareBadStatus() {
+    IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 
1, "agent01");
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
+
+    control.replay();
+
+    try {
+      handler.getReevaluationDelay(
+          INSTANCE,
+          INSTRUCTIONS_SLA_AWARE_NO_POLICY,
+          storageUtil.mutableStoreProvider,
+          stateManager,
+          updateAgentReserver,
+          JobUpdateStatus.ROLL_FORWARD_PAUSED,
+          UPDATE_ID,
+          slaKillController);
+      fail();
+    } catch (UpdateStateException e) {
+      // Expected
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/java/org/apache/aurora/scheduler/updater/SlaKillControllerTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/updater/SlaKillControllerTest.java 
b/src/test/java/org/apache/aurora/scheduler/updater/SlaKillControllerTest.java
new file mode 100644
index 0000000..373fb83
--- /dev/null
+++ 
b/src/test/java/org/apache/aurora/scheduler/updater/SlaKillControllerTest.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.util.BackoffStrategy;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobInstanceUpdateEvent;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateAction;
+import org.apache.aurora.gen.JobUpdateDetails;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.PercentageSlaPolicy;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.SlaPolicy;
+import org.apache.aurora.scheduler.base.InstanceKeys;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.sla.SlaManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ISlaPolicy;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import 
org.apache.aurora.scheduler.updater.UpdaterModule.UpdateActionBatchWorker;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static 
org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.newCapture;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SlaKillControllerTest extends EasyMockTest {
+
+  private static final ITaskConfig OLD_CONFIG = TaskTestUtil.makeConfig(JOB);
+  private static final SlaPolicy TEST_SLA_POLICY = 
SlaPolicy.percentageSlaPolicy(
+      new PercentageSlaPolicy()
+          .setPercentage(0)
+          .setDurationSecs(0));
+  private static final ITaskConfig NEW_CONFIG = ITaskConfig.build(
+      TaskTestUtil.makeConfig(JOB).newBuilder().setSlaPolicy(TEST_SLA_POLICY));
+  private static final IScheduledTask TASK = IScheduledTask.build(
+      makeTask("id", 
OLD_CONFIG).newBuilder().setStatus(ScheduleStatus.RUNNING));
+  private static final IAssignedTask ASSIGNED_TASK = TASK.getAssignedTask();
+  private static final IInstanceKey INSTANCE_KEY = InstanceKeys.from(
+      JOB,
+      ASSIGNED_TASK.getInstanceId());
+  private static final IJobUpdate UPDATE = IJobUpdate.build(
+      new JobUpdate()
+          .setInstructions(new JobUpdateInstructions()
+              .setDesiredState(new InstanceTaskConfig()
+                  .setTask(NEW_CONFIG.newBuilder()))
+              .setSettings(new JobUpdateSettings()
+                  .setSlaAware(true))));
+  private static final IJobUpdateKey UPDATE_ID =
+      IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "update_id"));
+  private static final String KILL_ATTEMPTS_STAT_NAME = 
SlaKillController.SLA_KILL_ATTEMPT
+      + JobKeys.canonicalString(JOB);
+  private static final String KILL_SUCCESSES_STAT_NAME = 
SlaKillController.SLA_KILL_SUCCESS
+      + JobKeys.canonicalString(JOB);
+
+  private StorageTestUtil storageUtil;
+  private UpdateActionBatchWorker batchWorker;
+  private SlaManager slaManager;
+  private FakeScheduledExecutor clock;
+  private BackoffStrategy backoffStrategy;
+  private FakeStatsProvider statsProvider;
+  private SlaKillController slaKillController;
+  private CountDownLatch killCommandHasExecuted;
+  private Consumer<Storage.MutableStoreProvider> fakeKillCommand;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    batchWorker = createMock(UpdateActionBatchWorker.class);
+    slaManager = createMock(SlaManager.class);
+    ScheduledExecutorService executor = 
createMock(ScheduledExecutorService.class);
+    clock = FakeScheduledExecutor.scheduleExecutor(executor);
+    backoffStrategy = createMock(BackoffStrategy.class);
+    statsProvider = new FakeStatsProvider();
+    slaKillController = new SlaKillController(
+        executor,
+        batchWorker,
+        slaManager,
+        clock,
+        backoffStrategy,
+        statsProvider);
+    killCommandHasExecuted = new CountDownLatch(2);
+    fakeKillCommand = mutableStoreProvider -> 
killCommandHasExecuted.countDown();
+  }
+
+  @Test
+  public <T, E extends Exception> void testSlaKill() throws Exception {
+    IJobUpdateDetails updateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new 
JobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 123L)),
+            ImmutableList.of()));
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(updateDetails))
+        .anyTimes();
+    Capture<IJobInstanceUpdateEvent> instanceUpdateEventCapture = newCapture();
+    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
+        eq(UPDATE_ID),
+        capture(instanceUpdateEventCapture));
+    expectLastCall().times(2);
+    storageUtil.expectTaskFetch(TASK.getAssignedTask().getTaskId(), TASK);
+    Capture<Storage.MutateWork<T, E>> workCapture = createCapture();
+    slaManager.checkSlaThenAct(
+        eq(TASK),
+        eq(ISlaPolicy.build(TEST_SLA_POLICY)),
+        capture(workCapture),
+        eq(ImmutableMap.of()),
+        eq(false));
+    expectBatchExecute(batchWorker, storageUtil.storage, control);
+    expect(backoffStrategy.calculateBackoffMs(0)).andReturn(42L);
+
+    control.replay();
+
+    // Kill command has not been executed yet
+    assertEquals(2, killCommandHasExecuted.getCount());
+
+    // Start an SLA-aware kill
+    slaKillController.slaKill(
+        storageUtil.mutableStoreProvider,
+        INSTANCE_KEY,
+        TASK,
+        UPDATE_ID,
+        ISlaPolicy.build(TEST_SLA_POLICY),
+        JobUpdateStatus.ROLLING_FORWARD,
+        fakeKillCommand);
+
+    // Ensure the SLA_CHECKING_MESSAGE message is added
+    assertTrue(
+        checkInstanceEventMatches(
+            instanceUpdateEventCapture.getValue(),
+            INSTANCE_KEY,
+            JobUpdateAction.INSTANCE_UPDATING,
+            SlaKillController.SLA_CHECKING_MESSAGE));
+    instanceUpdateEventCapture.reset();
+    assertFalse(instanceUpdateEventCapture.hasCaptured());
+
+    // Pretend SLA passes, executes work
+    workCapture.getValue().apply(storageUtil.mutableStoreProvider);
+    assertEquals(1, killCommandHasExecuted.getCount());
+
+    // Ensure the SLA_PASSED_MESSAGE message is added
+    assertTrue(
+        checkInstanceEventMatches(
+            instanceUpdateEventCapture.getValue(),
+            INSTANCE_KEY,
+            JobUpdateAction.INSTANCE_UPDATING,
+            SlaKillController.SLA_PASSED_MESSAGE));
+  }
+
+  /**
+   * Test that SLA kills are retried in case the SLA check does not pass.
+   */
+  @Test
+  public <T, E extends Exception> void testSlaKillRetry() throws Exception {
+    IJobUpdateDetails updateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new 
JobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 123L)),
+            ImmutableList.of()));
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(updateDetails))
+        .anyTimes();
+    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(eq(UPDATE_ID), 
anyObject());
+    expectLastCall().times(2);
+    storageUtil.expectTaskFetch(TASK.getAssignedTask().getTaskId(), 
TASK).times(2);
+    storageUtil.expectTaskFetch(
+        TASK.getAssignedTask().getTaskId(),
+        
IScheduledTask.build(TASK.newBuilder().setStatus(ScheduleStatus.KILLING)));
+    Capture<Storage.MutateWork<T, E>> workCapture = createCapture();
+    slaManager.checkSlaThenAct(
+        eq(TASK),
+        eq(ISlaPolicy.build(TEST_SLA_POLICY)),
+        capture(workCapture),
+        eq(ImmutableMap.of()),
+        eq(false));
+    expectLastCall().times(2);
+    expectBatchExecute(batchWorker, storageUtil.storage, control).times(3);
+    expect(backoffStrategy.calculateBackoffMs(0L)).andReturn(42L);
+    expect(backoffStrategy.calculateBackoffMs(42L)).andReturn(84L);
+
+    control.replay();
+
+    // Kill command has not been executed yet
+    
assertFalse(statsProvider.getAllValues().keySet().contains(KILL_ATTEMPTS_STAT_NAME));
+    
assertFalse(statsProvider.getAllValues().keySet().contains(KILL_SUCCESSES_STAT_NAME));
+    assertFalse(workCapture.hasCaptured());
+    assertEquals(killCommandHasExecuted.getCount(), 2);
+
+    // Start an SLA-aware kill
+    slaKillController.slaKill(
+        storageUtil.mutableStoreProvider,
+        INSTANCE_KEY,
+        TASK,
+        UPDATE_ID,
+        ISlaPolicy.build(TEST_SLA_POLICY),
+        JobUpdateStatus.ROLLING_FORWARD,
+        fakeKillCommand);
+
+    // SLA check is called and discarded, pretending it failed
+    assertEquals(1, statsProvider.getLongValue(KILL_ATTEMPTS_STAT_NAME));
+    
assertFalse(statsProvider.getAllValues().keySet().contains(KILL_SUCCESSES_STAT_NAME));
+    assertTrue(workCapture.hasCaptured());
+    workCapture.reset();
+    assertEquals(2, killCommandHasExecuted.getCount());
+    assertFalse(workCapture.hasCaptured());
+
+    // Another SLA kill is scheduled assuming the previous attempt failed
+    assertEquals(1, clock.countDeferredWork());
+    clock.advance(TimeAmount.of(42L, Time.MILLISECONDS));
+
+    // The second SLA check passes and the kill function is called
+    assertEquals(2, killCommandHasExecuted.getCount());
+    workCapture.getValue().apply(storageUtil.mutableStoreProvider);
+    assertEquals(1, killCommandHasExecuted.getCount());
+    assertEquals(2, statsProvider.getLongValue(KILL_ATTEMPTS_STAT_NAME));
+    assertEquals(1, statsProvider.getLongValue(KILL_SUCCESSES_STAT_NAME));
+
+    // One more SLA kill is scheduled assuming the previous attempt failed. 
Since the previous
+    // attempt did not fail, we do a NOOP since the task is already KILLING
+    clock.advance(TimeAmount.of(84L, Time.MILLISECONDS));
+    assertEquals(2, statsProvider.getLongValue(KILL_ATTEMPTS_STAT_NAME));
+    assertEquals(1, statsProvider.getLongValue(KILL_SUCCESSES_STAT_NAME));
+    assertEquals(1, killCommandHasExecuted.getCount());
+    assertEquals(0, clock.countDeferredWork());
+  }
+
+  @Test
+  public <T, E extends Exception> void testSlaKillRollingBack() throws 
Exception {
+    IJobUpdateDetails updateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLLING_BACK, 
123L)),
+            ImmutableList.of()));
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(updateDetails))
+        .anyTimes();
+    Capture<IJobInstanceUpdateEvent> instanceUpdateEventCapture = newCapture();
+    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
+        eq(UPDATE_ID),
+        capture(instanceUpdateEventCapture));
+    expectLastCall().times(2);
+    storageUtil.expectTaskFetch(TASK.getAssignedTask().getTaskId(), TASK);
+    Capture<Storage.MutateWork<T, E>> workCapture = createCapture();
+    slaManager.checkSlaThenAct(
+        eq(TASK),
+        eq(ISlaPolicy.build(TEST_SLA_POLICY)),
+        capture(workCapture),
+        eq(ImmutableMap.of()),
+        eq(false));
+    expectBatchExecute(batchWorker, storageUtil.storage, control);
+    expect(backoffStrategy.calculateBackoffMs(0)).andReturn(42L);
+
+    control.replay();
+
+    // Kill command has not been executed yet
+    assertEquals(2, killCommandHasExecuted.getCount());
+
+    // Start an SLA-aware kill
+    slaKillController.slaKill(
+        storageUtil.mutableStoreProvider,
+        INSTANCE_KEY,
+        TASK,
+        UPDATE_ID,
+        ISlaPolicy.build(TEST_SLA_POLICY),
+        JobUpdateStatus.ROLLING_BACK,
+        fakeKillCommand);
+
+    // Ensure the SLA_CHECKING_MESSAGE message is added with ROLLING_BACK 
action
+    assertTrue(
+        checkInstanceEventMatches(
+            instanceUpdateEventCapture.getValue(),
+            INSTANCE_KEY,
+            JobUpdateAction.INSTANCE_ROLLING_BACK,
+            SlaKillController.SLA_CHECKING_MESSAGE));
+    instanceUpdateEventCapture.reset();
+    assertFalse(instanceUpdateEventCapture.hasCaptured());
+
+    // Pretend SLA passes, executes work
+    workCapture.getValue().apply(storageUtil.mutableStoreProvider);
+    assertEquals(1, killCommandHasExecuted.getCount());
+
+    // Ensure the SLA_PASSED_MESSAGE message is added with ROLLING_BACK action
+    assertTrue(
+        checkInstanceEventMatches(
+            instanceUpdateEventCapture.getValue(),
+            INSTANCE_KEY,
+            JobUpdateAction.INSTANCE_ROLLING_BACK,
+            SlaKillController.SLA_PASSED_MESSAGE));
+  }
+
+  @Test
+  public void testSlaKillFailOnPause() throws Exception {
+    IJobUpdateDetails updateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new 
JobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 123L)),
+            ImmutableList.of()));
+    IJobUpdateDetails pausedUpdateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new 
JobUpdateEvent(JobUpdateStatus.ROLL_FORWARD_PAUSED, 123L)),
+            ImmutableList.of()));
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(updateDetails));
+    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(eq(UPDATE_ID), 
anyObject());
+    expectLastCall();
+    expectBatchExecute(batchWorker, storageUtil.storage, control);
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(pausedUpdateDetails));
+
+    control.replay();
+
+    // Kill command has not been executed yet
+    assertEquals(2, killCommandHasExecuted.getCount());
+
+    // Start an SLA-aware kill
+    slaKillController.slaKill(
+        storageUtil.mutableStoreProvider,
+        INSTANCE_KEY,
+        TASK,
+        UPDATE_ID,
+        ISlaPolicy.build(TEST_SLA_POLICY),
+        JobUpdateStatus.ROLLING_FORWARD,
+        fakeKillCommand);
+
+    // Nothing should happen since status has changed
+    assertEquals(2, killCommandHasExecuted.getCount());
+    assertEquals(0, clock.countDeferredWork());
+    
assertFalse(statsProvider.getAllValues().keySet().contains(KILL_ATTEMPTS_STAT_NAME));
+  }
+
+  @Test
+  public void testSlaKillNoDuplicateEvents() throws Exception {
+    IJobUpdateDetails updateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new 
JobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 123L)),
+            ImmutableList.of(
+                new JobInstanceUpdateEvent()
+                    .setInstanceId(INSTANCE_KEY.getInstanceId())
+                    .setAction(JobUpdateAction.INSTANCE_UPDATING)
+                    .setMessage(SlaKillController.SLA_CHECKING_MESSAGE))));
+    IJobUpdateDetails pausedUpdateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new 
JobUpdateEvent(JobUpdateStatus.ROLL_FORWARD_PAUSED, 123L)),
+            ImmutableList.of()));
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(updateDetails));
+    expectBatchExecute(batchWorker, storageUtil.storage, control);
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(pausedUpdateDetails));
+
+    control.replay();
+
+    // Start an SLA-aware kill, update already contains event so we don't 
expect a save
+    slaKillController.slaKill(
+        storageUtil.mutableStoreProvider,
+        INSTANCE_KEY,
+        TASK,
+        UPDATE_ID,
+        ISlaPolicy.build(TEST_SLA_POLICY),
+        JobUpdateStatus.ROLLING_FORWARD,
+        fakeKillCommand);
+  }
+
+  @Test
+  public void testSlaKillInvalidStatus() {
+    control.replay();
+
+    // Start an SLA-aware kill, throws an exception since the kill was called 
while the update
+    // was not active
+    try {
+      slaKillController.slaKill(
+          storageUtil.mutableStoreProvider,
+          INSTANCE_KEY,
+          TASK,
+          UPDATE_ID,
+          ISlaPolicy.build(TEST_SLA_POLICY),
+          JobUpdateStatus.ROLL_FORWARD_PAUSED,
+          fakeKillCommand);
+    } catch (RuntimeException e) {
+      return;
+    }
+
+    fail();
+  }
+
+  private boolean checkInstanceEventMatches(IJobInstanceUpdateEvent event,
+                                            IInstanceKey instance,
+                                            JobUpdateAction action,
+                                            String message) {
+
+    return event.getInstanceId() == instance.getInstanceId()
+        && event.getAction() == action
+        && event.getMessage().equals(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/python/apache/aurora/client/cli/test_inspect.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_inspect.py 
b/src/test/python/apache/aurora/client/cli/test_inspect.py
index 2baba2a..58bcac1 100644
--- a/src/test/python/apache/aurora/client/cli/test_inspect.py
+++ b/src/test/python/apache/aurora/client/cli/test_inspect.py
@@ -112,7 +112,8 @@ Process 'process':
             "watch_secs": 45,
             "rollback_on_failure": True,
             "max_per_shard_failures": 0,
-            "max_total_failures": 0},
+            "max_total_failures": 0,
+            "sla_aware": False},
         "name": "the_job",
         "max_task_failures": 1,
         "cron_collision_policy": "KILL_EXISTING",

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent
----------------------------------------------------------------------
diff --git a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent
index 48902d3..8360998 100644
--- a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent
+++ b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent
@@ -11,6 +11,9 @@
           },
           "3": {
             "i32": 1
+          },
+          "4": {
+            "str": "string-value"
           }
         }
       },

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate
----------------------------------------------------------------------
diff --git a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate
index 08dfa5b..3876767 100644
--- a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate
+++ b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate
@@ -485,6 +485,9 @@
                   },
                   "9": {
                     "i32": 2
+                  },
+                  "10": {
+                    "tf": 1
                   }
                 }
               }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/ui/src/main/js/components/UpdateInstanceEvents.js
----------------------------------------------------------------------
diff --git a/ui/src/main/js/components/UpdateInstanceEvents.js b/ui/src/main/js/components/UpdateInstanceEvents.js
index 8351f2c..ab6d3df 100644
--- a/ui/src/main/js/components/UpdateInstanceEvents.js
+++ b/ui/src/main/js/components/UpdateInstanceEvents.js
@@ -30,7 +30,8 @@ export class InstanceEvent extends React.Component {
           getClassForUpdateAction(e.action),
           (i === events.length - 1) ? ' active' : ''),
         state: UPDATE_ACTION[e.action],
-        timestamp: e.timestampMs
+        timestamp: e.timestampMs,
+        message: e.message
       };
     });
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/ui/src/main/js/components/UpdateSettings.js
----------------------------------------------------------------------
diff --git a/ui/src/main/js/components/UpdateSettings.js b/ui/src/main/js/components/UpdateSettings.js
index d756f59..d7fbe00 100644
--- a/ui/src/main/js/components/UpdateSettings.js
+++ b/ui/src/main/js/components/UpdateSettings.js
@@ -25,6 +25,10 @@ export default function UpdateSettings({ update }) {
         <td>Rollback On Failure?</td>
         <td>{settings.rollbackOnFailure ? 'yes' : 'no'}</td>
       </tr>
+      <tr>
+        <td>SLA-Aware?</td>
+        <td>{settings.slaAware ? 'yes' : 'no'}</td>
+      </tr>
     </table>
   </div>);
 }

Reply via email to