Repository: aurora Updated Branches: refs/heads/master 21ad18ec7 -> d4ebb56ba
Reduce storage write lock contention by adopting the Double-Checked Locking pattern in TimedOutTaskHandler. `TimedOutTaskHandler` acquires the storage write lock for every task each time it transitions to a transient state. After a default time-out period of 5 minutes, it then verifies whether the task has transitioned out of the transient state. The verification step takes place while holding the storage write lock. In over 99% of cases the logic short-circuits and returns from `StateManagerImpl.updateTaskAndExternalState()` once it learns that the task has already transitioned out of the transient state. This patch reduces storage write lock contention by adopting the Double-Checked Locking pattern in `TimedOutTaskHandler.run()`: the task's current status is first fetched via `storage.read()`, and the storage write lock is acquired only if the task is still in the expected transient state. Bugs closed: AURORA-1820 Reviewed at https://reviews.apache.org/r/55179/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/d4ebb56b Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/d4ebb56b Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/d4ebb56b Branch: refs/heads/master Commit: d4ebb56ba7f02b4f921d37518185af20f253a44f Parents: 21ad18e Author: Mehrdad Nurolahzade <[email protected]> Authored: Wed Jan 4 15:50:46 2017 -0600 Committer: Joshua Cohen <[email protected]> Committed: Wed Jan 4 15:50:46 2017 -0600 ---------------------------------------------------------------------- .../scheduler/reconciliation/TaskTimeout.java | 36 ++++++++++++-------- .../reconciliation/TaskTimeoutTest.java | 36 ++++++++++---------- 2 files changed, 39 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/d4ebb56b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java 
b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java index 2dc9bc2..8e9a0d3 100644 --- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java @@ -35,6 +35,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.state.StateChangeResult; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,21 +113,26 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber { @Override public void run() { if (isRunning()) { - // This query acts as a CAS by including the state that we expect the task to be in - // if the timeout is still valid. Ideally, the future would have already been - // canceled, but in the event of a state transition race, including transientState - // prevents an unintended task timeout. - // Note: This requires LOST transitions trigger Driver.killTask. - StateChangeResult result = storage.write(storeProvider -> stateManager.changeState( - storeProvider, - taskId, - Optional.of(newState), - ScheduleStatus.LOST, - TIMEOUT_MESSAGE)); - - if (result == StateChangeResult.SUCCESS) { - LOG.info("Timeout reached for task " + taskId + ":" + taskId); - timedOutTasks.incrementAndGet(); + Optional<IScheduledTask> task = storage.read( + storeProvider -> storeProvider.getTaskStore().fetchTask(taskId)); + // Double-Checked Locking: acquire storage write lock only if necessary + if (task.isPresent() && task.get().getStatus() == newState) { + // This query acts as a CAS by including the state that we expect the task to be in + // if the timeout is still valid. 
Ideally, the future would have already been + // canceled, but in the event of a state transition race, including transientState + // prevents an unintended task timeout. + // Note: This requires LOST transitions trigger Driver.killTask. + StateChangeResult result = storage.write(storeProvider -> stateManager.changeState( + storeProvider, + taskId, + Optional.of(newState), + ScheduleStatus.LOST, + TIMEOUT_MESSAGE)); + + if (result == StateChangeResult.SUCCESS) { + LOG.info("Timeout reached for task " + taskId + ":" + taskId); + timedOutTasks.incrementAndGet(); + } } } else { // Our service is not yet started. We don't want to lose track of the task, so http://git-wip-us.apache.org/repos/asf/aurora/blob/d4ebb56b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java index 1006ddb..9da99c6 100644 --- a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java +++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java @@ -70,7 +70,6 @@ public class TaskTimeoutTest extends EasyMockTest { public void setUp() { executor = createMock(DelayExecutor.class); storageUtil = new StorageTestUtil(this); - storageUtil.expectOperations(); stateManager = createMock(StateManager.class); clock = new FakeClock(); statsProvider = createMock(StatsProvider.class); @@ -130,24 +129,25 @@ public class TaskTimeoutTest extends EasyMockTest { public void testTransientToTransient() { expectTaskWatch(); Capture<Runnable> killingTimeout = expectTaskWatch(); - expect(stateManager.changeState( - storageUtil.mutableStoreProvider, - TASK_ID, - Optional.of(KILLING), - LOST, - TaskTimeout.TIMEOUT_MESSAGE)) - .andReturn(StateChangeResult.SUCCESS); + 
expect(storageUtil.storeProvider.getTaskStore()).andReturn(storageUtil.taskStore); + storageUtil.expectRead(); + storageUtil.expectTaskFetch(TASK_ID, makeTask(TASK_ID, ASSIGNED)); replayAndCreate(); changeState(PENDING, ASSIGNED); changeState(ASSIGNED, KILLING); killingTimeout.getValue().run(); + assertEquals(0, timedOutTaskCounter.intValue()); } @Test public void testTimeout() throws Exception { Capture<Runnable> assignedTimeout = expectTaskWatch(); + expect(storageUtil.storeProvider.getTaskStore()).andReturn(storageUtil.taskStore); + storageUtil.expectRead(); + storageUtil.expectTaskFetch(TASK_ID, makeTask(TASK_ID, ASSIGNED)); + storageUtil.expectWrite(); expect(stateManager.changeState( storageUtil.mutableStoreProvider, TASK_ID, @@ -161,26 +161,26 @@ public class TaskTimeoutTest extends EasyMockTest { changeState(INIT, PENDING); changeState(PENDING, ASSIGNED); assignedTimeout.getValue().run(); - assertEquals(timedOutTaskCounter.intValue(), 1); + assertEquals(1, timedOutTaskCounter.intValue()); } @Test public void testTaskDeleted() throws Exception { Capture<Runnable> assignedTimeout = expectTaskWatch(); - expect(stateManager.changeState( - storageUtil.mutableStoreProvider, - TASK_ID, - Optional.of(KILLING), - LOST, - TaskTimeout.TIMEOUT_MESSAGE)) - .andReturn(StateChangeResult.ILLEGAL); + expect(storageUtil.storeProvider.getTaskStore()).andReturn(storageUtil.taskStore); + storageUtil.expectRead(); + storageUtil.expectTaskFetch(TASK_ID); replayAndCreate(); changeState(INIT, PENDING); changeState(PENDING, KILLING); assignedTimeout.getValue().run(); - assertEquals(timedOutTaskCounter.intValue(), 0); + assertEquals(0, timedOutTaskCounter.intValue()); + } + + private static IScheduledTask makeTask(String taskId, ScheduleStatus status) { + return makeTask(taskId, status, 0L); } private static IScheduledTask makeTask( @@ -231,6 +231,6 @@ public class TaskTimeoutTest extends EasyMockTest { changeState(INIT, PENDING); changeState(PENDING, ASSIGNED); 
assignedTimeout.getValue().run(); - assertEquals(timedOutTaskCounter.intValue(), 0); + assertEquals(0, timedOutTaskCounter.intValue()); } }
