Repository: aurora Updated Branches: refs/heads/master 3e1f82359 -> c912c3459
Handling task event race in updater. Bugs closed: AURORA-1506, AURORA-1507 Reviewed at https://reviews.apache.org/r/41226/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c912c345 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c912c345 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c912c345 Branch: refs/heads/master Commit: c912c3459f380e698aaf6010af3b78d43cae56b9 Parents: 3e1f823 Author: Maxim Khutornenko <[email protected]> Authored: Thu Dec 17 10:05:04 2015 -0800 Committer: Maxim Khutornenko <[email protected]> Committed: Thu Dec 17 10:05:04 2015 -0800 ---------------------------------------------------------------------- .../updater/InstanceActionHandler.java | 59 ++++++++---- .../aurora/scheduler/updater/AddTaskTest.java | 57 ++++++++++-- .../aurora/scheduler/updater/KillTaskTest.java | 95 ++++++++++++++++++++ 3 files changed, 187 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/c912c345/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java index d8686f1..0880cf2 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java @@ -31,6 +31,7 @@ import org.apache.aurora.scheduler.storage.entities.IInstanceKey; import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig; import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; import org.apache.aurora.scheduler.storage.entities.IRange; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import 
org.apache.aurora.scheduler.storage.entities.ITaskConfig; import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD; @@ -47,6 +48,14 @@ interface InstanceActionHandler { Logger LOG = Logger.getLogger(InstanceActionHandler.class.getName()); + static Optional<IScheduledTask> getExistingTask( + MutableStoreProvider storeProvider, + IInstanceKey instance) { + + return Optional.fromNullable(Iterables.getOnlyElement( + storeProvider.getTaskStore().fetchTasks(Query.instanceScoped(instance).active()), null)); + } + class AddTask implements InstanceActionHandler { private static ITaskConfig getTargetConfig( IJobUpdateInstructions instructions, @@ -77,16 +86,23 @@ interface InstanceActionHandler { StateManager stateManager, JobUpdateStatus status) { - LOG.info("Adding instance " + instance + " while " + status); - ITaskConfig replacement = getTargetConfig( - instructions, - status == ROLLING_FORWARD, - instance.getInstanceId()); - stateManager.insertPendingTasks( - storeProvider, - replacement, - ImmutableSet.of(instance.getInstanceId())); - return Amount.of( + Optional<IScheduledTask> task = getExistingTask(storeProvider, instance); + if (task.isPresent()) { + // Due to async event processing it's possible to have a race between task event + // and instance addition. This is a perfectly valid case. 
+ LOG.info("Instance " + instance + " already exists while " + status); + } else { + LOG.info("Adding instance " + instance + " while " + status); + ITaskConfig replacement = getTargetConfig( + instructions, + status == ROLLING_FORWARD, + instance.getInstanceId()); + stateManager.insertPendingTasks( + storeProvider, + replacement, + ImmutableSet.of(instance.getInstanceId())); + } + return Amount.of( (long) instructions.getSettings().getMaxWaitToInstanceRunningMs(), Time.MILLISECONDS); } @@ -101,15 +117,20 @@ interface InstanceActionHandler { StateManager stateManager, JobUpdateStatus status) { - String taskId = Tasks.id(Iterables.getOnlyElement( - storeProvider.getTaskStore().fetchTasks(Query.instanceScoped(instance).active()))); - LOG.info("Killing " + instance + " while " + status); - stateManager.changeState( - storeProvider, - taskId, - Optional.absent(), - ScheduleStatus.KILLING, - Optional.of("Killed for job update.")); + Optional<IScheduledTask> task = getExistingTask(storeProvider, instance); + if (task.isPresent()) { + LOG.info("Killing " + instance + " while " + status); + stateManager.changeState( + storeProvider, + Tasks.id(task.get()), + Optional.absent(), + ScheduleStatus.KILLING, + Optional.of("Killed for job update.")); + } else { + // Due to async event processing it's possible to have a race between task event + // and its deletion from the store. This is a perfectly valid case. 
+ LOG.info("No active instance " + instance + " to kill while " + status); + } return Amount.of( (long) instructions.getSettings().getMaxWaitToInstanceRunningMs(), Time.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/aurora/blob/c912c345/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java b/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java index 0583a63..56c94b5 100644 --- a/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java @@ -13,48 +13,95 @@ */ package org.apache.aurora.scheduler.updater; +import com.google.common.collect.ImmutableSet; + import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.InstanceKey; +import org.apache.aurora.gen.InstanceTaskConfig; import org.apache.aurora.gen.JobUpdateInstructions; import org.apache.aurora.gen.JobUpdateSettings; import org.apache.aurora.gen.JobUpdateStatus; +import org.apache.aurora.gen.Range; +import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.base.JobKeys; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.entities.IInstanceKey; import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.junit.Before; import org.junit.Test; -import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; - public class AddTaskTest extends EasyMockTest { private static final IJobUpdateInstructions INSTRUCTIONS = IJobUpdateInstructions.build( new JobUpdateInstructions() + .setDesiredState(new InstanceTaskConfig() + .setTask(new TaskConfig()) + 
.setInstances(ImmutableSet.of(new Range(0, 0)))) .setSettings( new JobUpdateSettings() .setMinWaitInInstanceRunningMs(1000))); private static final IInstanceKey INSTANCE = IInstanceKey.build(new InstanceKey(JobKeys.from("role", "env", "job").newBuilder(), 0)); - private MutableStoreProvider storeProvider; + private StorageTestUtil storageUtil; private StateManager stateManager; private InstanceActionHandler handler; @Before public void setUp() { + storageUtil = new StorageTestUtil(this); + storageUtil.expectOperations(); stateManager = createMock(StateManager.class); - storeProvider = createMock(MutableStoreProvider.class); handler = new InstanceActionHandler.AddTask(); } + @Test + public void testAddInstance() throws Exception { + storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active()); + + stateManager.insertPendingTasks( + storageUtil.mutableStoreProvider, + INSTRUCTIONS.getDesiredState().getTask(), + ImmutableSet.of(0)); + + control.replay(); + + handler.getReevaluationDelay( + INSTANCE, + INSTRUCTIONS, + storageUtil.mutableStoreProvider, + stateManager, + JobUpdateStatus.ROLLING_FORWARD); + } + + @Test + public void testAddInstanceCollisionDoesNotThrow() throws Exception { + storageUtil.expectTaskFetch( + Query.instanceScoped(INSTANCE).active(), + TaskTestUtil.makeTask("id", INSTANCE.getJobKey())); + + control.replay(); + + handler.getReevaluationDelay( + INSTANCE, + INSTRUCTIONS, + storageUtil.mutableStoreProvider, + stateManager, + JobUpdateStatus.ROLLING_FORWARD); + } + @Test(expected = IllegalStateException.class) public void testInstanceNotFound() throws Exception { + storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active()); + control.replay(); handler.getReevaluationDelay( INSTANCE, INSTRUCTIONS, - storeProvider, + storageUtil.mutableStoreProvider, stateManager, JobUpdateStatus.ROLLING_BACK); } http://git-wip-us.apache.org/repos/asf/aurora/blob/c912c345/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java 
---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java new file mode 100644 index 0000000..e5935f6 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java @@ -0,0 +1,95 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.updater; + +import com.google.common.base.Optional; + +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.InstanceKey; +import org.apache.aurora.gen.JobUpdateInstructions; +import org.apache.aurora.gen.JobUpdateSettings; +import org.apache.aurora.gen.JobUpdateStatus; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.base.JobKeys; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.state.StateChangeResult; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.entities.IInstanceKey; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.expect; + +public class KillTaskTest extends EasyMockTest { + private static final 
IJobUpdateInstructions INSTRUCTIONS = IJobUpdateInstructions.build( + new JobUpdateInstructions() + .setSettings( + new JobUpdateSettings() + .setMinWaitInInstanceRunningMs(1000))); + private static final IInstanceKey INSTANCE = + IInstanceKey.build(new InstanceKey(JobKeys.from("role", "env", "job").newBuilder(), 0)); + + private StorageTestUtil storageUtil; + private StateManager stateManager; + private InstanceActionHandler handler; + + @Before + public void setUp() { + storageUtil = new StorageTestUtil(this); + storageUtil.expectOperations(); + stateManager = createMock(StateManager.class); + handler = new InstanceActionHandler.KillTask(); + } + + @Test + public void testInstanceKill() throws Exception { + String id = "task_id"; + storageUtil.expectTaskFetch( + Query.instanceScoped(INSTANCE).active(), + TaskTestUtil.makeTask(id, INSTANCE.getJobKey())); + + expect(stateManager.changeState( + storageUtil.mutableStoreProvider, + id, + Optional.absent(), + ScheduleStatus.KILLING, + Optional.of("Killed for job update."))).andReturn(StateChangeResult.SUCCESS); + + control.replay(); + + handler.getReevaluationDelay( + INSTANCE, + INSTRUCTIONS, + storageUtil.mutableStoreProvider, + stateManager, + JobUpdateStatus.ROLLING_BACK); + } + + @Test + public void testInstanceNotFoundDoesNotThrow() throws Exception { + storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active()); + + control.replay(); + + handler.getReevaluationDelay( + INSTANCE, + INSTRUCTIONS, + storageUtil.mutableStoreProvider, + stateManager, + JobUpdateStatus.ROLLING_BACK); + } +}
