Repository: aurora
Updated Branches:
  refs/heads/master 3e1f82359 -> c912c3459


Handling task event race in updater.

Bugs closed: AURORA-1506, AURORA-1507

Reviewed at https://reviews.apache.org/r/41226/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c912c345
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c912c345
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c912c345

Branch: refs/heads/master
Commit: c912c3459f380e698aaf6010af3b78d43cae56b9
Parents: 3e1f823
Author: Maxim Khutornenko <[email protected]>
Authored: Thu Dec 17 10:05:04 2015 -0800
Committer: Maxim Khutornenko <[email protected]>
Committed: Thu Dec 17 10:05:04 2015 -0800

----------------------------------------------------------------------
 .../updater/InstanceActionHandler.java          | 59 ++++++++----
 .../aurora/scheduler/updater/AddTaskTest.java   | 57 ++++++++++--
 .../aurora/scheduler/updater/KillTaskTest.java  | 95 ++++++++++++++++++++
 3 files changed, 187 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/c912c345/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
index d8686f1..0880cf2 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
@@ -31,6 +31,7 @@ import 
org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IRange;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
@@ -47,6 +48,14 @@ interface InstanceActionHandler {
 
   Logger LOG = Logger.getLogger(InstanceActionHandler.class.getName());
 
+  static Optional<IScheduledTask> getExistingTask(
+      MutableStoreProvider storeProvider,
+      IInstanceKey instance) {
+
+    return Optional.fromNullable(Iterables.getOnlyElement(
+        
storeProvider.getTaskStore().fetchTasks(Query.instanceScoped(instance).active()),
 null));
+  }
+
   class AddTask implements InstanceActionHandler {
     private static ITaskConfig getTargetConfig(
         IJobUpdateInstructions instructions,
@@ -77,16 +86,23 @@ interface InstanceActionHandler {
         StateManager stateManager,
         JobUpdateStatus status) {
 
-      LOG.info("Adding instance " + instance + " while " + status);
-      ITaskConfig replacement = getTargetConfig(
-          instructions,
-          status == ROLLING_FORWARD,
-          instance.getInstanceId());
-      stateManager.insertPendingTasks(
-          storeProvider,
-          replacement,
-          ImmutableSet.of(instance.getInstanceId()));
-      return  Amount.of(
+      Optional<IScheduledTask> task = getExistingTask(storeProvider, instance);
+      if (task.isPresent()) {
+        // Due to async event processing it's possible to have a race between task event
+        // and instance addition. This is a perfectly valid case.
+        LOG.info("Instance " + instance + " already exists while " + status);
+      } else {
+        LOG.info("Adding instance " + instance + " while " + status);
+        ITaskConfig replacement = getTargetConfig(
+            instructions,
+            status == ROLLING_FORWARD,
+            instance.getInstanceId());
+        stateManager.insertPendingTasks(
+            storeProvider,
+            replacement,
+            ImmutableSet.of(instance.getInstanceId()));
+      }
+      return Amount.of(
           (long) instructions.getSettings().getMaxWaitToInstanceRunningMs(),
           Time.MILLISECONDS);
     }
@@ -101,15 +117,20 @@ interface InstanceActionHandler {
         StateManager stateManager,
         JobUpdateStatus status) {
 
-      String taskId = Tasks.id(Iterables.getOnlyElement(
-          
storeProvider.getTaskStore().fetchTasks(Query.instanceScoped(instance).active())));
-      LOG.info("Killing " + instance + " while " + status);
-      stateManager.changeState(
-          storeProvider,
-          taskId,
-          Optional.absent(),
-          ScheduleStatus.KILLING,
-          Optional.of("Killed for job update."));
+      Optional<IScheduledTask> task = getExistingTask(storeProvider, instance);
+      if (task.isPresent()) {
+        LOG.info("Killing " + instance + " while " + status);
+        stateManager.changeState(
+            storeProvider,
+            Tasks.id(task.get()),
+            Optional.absent(),
+            ScheduleStatus.KILLING,
+            Optional.of("Killed for job update."));
+      } else {
+        // Due to async event processing it's possible to have a race between task event
+        // and its deletion from the store. This is a perfectly valid case.
+        LOG.info("No active instance " + instance + " to kill while " + status);
+      }
       return Amount.of(
           (long) instructions.getSettings().getMaxWaitToInstanceRunningMs(),
           Time.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/aurora/blob/c912c345/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java 
b/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
index 0583a63..56c94b5 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
@@ -13,48 +13,95 @@
  */
 package org.apache.aurora.scheduler.updater;
 
+import com.google.common.collect.ImmutableSet;
+
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.InstanceKey;
+import org.apache.aurora.gen.InstanceTaskConfig;
 import org.apache.aurora.gen.JobUpdateInstructions;
 import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.Range;
+import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-
 public class AddTaskTest extends EasyMockTest {
   private static final IJobUpdateInstructions INSTRUCTIONS = 
IJobUpdateInstructions.build(
       new JobUpdateInstructions()
+          .setDesiredState(new InstanceTaskConfig()
+              .setTask(new TaskConfig())
+              .setInstances(ImmutableSet.of(new Range(0, 0))))
           .setSettings(
               new JobUpdateSettings()
                   .setMinWaitInInstanceRunningMs(1000)));
   private static final IInstanceKey INSTANCE =
       IInstanceKey.build(new InstanceKey(JobKeys.from("role", "env", 
"job").newBuilder(), 0));
 
-  private MutableStoreProvider storeProvider;
+  private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private InstanceActionHandler handler;
 
   @Before
   public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
-    storeProvider = createMock(MutableStoreProvider.class);
     handler = new InstanceActionHandler.AddTask();
   }
 
+  @Test
+  public void testAddInstance() throws Exception {
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active());
+
+    stateManager.insertPendingTasks(
+        storageUtil.mutableStoreProvider,
+        INSTRUCTIONS.getDesiredState().getTask(),
+        ImmutableSet.of(0));
+
+    control.replay();
+
+    handler.getReevaluationDelay(
+        INSTANCE,
+        INSTRUCTIONS,
+        storageUtil.mutableStoreProvider,
+        stateManager,
+        JobUpdateStatus.ROLLING_FORWARD);
+  }
+
+  @Test
+  public void testAddInstanceCollisionDoesNotThrow() throws Exception {
+    storageUtil.expectTaskFetch(
+        Query.instanceScoped(INSTANCE).active(),
+        TaskTestUtil.makeTask("id", INSTANCE.getJobKey()));
+
+    control.replay();
+
+    handler.getReevaluationDelay(
+        INSTANCE,
+        INSTRUCTIONS,
+        storageUtil.mutableStoreProvider,
+        stateManager,
+        JobUpdateStatus.ROLLING_FORWARD);
+  }
+
   @Test(expected = IllegalStateException.class)
   public void testInstanceNotFound() throws Exception {
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active());
+
     control.replay();
 
     handler.getReevaluationDelay(
         INSTANCE,
         INSTRUCTIONS,
-        storeProvider,
+        storageUtil.mutableStoreProvider,
         stateManager,
         JobUpdateStatus.ROLLING_BACK);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/c912c345/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
new file mode 100644
index 0000000..e5935f6
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import com.google.common.base.Optional;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.InstanceKey;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.state.StateChangeResult;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+
+public class KillTaskTest extends EasyMockTest {
+  private static final IJobUpdateInstructions INSTRUCTIONS = 
IJobUpdateInstructions.build(
+      new JobUpdateInstructions()
+          .setSettings(
+              new JobUpdateSettings()
+                  .setMinWaitInInstanceRunningMs(1000)));
+  private static final IInstanceKey INSTANCE =
+      IInstanceKey.build(new InstanceKey(JobKeys.from("role", "env", 
"job").newBuilder(), 0));
+
+  private StorageTestUtil storageUtil;
+  private StateManager stateManager;
+  private InstanceActionHandler handler;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    stateManager = createMock(StateManager.class);
+    handler = new InstanceActionHandler.KillTask();
+  }
+
+  @Test
+  public void testInstanceKill() throws Exception {
+    String id = "task_id";
+    storageUtil.expectTaskFetch(
+        Query.instanceScoped(INSTANCE).active(),
+        TaskTestUtil.makeTask(id, INSTANCE.getJobKey()));
+
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        id,
+        Optional.absent(),
+        ScheduleStatus.KILLING,
+        Optional.of("Killed for job 
update."))).andReturn(StateChangeResult.SUCCESS);
+
+    control.replay();
+
+    handler.getReevaluationDelay(
+        INSTANCE,
+        INSTRUCTIONS,
+        storageUtil.mutableStoreProvider,
+        stateManager,
+        JobUpdateStatus.ROLLING_BACK);
+  }
+
+  @Test
+  public void testInstanceNotFoundDoesNotThrow() throws Exception {
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active());
+
+    control.replay();
+
+    handler.getReevaluationDelay(
+        INSTANCE,
+        INSTRUCTIONS,
+        storageUtil.mutableStoreProvider,
+        stateManager,
+        JobUpdateStatus.ROLLING_BACK);
+  }
+}

Reply via email to