Repository: aurora
Updated Branches:
  refs/heads/master f55f46743 -> d10d2d171
Resuming blocked updates on restart.

Bugs closed: AURORA-1285

Reviewed at https://reviews.apache.org/r/33374/

Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/d10d2d17
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/d10d2d17
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/d10d2d17

Branch: refs/heads/master
Commit: d10d2d17142dad405c8b154321931cef59512866
Parents: f55f467
Author: Maxim Khutornenko <[email protected]>
Authored: Tue Apr 21 14:28:27 2015 -0700
Committer: Maxim Khutornenko <[email protected]>
Committed: Tue Apr 21 14:28:27 2015 -0700

----------------------------------------------------------------------
 .../updater/JobUpdateControllerImpl.java | 16 +++--
 .../updater/JobUpdateStateMachine.java | 9 ++-
 .../aurora/scheduler/updater/JobUpdaterIT.java | 72 ++++++++++++++++++++
 3 files changed, 88 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/aurora/blob/d10d2d17/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index ac15217..1ebfa64 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -83,6 +83,7 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE;
 import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY;
+import static
org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES; import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_ACTIVE_RESUME_STATE; import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_BLOCKED_RESUME_STATE; import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_PAUSE_STATE; @@ -270,16 +271,19 @@ class JobUpdateControllerImpl implements JobUpdateController { IJobUpdateKey key = summary.getKey(); JobUpdateStatus status = summary.getState().getStatus(); - LOG.info("Automatically resuming update " + key); - if (isCoordinatedUpdate(instructions)) { + LOG.info("Automatically restoring pulse state for " + key); pulseHandler.initializePulseState(details.getUpdate(), status); } - try { - changeJobUpdateStatus(storeProvider, key, newEvent(status), false); - } catch (UpdateStateException e) { - throw Throwables.propagate(e); + if (AUTO_RESUME_STATES.contains(status)) { + LOG.info("Automatically resuming update " + key); + + try { + changeJobUpdateStatus(storeProvider, key, newEvent(status), false); + } catch (UpdateStateException e) { + throw Throwables.propagate(e); + } } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/d10d2d17/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java index 74e915c..1dbab1e 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.updater; import java.util.Map; +import java.util.Set; import com.google.common.base.Function; import com.google.common.base.Optional; @@ -21,8 +22,8 @@ import com.google.common.collect.BiMap; import 
com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import org.apache.aurora.gen.JobUpdateQuery; import org.apache.aurora.gen.JobUpdateStatus; @@ -97,8 +98,10 @@ final class JobUpdateStateMachine { ROLL_BACK_AWAITING_PULSE, ROLL_BACK_PAUSED); static final IJobUpdateQuery ACTIVE_QUERY = IJobUpdateQuery.build( - new JobUpdateQuery() - .setUpdateStatuses(ImmutableSet.copyOf(ACTIVE_TO_PAUSED_STATES.keySet()))); + new JobUpdateQuery().setUpdateStatuses(Updates.ACTIVE_JOB_UPDATE_STATES)); + + static final Set<JobUpdateStatus> AUTO_RESUME_STATES = + Sets.immutableEnumSet(ACTIVE_TO_PAUSED_STATES.keySet()); private static final Map<JobUpdateStatus, JobUpdateStatus> PAUSE_BEHAVIOR = ImmutableMap.<JobUpdateStatus, JobUpdateStatus>builder() http://git-wip-us.apache.org/repos/asf/aurora/blob/d10d2d17/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java index 6be0efa..802c090 100644 --- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java @@ -496,6 +496,78 @@ public class JobUpdaterIT extends EasyMockTest { } @Test + public void testRecoverAwaitingPulseFromStorage() throws Exception { + expectTaskKilled(); + + control.replay(); + + JobUpdate builder = + setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 0, OLD_CONFIG)), 1).newBuilder(); + builder.getInstructions().getSettings().setBlockIfNoPulsesAfterMs((int) PULSE_TIMEOUT_MS); + final IJobUpdate update = IJobUpdate.build(builder); + insertInitialTasks(update); + changeState(JOB, 0, 
ASSIGNED, STARTING, RUNNING); + clock.advance(ONE_DAY); + + storage.write(new NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLL_FORWARD_AWAITING_PULSE); + } + }); + + subscriber.startAsync().awaitRunning(); + ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder(); + + assertState(ROLL_FORWARD_AWAITING_PULSE, actions.build()); + assertEquals(JobUpdatePulseStatus.OK, updater.pulse(UPDATE_ID)); + + changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING); + clock.advance(WATCH_TIMEOUT); + actions.putAll(0, INSTANCE_UPDATING, INSTANCE_UPDATED); + + assertState(ROLLED_FORWARD, actions.build()); + assertEquals(JobUpdatePulseStatus.FINISHED, updater.pulse(UPDATE_ID)); + } + + @Test + public void testRecoverCoordinatedPausedFromStorage() throws Exception { + expectTaskKilled(); + + control.replay(); + + JobUpdate builder = + setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 0, OLD_CONFIG)), 1).newBuilder(); + builder.getInstructions().getSettings().setBlockIfNoPulsesAfterMs((int) PULSE_TIMEOUT_MS); + final IJobUpdate update = IJobUpdate.build(builder); + insertInitialTasks(update); + changeState(JOB, 0, ASSIGNED, STARTING, RUNNING); + clock.advance(ONE_DAY); + + storage.write(new NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLL_FORWARD_PAUSED); + } + }); + + subscriber.startAsync().awaitRunning(); + ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder(); + + assertState(ROLL_FORWARD_PAUSED, actions.build()); + assertEquals(JobUpdatePulseStatus.OK, updater.pulse(UPDATE_ID)); + + updater.resume(UPDATE_ID, AUDIT); + + changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING); + clock.advance(WATCH_TIMEOUT); + actions.putAll(0, INSTANCE_UPDATING, INSTANCE_UPDATED); + + 
assertState(ROLLED_FORWARD, actions.build()); + assertEquals(JobUpdatePulseStatus.FINISHED, updater.pulse(UPDATE_ID)); + } + + @Test public void testResumeToAwaitingPulse() throws Exception { expectTaskKilled().times(2);
