Repository: aurora Updated Branches: refs/heads/master f054e9b10 -> 4e28e73bb
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java index 2c27ec7..efe0c06 100644 --- a/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java @@ -14,47 +14,101 @@ package org.apache.aurora.scheduler.updater; import java.util.Optional; +import java.util.function.Consumer; + +import com.google.common.collect.ImmutableSet; import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.InstanceKey; +import org.apache.aurora.gen.InstanceTaskConfig; import org.apache.aurora.gen.JobUpdateInstructions; import org.apache.aurora.gen.JobUpdateKey; import org.apache.aurora.gen.JobUpdateSettings; import org.apache.aurora.gen.JobUpdateStatus; +import org.apache.aurora.gen.PercentageSlaPolicy; +import org.apache.aurora.gen.Range; import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.SlaPolicy; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.state.StateChangeResult; import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IInstanceKey; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import 
org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.updater.InstanceActionHandler.KillTask; +import org.easymock.Capture; import org.junit.Before; import org.junit.Test; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class KillTaskTest extends EasyMockTest { + private static final String TASK_ID = "task_id"; + private static final IJobKey JOB = JobKeys.from("role", "env", "job"); + private static final IInstanceKey INSTANCE = + IInstanceKey.build(new InstanceKey(JOB.newBuilder(), 0)); + private static final IJobUpdateKey UPDATE_ID = + IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "update_id")); + private static final SlaPolicy TEST_SLA_POLICY_OLD = SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy() + .setPercentage(95) + .setDurationSecs(3600)); + private static final SlaPolicy TEST_SLA_POLICY_NEW = SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy() + .setPercentage(0) + .setDurationSecs(0)); + private static final ITaskConfig CONFIG_NO_SLA = ITaskConfig.build( + TaskTestUtil.makeConfig(JOB, true, Optional.empty()).newBuilder()); + private static final ITaskConfig OLD_CONFIG_WITH_SLA = + TaskTestUtil.makeConfig(JOB, true, Optional.of(TEST_SLA_POLICY_OLD)); + private static final ITaskConfig NEW_CONFIG_WITH_SLA = + TaskTestUtil.makeConfig(JOB, true, Optional.of(TEST_SLA_POLICY_NEW)); private static final IJobUpdateInstructions INSTRUCTIONS = IJobUpdateInstructions.build( new JobUpdateInstructions() .setSettings( new JobUpdateSettings() .setMinWaitInInstanceRunningMs(1000))); - private static final IJobKey JOB = JobKeys.from("role", "env", "job"); - private static final IInstanceKey INSTANCE = - IInstanceKey.build(new 
InstanceKey(JOB.newBuilder(), 0)); - private static final IJobUpdateKey UPDATE_ID = - IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "update_id")); + private static final IJobUpdateInstructions INSTRUCTIONS_SLA_AWARE = IJobUpdateInstructions.build( + new JobUpdateInstructions() + .setSettings( + new JobUpdateSettings() + .setMinWaitInInstanceRunningMs(1000) + .setSlaAware(true)) + .setInitialState(ImmutableSet.of(new InstanceTaskConfig( + OLD_CONFIG_WITH_SLA.newBuilder(), + ImmutableSet.of(new Range(0, 0))))) + .setDesiredState(new InstanceTaskConfig( + NEW_CONFIG_WITH_SLA.newBuilder(), + ImmutableSet.of(new Range(0, 0))))); + private static final IJobUpdateInstructions INSTRUCTIONS_SLA_AWARE_NO_POLICY + = IJobUpdateInstructions.build( + new JobUpdateInstructions() + .setSettings( + new JobUpdateSettings() + .setMinWaitInInstanceRunningMs(1000) + .setSlaAware(true)) + .setDesiredState(new InstanceTaskConfig( + CONFIG_NO_SLA.newBuilder(), + ImmutableSet.of(new Range(0, 0))))); private StorageTestUtil storageUtil; private StateManager stateManager; private InstanceActionHandler handler; private UpdateAgentReserver updateAgentReserver; + private SlaKillController slaKillController; @Before public void setUp() { @@ -62,19 +116,19 @@ public class KillTaskTest extends EasyMockTest { storageUtil.expectOperations(); stateManager = createMock(StateManager.class); updateAgentReserver = createMock(UpdateAgentReserver.class); - handler = new InstanceActionHandler.KillTask(false); + handler = new KillTask(false); + slaKillController = createMock(SlaKillController.class); } @Test public void testInstanceKill() throws Exception { - String id = "task_id"; storageUtil.expectTaskFetch( Query.instanceScoped(INSTANCE).active(), - TaskTestUtil.makeTask(id, INSTANCE.getJobKey())); + TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey())); expect(stateManager.changeState( storageUtil.mutableStoreProvider, - id, + TASK_ID, Optional.empty(), ScheduleStatus.KILLING, 
Optional.of("Killed for job update " + UPDATE_ID.getId()))) @@ -89,36 +143,36 @@ public class KillTaskTest extends EasyMockTest { stateManager, updateAgentReserver, JobUpdateStatus.ROLLING_BACK, - UPDATE_ID); + UPDATE_ID, + slaKillController); } @Test public void testKillForUpdateReservesAgentForInstance() throws Exception { - String id = "task_id"; - IScheduledTask task = TaskTestUtil.makeTask(id, INSTANCE.getJobKey(), 1, "agent01"); + IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01"); storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task); expect(stateManager.changeState( storageUtil.mutableStoreProvider, - id, + TASK_ID, Optional.empty(), ScheduleStatus.KILLING, Optional.of("Killed for job update " + UPDATE_ID.getId()))) .andReturn(StateChangeResult.SUCCESS); - updateAgentReserver.reserve(task.getAssignedTask().getSlaveId(), INSTANCE); expectLastCall(); control.replay(); - new InstanceActionHandler.KillTask(true).getReevaluationDelay( + new KillTask(true).getReevaluationDelay( INSTANCE, INSTRUCTIONS, storageUtil.mutableStoreProvider, stateManager, updateAgentReserver, JobUpdateStatus.ROLLING_BACK, - UPDATE_ID); + UPDATE_ID, + slaKillController); } @Test @@ -134,6 +188,177 @@ public class KillTaskTest extends EasyMockTest { stateManager, updateAgentReserver, JobUpdateStatus.ROLLING_BACK, - UPDATE_ID); + UPDATE_ID, + slaKillController); + } + + /** + * Ensures that if an instance is killed with the {@code slaAware} option in the instructions, + * then the kill is sent to be handled by the {@link SlaKillController}.
+ */ + @Test + public void testInstanceKillSlaAware() throws Exception { + IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey()); + storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task); + + slaKillController.slaKill( + eq(storageUtil.mutableStoreProvider), + eq(INSTANCE), + eq(task), + eq(UPDATE_ID), + eq(INSTRUCTIONS_SLA_AWARE.getDesiredState().getTask().getSlaPolicy()), + eq(JobUpdateStatus.ROLLING_FORWARD), + anyObject()); + expectLastCall(); + + control.replay(); + + handler.getReevaluationDelay( + INSTANCE, + INSTRUCTIONS_SLA_AWARE, + storageUtil.mutableStoreProvider, + stateManager, + updateAgentReserver, + JobUpdateStatus.ROLLING_FORWARD, + UPDATE_ID, + slaKillController); + } + + /** + * Ensures that if an instance is killed while {@link JobUpdateStatus#ROLLING_BACK}, it uses the SLA + * policy of the old configuration. + */ + @Test + public void testInstanceKillSlaAwareRollback() throws Exception { + IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01"); + storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task); + + Capture<Consumer<Storage.MutableStoreProvider>> killCommandCapture = createCapture(); + slaKillController.slaKill( + eq(storageUtil.mutableStoreProvider), + eq(INSTANCE), + eq(task), + eq(UPDATE_ID), + eq(INSTRUCTIONS_SLA_AWARE.getInitialState().asList().get(0).getTask().getSlaPolicy()), + eq(JobUpdateStatus.ROLLING_BACK), + capture(killCommandCapture)); + expectLastCall(); + + control.replay(); + + handler.getReevaluationDelay( + INSTANCE, + INSTRUCTIONS_SLA_AWARE, + storageUtil.mutableStoreProvider, + stateManager, + updateAgentReserver, + JobUpdateStatus.ROLLING_BACK, + UPDATE_ID, + slaKillController); + } + + /** + * Ensure the correct behavior of the consumer passed into + * {@link SlaKillController#slaKill}. It should behave as + * {@link KillTask#killAndMaybeReserve}.
+ */ + @Test + public void testInstanceKillSlaAwareKillCommand() throws Exception { + IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01"); + storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task); + + Capture<Consumer<Storage.MutableStoreProvider>> killCommandCapture = createCapture(); + slaKillController.slaKill( + eq(storageUtil.mutableStoreProvider), + eq(INSTANCE), + eq(task), + eq(UPDATE_ID), + eq(INSTRUCTIONS_SLA_AWARE.getDesiredState().getTask().getSlaPolicy()), + eq(JobUpdateStatus.ROLLING_FORWARD), + capture(killCommandCapture)); + expectLastCall(); + + // Ensure the correct kill command consumer has been passed in and expected behavior occurs + // when executed. + expect(stateManager.changeState( + storageUtil.mutableStoreProvider, + TASK_ID, + Optional.empty(), + ScheduleStatus.KILLING, + Optional.of("Killed for job update " + UPDATE_ID.getId()))) + .andReturn(StateChangeResult.SUCCESS); + updateAgentReserver.reserve(task.getAssignedTask().getSlaveId(), INSTANCE); + expectLastCall(); + + control.replay(); + + new KillTask(true).getReevaluationDelay( + INSTANCE, + INSTRUCTIONS_SLA_AWARE, + storageUtil.mutableStoreProvider, + stateManager, + updateAgentReserver, + JobUpdateStatus.ROLLING_FORWARD, + UPDATE_ID, + slaKillController); + + assertTrue(killCommandCapture.hasCaptured()); + killCommandCapture.getValue().accept(storageUtil.mutableStoreProvider); + } + + /** + * Ensures that if an instance is killed with the {@code slaAware} option in the instructions + * but no {@link org.apache.aurora.gen.SlaPolicy} on the task, then the kill does not fail but + * instead continues as a non-SLA-aware kill.
+ */ + @Test + public void testInstanceKillSlaAwareMissingSlaPolicy() throws Exception { + IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01"); + storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task); + expect(stateManager.changeState( + storageUtil.mutableStoreProvider, + TASK_ID, + Optional.empty(), + ScheduleStatus.KILLING, + Optional.of("Killed for job update " + UPDATE_ID.getId()))) + .andReturn(StateChangeResult.SUCCESS); + + control.replay(); + + handler.getReevaluationDelay( + INSTANCE, + INSTRUCTIONS_SLA_AWARE_NO_POLICY, + storageUtil.mutableStoreProvider, + stateManager, + updateAgentReserver, + JobUpdateStatus.ROLLING_FORWARD, + UPDATE_ID, + slaKillController); + } + + /** + * Ensures that if SLA-aware kill is called while not in an active state we throw an exception. + */ + @Test + public void testInstanceKillSlaAwareBadStatus() { + IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01"); + storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task); + + control.replay(); + + try { + handler.getReevaluationDelay( + INSTANCE, + INSTRUCTIONS_SLA_AWARE_NO_POLICY, + storageUtil.mutableStoreProvider, + stateManager, + updateAgentReserver, + JobUpdateStatus.ROLL_FORWARD_PAUSED, + UPDATE_ID, + slaKillController); + fail(); + } catch (UpdateStateException e) { + // Expected + } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/java/org/apache/aurora/scheduler/updater/SlaKillControllerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/SlaKillControllerTest.java b/src/test/java/org/apache/aurora/scheduler/updater/SlaKillControllerTest.java new file mode 100644 index 0000000..373fb83 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/updater/SlaKillControllerTest.java @@ -0,0 +1,449 @@ +/** + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.updater; + +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Consumer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.common.util.BackoffStrategy; +import org.apache.aurora.gen.InstanceTaskConfig; +import org.apache.aurora.gen.JobInstanceUpdateEvent; +import org.apache.aurora.gen.JobUpdate; +import org.apache.aurora.gen.JobUpdateAction; +import org.apache.aurora.gen.JobUpdateDetails; +import org.apache.aurora.gen.JobUpdateEvent; +import org.apache.aurora.gen.JobUpdateInstructions; +import org.apache.aurora.gen.JobUpdateKey; +import org.apache.aurora.gen.JobUpdateSettings; +import org.apache.aurora.gen.JobUpdateStatus; +import org.apache.aurora.gen.PercentageSlaPolicy; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.SlaPolicy; +import org.apache.aurora.scheduler.base.InstanceKeys; +import org.apache.aurora.scheduler.base.JobKeys; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.config.types.TimeAmount; +import org.apache.aurora.scheduler.sla.SlaManager; +import org.apache.aurora.scheduler.storage.Storage; +import 
org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IInstanceKey; +import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobUpdate; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ISlaPolicy; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.aurora.scheduler.updater.UpdaterModule.UpdateActionBatchWorker; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; +import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; +import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.newCapture; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class SlaKillControllerTest extends EasyMockTest { + + private static final ITaskConfig OLD_CONFIG = TaskTestUtil.makeConfig(JOB); + private static final SlaPolicy TEST_SLA_POLICY = SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy() + .setPercentage(0) + .setDurationSecs(0)); + private static final ITaskConfig NEW_CONFIG = 
ITaskConfig.build( + TaskTestUtil.makeConfig(JOB).newBuilder().setSlaPolicy(TEST_SLA_POLICY)); + private static final IScheduledTask TASK = IScheduledTask.build( + makeTask("id", OLD_CONFIG).newBuilder().setStatus(ScheduleStatus.RUNNING)); + private static final IAssignedTask ASSIGNED_TASK = TASK.getAssignedTask(); + private static final IInstanceKey INSTANCE_KEY = InstanceKeys.from( + JOB, + ASSIGNED_TASK.getInstanceId()); + private static final IJobUpdate UPDATE = IJobUpdate.build( + new JobUpdate() + .setInstructions(new JobUpdateInstructions() + .setDesiredState(new InstanceTaskConfig() + .setTask(NEW_CONFIG.newBuilder())) + .setSettings(new JobUpdateSettings() + .setSlaAware(true)))); + private static final IJobUpdateKey UPDATE_ID = + IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "update_id")); + private static final String KILL_ATTEMPTS_STAT_NAME = SlaKillController.SLA_KILL_ATTEMPT + + JobKeys.canonicalString(JOB); + private static final String KILL_SUCCESSES_STAT_NAME = SlaKillController.SLA_KILL_SUCCESS + + JobKeys.canonicalString(JOB); + + private StorageTestUtil storageUtil; + private UpdateActionBatchWorker batchWorker; + private SlaManager slaManager; + private FakeScheduledExecutor clock; + private BackoffStrategy backoffStrategy; + private FakeStatsProvider statsProvider; + private SlaKillController slaKillController; + private CountDownLatch killCommandHasExecuted; + private Consumer<Storage.MutableStoreProvider> fakeKillCommand; + + @Before + public void setUp() { + storageUtil = new StorageTestUtil(this); + storageUtil.expectOperations(); + batchWorker = createMock(UpdateActionBatchWorker.class); + slaManager = createMock(SlaManager.class); + ScheduledExecutorService executor = createMock(ScheduledExecutorService.class); + clock = FakeScheduledExecutor.scheduleExecutor(executor); + backoffStrategy = createMock(BackoffStrategy.class); + statsProvider = new FakeStatsProvider(); + slaKillController = new SlaKillController( + executor, + 
batchWorker, + slaManager, + clock, + backoffStrategy, + statsProvider); + killCommandHasExecuted = new CountDownLatch(2); + fakeKillCommand = mutableStoreProvider -> killCommandHasExecuted.countDown(); + } + + @Test + public <T, E extends Exception> void testSlaKill() throws Exception { + IJobUpdateDetails updateDetails = IJobUpdateDetails.build( + new JobUpdateDetails( + UPDATE.newBuilder(), + ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 123L)), + ImmutableList.of())); + expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID)) + .andReturn(Optional.of(updateDetails)) + .anyTimes(); + Capture<IJobInstanceUpdateEvent> instanceUpdateEventCapture = newCapture(); + storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent( + eq(UPDATE_ID), + capture(instanceUpdateEventCapture)); + expectLastCall().times(2); + storageUtil.expectTaskFetch(TASK.getAssignedTask().getTaskId(), TASK); + Capture<Storage.MutateWork<T, E>> workCapture = createCapture(); + slaManager.checkSlaThenAct( + eq(TASK), + eq(ISlaPolicy.build(TEST_SLA_POLICY)), + capture(workCapture), + eq(ImmutableMap.of()), + eq(false)); + expectBatchExecute(batchWorker, storageUtil.storage, control); + expect(backoffStrategy.calculateBackoffMs(0)).andReturn(42L); + + control.replay(); + + // Kill command has not been executed yet + assertEquals(2, killCommandHasExecuted.getCount()); + + // Start an SLA-aware kill + slaKillController.slaKill( + storageUtil.mutableStoreProvider, + INSTANCE_KEY, + TASK, + UPDATE_ID, + ISlaPolicy.build(TEST_SLA_POLICY), + JobUpdateStatus.ROLLING_FORWARD, + fakeKillCommand); + + // Ensure the SLA_CHECKING_MESSAGE message is added + assertTrue( + checkInstanceEventMatches( + instanceUpdateEventCapture.getValue(), + INSTANCE_KEY, + JobUpdateAction.INSTANCE_UPDATING, + SlaKillController.SLA_CHECKING_MESSAGE)); + instanceUpdateEventCapture.reset(); + assertFalse(instanceUpdateEventCapture.hasCaptured()); + + // Pretend SLA passes, executes work + 
workCapture.getValue().apply(storageUtil.mutableStoreProvider); + assertEquals(1, killCommandHasExecuted.getCount()); + + // Ensure the SLA_PASSED_MESSAGE message is added + assertTrue( + checkInstanceEventMatches( + instanceUpdateEventCapture.getValue(), + INSTANCE_KEY, + JobUpdateAction.INSTANCE_UPDATING, + SlaKillController.SLA_PASSED_MESSAGE)); + } + + /** + * Test that SLA kills are retried in case the SLA check does not pass. + */ + @Test + public <T, E extends Exception> void testSlaKillRetry() throws Exception { + IJobUpdateDetails updateDetails = IJobUpdateDetails.build( + new JobUpdateDetails( + UPDATE.newBuilder(), + ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 123L)), + ImmutableList.of())); + expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID)) + .andReturn(Optional.of(updateDetails)) + .anyTimes(); + storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(eq(UPDATE_ID), anyObject()); + expectLastCall().times(2); + storageUtil.expectTaskFetch(TASK.getAssignedTask().getTaskId(), TASK).times(2); + storageUtil.expectTaskFetch( + TASK.getAssignedTask().getTaskId(), + IScheduledTask.build(TASK.newBuilder().setStatus(ScheduleStatus.KILLING))); + Capture<Storage.MutateWork<T, E>> workCapture = createCapture(); + slaManager.checkSlaThenAct( + eq(TASK), + eq(ISlaPolicy.build(TEST_SLA_POLICY)), + capture(workCapture), + eq(ImmutableMap.of()), + eq(false)); + expectLastCall().times(2); + expectBatchExecute(batchWorker, storageUtil.storage, control).times(3); + expect(backoffStrategy.calculateBackoffMs(0L)).andReturn(42L); + expect(backoffStrategy.calculateBackoffMs(42L)).andReturn(84L); + + control.replay(); + + // Kill command has not been executed yet + assertFalse(statsProvider.getAllValues().keySet().contains(KILL_ATTEMPTS_STAT_NAME)); + assertFalse(statsProvider.getAllValues().keySet().contains(KILL_SUCCESSES_STAT_NAME)); + assertFalse(workCapture.hasCaptured()); + assertEquals(killCommandHasExecuted.getCount(), 2); + + // Start 
an SLA-aware kill + slaKillController.slaKill( + storageUtil.mutableStoreProvider, + INSTANCE_KEY, + TASK, + UPDATE_ID, + ISlaPolicy.build(TEST_SLA_POLICY), + JobUpdateStatus.ROLLING_FORWARD, + fakeKillCommand); + + // SLA check is called and discarded, pretending it failed + assertEquals(1, statsProvider.getLongValue(KILL_ATTEMPTS_STAT_NAME)); + assertFalse(statsProvider.getAllValues().keySet().contains(KILL_SUCCESSES_STAT_NAME)); + assertTrue(workCapture.hasCaptured()); + workCapture.reset(); + assertEquals(2, killCommandHasExecuted.getCount()); + assertFalse(workCapture.hasCaptured()); + + // Another SLA kill is scheduled assuming the previous attempt failed + assertEquals(1, clock.countDeferredWork()); + clock.advance(TimeAmount.of(42L, Time.MILLISECONDS)); + + // The second SLA check passes and the kill function is called + assertEquals(2, killCommandHasExecuted.getCount()); + workCapture.getValue().apply(storageUtil.mutableStoreProvider); + assertEquals(1, killCommandHasExecuted.getCount()); + assertEquals(2, statsProvider.getLongValue(KILL_ATTEMPTS_STAT_NAME)); + assertEquals(1, statsProvider.getLongValue(KILL_SUCCESSES_STAT_NAME)); + + // One more SLA kill is scheduled assuming the previous attempt failed. 
Since the previous + // attempt did not fail, we do a NOOP since the task is already KILLING + clock.advance(TimeAmount.of(84L, Time.MILLISECONDS)); + assertEquals(2, statsProvider.getLongValue(KILL_ATTEMPTS_STAT_NAME)); + assertEquals(1, statsProvider.getLongValue(KILL_SUCCESSES_STAT_NAME)); + assertEquals(1, killCommandHasExecuted.getCount()); + assertEquals(0, clock.countDeferredWork()); + } + + @Test + public <T, E extends Exception> void testSlaKillRollingBack() throws Exception { + IJobUpdateDetails updateDetails = IJobUpdateDetails.build( + new JobUpdateDetails( + UPDATE.newBuilder(), + ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLLING_BACK, 123L)), + ImmutableList.of())); + expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID)) + .andReturn(Optional.of(updateDetails)) + .anyTimes(); + Capture<IJobInstanceUpdateEvent> instanceUpdateEventCapture = newCapture(); + storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent( + eq(UPDATE_ID), + capture(instanceUpdateEventCapture)); + expectLastCall().times(2); + storageUtil.expectTaskFetch(TASK.getAssignedTask().getTaskId(), TASK); + Capture<Storage.MutateWork<T, E>> workCapture = createCapture(); + slaManager.checkSlaThenAct( + eq(TASK), + eq(ISlaPolicy.build(TEST_SLA_POLICY)), + capture(workCapture), + eq(ImmutableMap.of()), + eq(false)); + expectBatchExecute(batchWorker, storageUtil.storage, control); + expect(backoffStrategy.calculateBackoffMs(0)).andReturn(42L); + + control.replay(); + + // Kill command has not been executed yet + assertEquals(2, killCommandHasExecuted.getCount()); + + // Start an SLA-aware kill + slaKillController.slaKill( + storageUtil.mutableStoreProvider, + INSTANCE_KEY, + TASK, + UPDATE_ID, + ISlaPolicy.build(TEST_SLA_POLICY), + JobUpdateStatus.ROLLING_BACK, + fakeKillCommand); + + // Ensure the SLA_CHECKING_MESSAGE message is added with ROLLING_BACK action + assertTrue( + checkInstanceEventMatches( + instanceUpdateEventCapture.getValue(), + INSTANCE_KEY, + 
JobUpdateAction.INSTANCE_ROLLING_BACK, + SlaKillController.SLA_CHECKING_MESSAGE)); + instanceUpdateEventCapture.reset(); + assertFalse(instanceUpdateEventCapture.hasCaptured()); + + // Pretend SLA passes, executes work + workCapture.getValue().apply(storageUtil.mutableStoreProvider); + assertEquals(1, killCommandHasExecuted.getCount()); + + // Ensure the SLA_PASSED_MESSAGE message is added with ROLLING_BACK action + assertTrue( + checkInstanceEventMatches( + instanceUpdateEventCapture.getValue(), + INSTANCE_KEY, + JobUpdateAction.INSTANCE_ROLLING_BACK, + SlaKillController.SLA_PASSED_MESSAGE)); + } + + @Test + public void testSlaKillFailOnPause() throws Exception { + IJobUpdateDetails updateDetails = IJobUpdateDetails.build( + new JobUpdateDetails( + UPDATE.newBuilder(), + ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 123L)), + ImmutableList.of())); + IJobUpdateDetails pausedUpdateDetails = IJobUpdateDetails.build( + new JobUpdateDetails( + UPDATE.newBuilder(), + ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLL_FORWARD_PAUSED, 123L)), + ImmutableList.of())); + expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID)) + .andReturn(Optional.of(updateDetails)); + storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(eq(UPDATE_ID), anyObject()); + expectLastCall(); + expectBatchExecute(batchWorker, storageUtil.storage, control); + expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID)) + .andReturn(Optional.of(pausedUpdateDetails)); + + control.replay(); + + // Kill command has not been executed yet + assertEquals(2, killCommandHasExecuted.getCount()); + + // Start an SLA-aware kill + slaKillController.slaKill( + storageUtil.mutableStoreProvider, + INSTANCE_KEY, + TASK, + UPDATE_ID, + ISlaPolicy.build(TEST_SLA_POLICY), + JobUpdateStatus.ROLLING_FORWARD, + fakeKillCommand); + + // Nothing should happen since status has changed + assertEquals(2, killCommandHasExecuted.getCount()); + assertEquals(0, clock.countDeferredWork()); + 
assertFalse(statsProvider.getAllValues().keySet().contains(KILL_ATTEMPTS_STAT_NAME)); + } + + @Test + public void testSlaKillNoDuplicateEvents() throws Exception { + IJobUpdateDetails updateDetails = IJobUpdateDetails.build( + new JobUpdateDetails( + UPDATE.newBuilder(), + ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 123L)), + ImmutableList.of( + new JobInstanceUpdateEvent() + .setInstanceId(INSTANCE_KEY.getInstanceId()) + .setAction(JobUpdateAction.INSTANCE_UPDATING) + .setMessage(SlaKillController.SLA_CHECKING_MESSAGE)))); + IJobUpdateDetails pausedUpdateDetails = IJobUpdateDetails.build( + new JobUpdateDetails( + UPDATE.newBuilder(), + ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLL_FORWARD_PAUSED, 123L)), + ImmutableList.of())); + expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID)) + .andReturn(Optional.of(updateDetails)); + expectBatchExecute(batchWorker, storageUtil.storage, control); + expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID)) + .andReturn(Optional.of(pausedUpdateDetails)); + + control.replay(); + + // Start an SLA-aware kill, update already contains event so we don't expect a save + slaKillController.slaKill( + storageUtil.mutableStoreProvider, + INSTANCE_KEY, + TASK, + UPDATE_ID, + ISlaPolicy.build(TEST_SLA_POLICY), + JobUpdateStatus.ROLLING_FORWARD, + fakeKillCommand); + } + + @Test + public void testSlaKillInvalidStatus() { + control.replay(); + + // Start an SLA-aware kill, throws an exception since the kill was called while the update + // was not active + try { + slaKillController.slaKill( + storageUtil.mutableStoreProvider, + INSTANCE_KEY, + TASK, + UPDATE_ID, + ISlaPolicy.build(TEST_SLA_POLICY), + JobUpdateStatus.ROLL_FORWARD_PAUSED, + fakeKillCommand); + } catch (RuntimeException e) { + return; + } + + fail(); + } + + private boolean checkInstanceEventMatches(IJobInstanceUpdateEvent event, + IInstanceKey instance, + JobUpdateAction action, + String message) { + + return 
event.getInstanceId() == instance.getInstanceId() + && event.getAction() == action + && event.getMessage().equals(message); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/python/apache/aurora/client/cli/test_inspect.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/cli/test_inspect.py b/src/test/python/apache/aurora/client/cli/test_inspect.py index 2baba2a..58bcac1 100644 --- a/src/test/python/apache/aurora/client/cli/test_inspect.py +++ b/src/test/python/apache/aurora/client/cli/test_inspect.py @@ -112,7 +112,8 @@ Process 'process': "watch_secs": 45, "rollback_on_failure": True, "max_per_shard_failures": 0, - "max_total_failures": 0}, + "max_total_failures": 0, + "sla_aware": False}, "name": "the_job", "max_task_failures": 1, "cron_collision_policy": "KILL_EXISTING", http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent ---------------------------------------------------------------------- diff --git a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent index 48902d3..8360998 100644 --- a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent +++ b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent @@ -11,6 +11,9 @@ }, "3": { "i32": 1 + }, + "4": { + "str": "string-value" } } }, http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate ---------------------------------------------------------------------- diff --git 
a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate index 08dfa5b..3876767 100644 --- a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate +++ b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate @@ -485,6 +485,9 @@ }, "9": { "i32": 2 + }, + "10": { + "tf": 1 } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/ui/src/main/js/components/UpdateInstanceEvents.js ---------------------------------------------------------------------- diff --git a/ui/src/main/js/components/UpdateInstanceEvents.js b/ui/src/main/js/components/UpdateInstanceEvents.js index 8351f2c..ab6d3df 100644 --- a/ui/src/main/js/components/UpdateInstanceEvents.js +++ b/ui/src/main/js/components/UpdateInstanceEvents.js @@ -30,7 +30,8 @@ export class InstanceEvent extends React.Component { getClassForUpdateAction(e.action), (i === events.length - 1) ? ' active' : ''), state: UPDATE_ACTION[e.action], - timestamp: e.timestampMs + timestamp: e.timestampMs, + message: e.message }; }); http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/ui/src/main/js/components/UpdateSettings.js ---------------------------------------------------------------------- diff --git a/ui/src/main/js/components/UpdateSettings.js b/ui/src/main/js/components/UpdateSettings.js index d756f59..d7fbe00 100644 --- a/ui/src/main/js/components/UpdateSettings.js +++ b/ui/src/main/js/components/UpdateSettings.js @@ -25,6 +25,10 @@ export default function UpdateSettings({ update }) { <td>Rollback On Failure?</td> <td>{settings.rollbackOnFailure ? 'yes' : 'no'}</td> </tr> + <tr> + <td>SLA-Aware?</td> + <td>{settings.slaAware ? 'yes' : 'no'}</td> + </tr> </table> </div>); }
