Repository: aurora Updated Branches: refs/heads/master 90846640b -> a4fdf284d
Add rollback functionality to the scheduler. For active job updates in the ROLLING_FORWARD, ROLL_BACK_PAUSED, ROLL_BACK_AWAITING_PULSE, ROLL_FORWARD_PAUSED or ROLL_FORWARD_AWAITING_PULSE state, it is now possible to initiate a rollback by calling the corresponding API function. Rollback is also supported in the aurora CLI tool via a new command: aurora update rollback CLUSTER/ROLE/ENV/NAME Bugs closed: AURORA-1721 Reviewed at https://reviews.apache.org/r/50168/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a4fdf284 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a4fdf284 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a4fdf284 Branch: refs/heads/master Commit: a4fdf284d878c9430f267e2051a1ae97355d0ddb Parents: 9084664 Author: Igor Morozov <[email protected]> Authored: Thu Aug 11 13:56:48 2016 -0700 Committer: Zameer Manji <[email protected]> Committed: Thu Aug 11 13:56:48 2016 -0700 ---------------------------------------------------------------------- CHANGELOG | 3 +- RELEASE-NOTES.md | 2 + .../thrift/org/apache/aurora/gen/api.thrift | 9 + .../thrift/SchedulerThriftInterface.java | 8 + .../thrift/aop/AnnotatedAuroraAdmin.java | 5 + .../scheduler/updater/JobUpdateController.java | 20 ++ .../updater/JobUpdateControllerImpl.java | 7 + .../updater/JobUpdateStateMachine.java | 14 +- .../python/apache/aurora/client/api/__init__.py | 11 + .../python/apache/aurora/client/cli/update.py | 97 ++++++-- .../updater/JobUpdateStateMachineTest.java | 2 + .../aurora/scheduler/updater/JobUpdaterIT.java | 220 ++++++++++++++++++- .../aurora/client/api/test_scheduler_client.py | 5 + .../apache/aurora/client/cli/test_supdate.py | 86 ++++++++ 14 files changed, 459 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/CHANGELOG ---------------------------------------------------------------------- diff 
--git a/CHANGELOG b/CHANGELOG index fc6a46d..b1dce58 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,11 +1,10 @@ Aurora 0.15.0 -------------------------------------------------------------------------------- ## Task - * [AURORA-1725] - Expose tier configurations as a debug page + * [AURORA-1725] - Expose tier configurations as a debug page * [AURORA-1458] - Add tier into the UI "show config" summary * [AURORA-1720] - Broken link in http://aurora.apache.org/ - Aurora 0.14.0 -------------------------------------------------------------------------------- ## Bug http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 50f9b83..1819eaa 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -29,6 +29,8 @@ documentation. - The `ExecutorInfo.source` field is deprecated and has been replaced with a label named `source`. It will be removed from Mesos in a future release. +- Add a rollback API to the scheduler and a new client command to support rolling back + active job updates to their initial state. ### Deprecations and removals: http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/api/src/main/thrift/org/apache/aurora/gen/api.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift index b799cce..c5765b7 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift @@ -1106,6 +1106,15 @@ service AuroraSchedulerManager extends ReadOnlyScheduler { 3: string message) /** + * Rolls back the specified active job update to its initial state. + */ + Response rollbackJobUpdate( + /** The update to roll back. */ + 1: JobUpdateKey key, + /** A user-specified message to include with the induced job update state change. 
*/ + 2: string message) + + /** * Allows progress of the job update in case blockIfNoPulsesAfterMs is specified in * JobUpdateSettings. Unblocks progress if the update was previously blocked. * Responds with ResponseCode.INVALID_REQUEST in case an unknown update key is specified. http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java index b534abf..929d021 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java @@ -979,6 +979,14 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { } @Override + public Response rollbackJobUpdate(JobUpdateKey mutableKey, @Nullable String message) { + return changeJobUpdateState( + mutableKey, + JobUpdateController::rollback, + Optional.fromNullable(message)); + } + + @Override public Response pulseJobUpdate(JobUpdateKey mutableUpdateKey) { IJobUpdateKey updateKey = validateJobUpdateKey(mutableUpdateKey); try { http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java index 9243c92..bfc3dc8 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java @@ -95,4 +95,9 @@ public interface AnnotatedAuroraAdmin extends AuroraAdmin.Iface { @Override 
Response pulseJobUpdate( @AuthorizingParam @Nullable JobUpdateKey key) throws TException; + + @Override + Response rollbackJobUpdate( + @AuthorizingParam @Nullable JobUpdateKey key, + @Nullable String message) throws TException; } http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java index f8357c4..c2ec1b3 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java @@ -123,6 +123,26 @@ public interface JobUpdateController { void abort(IJobUpdateKey key, AuditData auditData) throws UpdateStateException; /** + * Rolls back an active job update. + * <p> + * This will roll back the update to its initial state, effectively 'undoing' it. + * A rollback is only possible if the update is in one of the following states: + * <ul> + * <li>ROLLING_FORWARD</li> + * <li>ROLL_BACK_PAUSED</li> + * <li>ROLL_BACK_AWAITING_PULSE</li> + * <li>ROLL_FORWARD_PAUSED</li> + * <li>ROLL_FORWARD_AWAITING_PULSE</li> + * </ul> + * An update that has already reached a terminal state cannot be rolled back. + * + * @param key Update to roll back. + * @param auditData Details about the origin of this state change. + * @throws UpdateStateException If pre-condition is not met. + */ + void rollback(IJobUpdateKey key, AuditData auditData) throws UpdateStateException; + + /** * Notifies the updater that the state of an instance has changed. A state change could also mean * deletion. 
* http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java index 364c5c7..594bb62 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java @@ -246,6 +246,13 @@ class JobUpdateControllerImpl implements JobUpdateController { Functions.compose(createAuditedEvent(auditData), Functions.constant(ABORTED))); } + @Override + public void rollback(IJobUpdateKey key, AuditData auditData) throws UpdateStateException { + unscopedChangeUpdateStatus( + key, + Functions.compose(createAuditedEvent(auditData), Functions.constant(ROLLING_BACK))); + } + private static Function<JobUpdateStatus, JobUpdateEvent> createAuditedEvent( final AuditData auditData) { http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java index 7ab739a..959a5a5 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java @@ -70,9 +70,19 @@ final class JobUpdateStateMachine { ABORTED, ERROR, FAILED) - .putAll(ROLL_FORWARD_PAUSED, ROLLING_FORWARD, ROLL_FORWARD_AWAITING_PULSE, ABORTED, ERROR) + .putAll(ROLL_FORWARD_PAUSED, + ROLLING_BACK, + ROLLING_FORWARD, + ROLL_FORWARD_AWAITING_PULSE, + ABORTED, + ERROR) .putAll(ROLL_BACK_PAUSED, ROLLING_BACK, 
ROLL_BACK_AWAITING_PULSE, ABORTED, ERROR) - .putAll(ROLL_FORWARD_AWAITING_PULSE, ROLLING_FORWARD, ROLL_FORWARD_PAUSED, ABORTED, ERROR) + .putAll(ROLL_FORWARD_AWAITING_PULSE, + ROLLING_BACK, + ROLLING_FORWARD, + ROLL_FORWARD_PAUSED, + ABORTED, + ERROR) .putAll(ROLL_BACK_AWAITING_PULSE, ROLLING_BACK, ROLL_BACK_PAUSED, ABORTED, ERROR) .build(); http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/python/apache/aurora/client/api/__init__.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py index ec2c786..9149c30 100644 --- a/src/main/python/apache/aurora/client/api/__init__.py +++ b/src/main/python/apache/aurora/client/api/__init__.py @@ -198,6 +198,17 @@ class AuroraClientAPI(object): """ return self._scheduler_proxy.abortJobUpdate(update_key, message) + def rollback_job_update(self, update_key, message): + """Requests Scheduler to rollback active job update. + + Arguments: + update_key -- Update identifier. + message -- Audit message to include with the change. + + Returns response object. + """ + return self._scheduler_proxy.rollbackJobUpdate(update_key, message) + def get_job_update_diff(self, config, instances=None): """Requests scheduler to calculate difference between scheduler and client job views. 
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/python/apache/aurora/client/cli/update.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/cli/update.py b/src/main/python/apache/aurora/client/cli/update.py index bb526f7..23aaa2c 100644 --- a/src/main/python/apache/aurora/client/cli/update.py +++ b/src/main/python/apache/aurora/client/cli/update.py @@ -76,11 +76,18 @@ class UpdateController(object): update_key = self.get_update_key(job_key) if update_key is None: self.context.print_err("No active update found for this job.") - return EXIT_INVALID_PARAMETER + return EXIT_INVALID_PARAMETER, update_key resp = mutate_fn(update_key) self.context.log_response_and_raise(resp, err_code=EXIT_API_ERROR, err_msg=error_msg) self.context.print_out(success_msg) - return EXIT_OK + return EXIT_OK, update_key + + def rollback(self, job_key, message): + return self._modify_update( + job_key, + lambda key: self.api.rollback_job_update(key, message), + "Failed to rollback update due to error:", + "Update rollback has started.") def pause(self, job_key, message): return self._modify_update( @@ -103,7 +110,6 @@ class UpdateController(object): "Failed to abort update due to error:", "Update has been aborted.") - def format_timestamp(stamp_millis): return datetime.datetime.utcfromtimestamp(stamp_millis / 1000).isoformat() @@ -116,16 +122,16 @@ MESSAGE_OPTION = CommandOption( help='Message to include with the update state transition') +WAIT_OPTION = lambda help_msg: CommandOption( + '--wait', + default=False, + action='store_true', + help=help_msg) + class StartUpdate(Verb): UPDATE_MSG_TEMPLATE = "Job update has started. 
View your update progress at %s" - WAIT_OPTION = CommandOption( - '--wait', - default=False, - action='store_true', - help='Wait until the update completes') - def __init__(self, clock=time): self._clock = clock @@ -143,7 +149,7 @@ class StartUpdate(Verb): STRICT_OPTION, INSTANCES_SPEC_ARGUMENT, CONFIG_ARGUMENT, - self.WAIT_OPTION + WAIT_OPTION('Wait until the update completes') ] @property @@ -189,14 +195,26 @@ class StartUpdate(Verb): context.print_out(self.UPDATE_MSG_TEMPLATE % url) if context.options.wait: - return wait_for_update(context, self._clock, api, update_key) + return wait_for_update(context, self._clock, api, update_key, update_state_to_err_code) else: context.print_out(combine_messages(resp)) return EXIT_OK -def wait_for_update(context, clock, api, update_key): +def rollback_state_to_err_code(state): + return (EXIT_OK if state == JobUpdateStatus.ROLLED_BACK else + EXIT_COMMAND_FAILURE if state == JobUpdateStatus.ROLLED_FORWARD else + EXIT_UNKNOWN_ERROR) + + +def update_state_to_err_code(state): + return (EXIT_OK if state == JobUpdateStatus.ROLLED_FORWARD else + EXIT_COMMAND_FAILURE if state == JobUpdateStatus.ROLLED_BACK else + EXIT_UNKNOWN_ERROR) + + +def wait_for_update(context, clock, api, update_key, state_to_err_code_func): cur_state = None while True: @@ -209,12 +227,7 @@ def wait_for_update(context, clock, api, update_key): cur_state = new_state context.print_out('Current state %s' % JobUpdateStatus._VALUES_TO_NAMES[cur_state]) if cur_state not in ACTIVE_JOB_UPDATE_STATES: - if cur_state == JobUpdateStatus.ROLLED_FORWARD: - return EXIT_OK - elif cur_state == JobUpdateStatus.ROLLED_BACK: - return EXIT_COMMAND_FAILURE - else: - return EXIT_UNKNOWN_ERROR + return state_to_err_code_func(cur_state) clock.sleep(5) elif len(summaries) == 0: raise context.CommandError(EXIT_INVALID_PARAMETER, 'Job update not found.') @@ -252,7 +265,9 @@ class UpdateWait(Verb): context, self._clock, context.get_api(context.options.jobspec.cluster), - 
JobUpdateKey(job=context.options.jobspec.to_thrift(), id=context.options.id)) + JobUpdateKey(job=context.options.jobspec.to_thrift(), id=context.options.id), + update_state_to_err_code + ) class PauseUpdate(Verb): @@ -269,9 +284,10 @@ class PauseUpdate(Verb): def execute(self, context): job_key = context.options.jobspec - return UpdateController(context.get_api(job_key.cluster), context).pause( + err_code, _ = UpdateController(context.get_api(job_key.cluster), context).pause( job_key, context.options.message) + return err_code class ResumeUpdate(Verb): @@ -288,9 +304,10 @@ class ResumeUpdate(Verb): def execute(self, context): job_key = context.options.jobspec - return UpdateController(context.get_api(job_key.cluster), context).resume( + err_code, _ = UpdateController(context.get_api(job_key.cluster), context).resume( job_key, context.options.message) + return err_code class AbortUpdate(Verb): @@ -307,9 +324,44 @@ class AbortUpdate(Verb): def execute(self, context): job_key = context.options.jobspec - return UpdateController(context.get_api(job_key.cluster), context).abort( + err_code, _ = UpdateController(context.get_api(job_key.cluster), context).abort( job_key, context.options.message) + return err_code + + +class RollbackUpdate(Verb): + def __init__(self, clock=time): + self._clock = clock + + @property + def name(self): + return 'rollback' + + def get_options(self): + return [JOBSPEC_ARGUMENT, MESSAGE_OPTION, WAIT_OPTION('Wait until the update rolls back')] + + @property + def help(self): + return 'Rollback an in-progress update.' 
+ + def execute(self, context): + job_key = context.options.jobspec + update_controller = UpdateController( + context.get_api(job_key.cluster), context) + + err_code, update_key = update_controller.rollback( + job_key, context.options.message) + if err_code == EXIT_OK and context.options.wait: + return wait_for_update( + context, + self._clock, + context.get_api(job_key.cluster), + update_key, + rollback_state_to_err_code + ) + + return err_code UpdateFilter = namedtuple('UpdateFilter', ['cluster', 'role', 'env', 'job']) @@ -554,6 +606,7 @@ class Update(Noun): self.register_verb(PauseUpdate()) self.register_verb(ResumeUpdate()) self.register_verb(AbortUpdate()) + self.register_verb(RollbackUpdate()) self.register_verb(ListUpdates()) self.register_verb(UpdateInfo()) self.register_verb(UpdateWait()) http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java index 8d78bae..3522dcc 100644 --- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java @@ -60,11 +60,13 @@ public class JobUpdateStateMachineTest { .put(Pair.of(ROLL_FORWARD_PAUSED, ROLL_FORWARD_AWAITING_PULSE), STOP_WATCHING) .put(Pair.of(ROLL_FORWARD_PAUSED, ABORTED), STOP_WATCHING) .put(Pair.of(ROLL_FORWARD_PAUSED, ERROR), STOP_WATCHING) + .put(Pair.of(ROLL_FORWARD_PAUSED, ROLLING_BACK), ROLL_BACK) .put(Pair.of(ROLL_BACK_PAUSED, ROLLING_BACK), ROLL_BACK) .put(Pair.of(ROLL_BACK_PAUSED, ROLL_BACK_AWAITING_PULSE), STOP_WATCHING) .put(Pair.of(ROLL_BACK_PAUSED, ABORTED), STOP_WATCHING) .put(Pair.of(ROLL_BACK_PAUSED, ERROR), STOP_WATCHING) 
.put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ROLLING_FORWARD), ROLL_FORWARD) + .put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ROLLING_BACK), ROLL_BACK) .put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ROLL_FORWARD_PAUSED), STOP_WATCHING) .put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ABORTED), STOP_WATCHING) .put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ERROR), STOP_WATCHING) http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java index e157c0d..04551f1 100644 --- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java @@ -263,6 +263,11 @@ public class JobUpdaterIT extends EasyMockTest { storeProvider -> storeProvider.getJobUpdateStore().fetchJobUpdateDetails(UPDATE_ID).get()); } + private IJobUpdateDetails getDetails(IJobUpdateKey key) { + return storage.read( + storeProvider -> storeProvider.getJobUpdateStore().fetchJobUpdateDetails(key).get()); + } + private void assertLatestUpdateMessage(String expected) { IJobUpdateDetails details = getDetails(); assertEquals(expected, Iterables.getLast(details.getUpdateEvents()).getMessage()); @@ -272,7 +277,15 @@ public class JobUpdaterIT extends EasyMockTest { JobUpdateStatus expected, Multimap<Integer, JobUpdateAction> expectedActions) { - IJobUpdateDetails details = getDetails(); + assertStateUpdate(UPDATE_ID, expected, expectedActions); + } + + private void assertStateUpdate( + IJobUpdateKey key, + JobUpdateStatus expected, + Multimap<Integer, JobUpdateAction> expectedActions) { + + IJobUpdateDetails details = getDetails(key); Iterable<IJobInstanceUpdateEvent> orderedEvents = EVENT_ORDER.sortedCopy(details.getInstanceEvents()); Multimap<Integer, IJobInstanceUpdateEvent> 
eventsByInstance = @@ -1365,15 +1378,214 @@ public class JobUpdaterIT extends EasyMockTest { updater.resume(UPDATE_ID, AUDIT); } - private static IJobUpdateSummary makeUpdateSummary() { + @Test + public void testFailToRollbackCompletedUpdate() throws Exception { + expectTaskKilled().times(3); + + control.replay(); + + JobUpdate builder = makeJobUpdate(makeInstanceConfig(0, 2, OLD_CONFIG)).newBuilder(); + builder.getInstructions().getSettings() + .setWaitForBatchCompletion(true) + .setUpdateGroupSize(2); + IJobUpdate update = IJobUpdate.build(builder); + insertInitialTasks(update); + + changeState(JOB, 0, ASSIGNED, STARTING, RUNNING); + changeState(JOB, 1, ASSIGNED, STARTING, RUNNING); + changeState(JOB, 2, ASSIGNED, STARTING, RUNNING); + clock.advance(WATCH_TIMEOUT); + + ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder(); + + // Instances 0 and 1 are updated. + updater.start(update, AUDIT); + actions.putAll(0, INSTANCE_UPDATING) + .putAll(1, INSTANCE_UPDATING); + assertState(ROLLING_FORWARD, actions.build()); + changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING); + clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 2, Time.MILLISECONDS)); + changeState(JOB, 0, FINISHED, ASSIGNED, STARTING, RUNNING); + clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 2, Time.MILLISECONDS)); + + // Instance 1 finished first, but update does not yet proceed until 0 finishes. + actions.putAll(1, INSTANCE_UPDATED); + assertState(ROLLING_FORWARD, actions.build()); + clock.advance(WATCH_TIMEOUT); + actions.putAll(0, INSTANCE_UPDATED); + + // Instance 2 is updated. 
+ changeState(JOB, 2, FINISHED, ASSIGNED, STARTING, RUNNING); + clock.advance(WATCH_TIMEOUT); + actions.putAll(2, INSTANCE_UPDATING, INSTANCE_UPDATED); + assertState(ROLLED_FORWARD, actions.build()); + + assertJobState( + JOB, + ImmutableMap.of(0, NEW_CONFIG, 1, NEW_CONFIG, 2, NEW_CONFIG)); + + try { + updater.rollback(UPDATE_ID, AUDIT); + fail(); + } catch (UpdateStateException e) { + // Expected. + } + } + + @Test + public void testRollbackDuringUpgrade() throws Exception { + expectTaskKilled().times(5); + + control.replay(); + + JobUpdate builder = makeJobUpdate(makeInstanceConfig(0, 2, OLD_CONFIG)).newBuilder(); + builder.getInstructions().getSettings() + .setWaitForBatchCompletion(true) + .setUpdateGroupSize(2); + IJobUpdate update = IJobUpdate.build(builder); + insertInitialTasks(update); + + changeState(JOB, 0, ASSIGNED, STARTING, RUNNING); + changeState(JOB, 1, ASSIGNED, STARTING, RUNNING); + changeState(JOB, 2, ASSIGNED, STARTING, RUNNING); + clock.advance(WATCH_TIMEOUT); + + ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder(); + + // Instances 0 and 1 are updated. 
+ updater.start(update, AUDIT); + actions.putAll(0, INSTANCE_UPDATING) + .putAll(1, INSTANCE_UPDATING); + assertState(ROLLING_FORWARD, actions.build()); + changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING); + changeState(JOB, 0, FINISHED, ASSIGNED, STARTING, RUNNING); + clock.advance(WATCH_TIMEOUT); + + actions.putAll(0, INSTANCE_UPDATED) + .putAll(1, INSTANCE_UPDATED) + .putAll(2, INSTANCE_UPDATING); + assertState(ROLLING_FORWARD, actions.build()); + clock.advance(WATCH_TIMEOUT); + + updater.rollback(UPDATE_ID, AUDIT); + + actions.putAll(1, INSTANCE_ROLLING_BACK); + actions.putAll(2, INSTANCE_ROLLING_BACK); + changeState(JOB, 1, KILLED); + changeState(JOB, 2, KILLED); + clock.advance(WATCH_TIMEOUT); + + assertState(ROLLING_BACK, actions.build()); + clock.advance(WATCH_TIMEOUT); + + changeState(JOB, 2, ASSIGNED, STARTING, RUNNING); + changeState(JOB, 1, ASSIGNED, STARTING, RUNNING); + clock.advance(WATCH_TIMEOUT); + + actions.putAll(2, INSTANCE_ROLLED_BACK) + .putAll(1, INSTANCE_ROLLED_BACK); + changeState(JOB, 0, KILLED); + actions.putAll(0, INSTANCE_ROLLING_BACK); + clock.advance(WATCH_TIMEOUT); + + assertState(ROLLING_BACK, actions.build()); + + changeState(JOB, 0, ASSIGNED, STARTING, RUNNING); + actions.putAll(0, INSTANCE_ROLLED_BACK); + clock.advance(WATCH_TIMEOUT); + + assertState(ROLLED_BACK, actions.build()); + + assertJobState( + JOB, + ImmutableMap.of(0, OLD_CONFIG, 1, OLD_CONFIG, 2, OLD_CONFIG)); + } + + @Test + public void testRollbackCoordinatedUpdate() throws Exception { + control.replay(); + + JobUpdate builder = makeJobUpdate( + // No-op - task is already matching the new config. + makeInstanceConfig(0, 0, NEW_CONFIG), + // Tasks needing update. 
+ makeInstanceConfig(1, 2, OLD_CONFIG)).newBuilder(); + + builder.getInstructions().getSettings().setBlockIfNoPulsesAfterMs((int) PULSE_TIMEOUT_MS); + insertInitialTasks(IJobUpdate.build(builder)); + + changeState(JOB, 0, ASSIGNED, STARTING, RUNNING); + changeState(JOB, 1, ASSIGNED, STARTING, RUNNING); + changeState(JOB, 2, ASSIGNED, STARTING, RUNNING); + clock.advance(WATCH_TIMEOUT); + + ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder(); + updater.start(IJobUpdate.build(builder), AUDIT); + + // The update is blocked initially waiting for a pulse. + assertState(ROLL_FORWARD_AWAITING_PULSE, actions.build()); + + updater.rollback(UPDATE_ID, AUDIT); + + clock.advance(WATCH_TIMEOUT); + assertState(ROLLED_BACK, actions.build()); + } + + @Test + public void testRollbackPausedForwardUpdate() throws Exception { + expectTaskKilled().times(2); + + control.replay(); + + JobUpdate builder = makeJobUpdate( + // No-op - task is already matching the new config. + makeInstanceConfig(0, 0, NEW_CONFIG), + // Tasks needing update. 
+ makeInstanceConfig(1, 2, OLD_CONFIG)).newBuilder(); + + insertInitialTasks(IJobUpdate.build(builder)); + + changeState(JOB, 0, ASSIGNED, STARTING, RUNNING); + changeState(JOB, 1, ASSIGNED, STARTING, RUNNING); + changeState(JOB, 2, ASSIGNED, STARTING, RUNNING); + clock.advance(WATCH_TIMEOUT); + + ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder(); + updater.start(IJobUpdate.build(builder), AUDIT); + + actions.putAll(1, INSTANCE_UPDATING); + assertState(ROLLING_FORWARD, actions.build()); + clock.advance(WATCH_TIMEOUT); + changeState(JOB, 1, KILLED, ASSIGNED, STARTING, RUNNING); + + updater.pause(UPDATE_ID, AUDIT); + assertState(ROLL_FORWARD_PAUSED, actions.build()); + + updater.rollback(UPDATE_ID, AUDIT); + + actions.putAll(1, INSTANCE_ROLLING_BACK); + clock.advance(WATCH_TIMEOUT); + assertState(ROLLING_BACK, actions.build()); + + actions.putAll(1, INSTANCE_ROLLED_BACK); + changeState(JOB, 1, KILLED, ASSIGNED, STARTING, RUNNING); + clock.advance(WATCH_TIMEOUT); + assertState(ROLLED_BACK, actions.build()); + + assertJobState( + JOB, + ImmutableMap.of(0, NEW_CONFIG, 1, OLD_CONFIG, 2, OLD_CONFIG)); + } + + private static IJobUpdateSummary makeUpdateSummary(IJobUpdateKey key) { return IJobUpdateSummary.build(new JobUpdateSummary() .setUser("user") - .setKey(UPDATE_ID.newBuilder())); + .setKey(key.newBuilder())); } private static IJobUpdate makeJobUpdate(IInstanceTaskConfig... 
configs) { JobUpdate builder = new JobUpdate() - .setSummary(makeUpdateSummary().newBuilder()) + .setSummary(makeUpdateSummary(UPDATE_ID).newBuilder()) .setInstructions(new JobUpdateInstructions() .setDesiredState(new InstanceTaskConfig() .setTask(NEW_CONFIG.newBuilder()) http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/test/python/apache/aurora/client/api/test_scheduler_client.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py index afbd385..afac250 100644 --- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py +++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py @@ -183,6 +183,11 @@ class TestSchedulerProxyInjection(unittest.TestCase): self.mox.ReplayAll() self.make_scheduler_proxy().abortJobUpdate('update_id') + def test_rollbackJobUpdate(self): + self.mock_thrift_client.rollbackJobUpdate('update_id').AndReturn(DEFAULT_RESPONSE) + self.mox.ReplayAll() + self.make_scheduler_proxy().rollbackJobUpdate('update_id') + def test_pulseJobUpdate(self): self.mock_thrift_client.pulseJobUpdate('update_id').AndReturn(DEFAULT_RESPONSE) self.mox.ReplayAll() http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/test/python/apache/aurora/client/cli/test_supdate.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/cli/test_supdate.py b/src/test/python/apache/aurora/client/cli/test_supdate.py index 317b175..92fb039 100644 --- a/src/test/python/apache/aurora/client/cli/test_supdate.py +++ b/src/test/python/apache/aurora/client/cli/test_supdate.py @@ -34,6 +34,7 @@ from apache.aurora.client.cli.update import ( ListUpdates, PauseUpdate, ResumeUpdate, + RollbackUpdate, StartUpdate, UpdateFilter, UpdateInfo, @@ -705,3 +706,88 @@ class TestUpdateWait(AuroraClientCommandTest): assert 
self._fake_context.get_out() == [] assert self._mock_api.query_job_updates.mock_calls == [self._fetch_call] + + +class TestRollbackUpdate(AuroraClientCommandTest): + def setUp(self): + self._command = RollbackUpdate() + self._mock_options = mock_verb_options(self._command) + self._mock_options.jobspec = self.TEST_JOBKEY + self._mock_options.wait = False + self._fake_context = FakeAuroraCommandContext() + self._fake_context.set_options(self._mock_options) + self._mock_api = self._fake_context.get_api('UNUSED') + + def test_rollback_update_command_line_succeeds(self): + self._mock_api.query_job_updates.return_value = get_status_query_response() + self._mock_api.rollback_job_update.return_value = self.create_simple_success_response() + self._mock_options.message = 'hello' + assert self._command.execute(self._fake_context) == EXIT_OK + + assert self._mock_api.query_job_updates.mock_calls == [ + call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] + assert self._mock_api.rollback_job_update.mock_calls == [call(UPDATE_KEY, 'hello')] + assert self._fake_context.get_out() == ["Update rollback has started."] + assert self._fake_context.get_err() == [] + + def test_rollback_update_command_line_error(self): + self._mock_api.query_job_updates.return_value = get_status_query_response() + self._mock_api.rollback_job_update.return_value = self.create_error_response() + + with pytest.raises(Context.CommandError): + self._command.execute(self._fake_context) + assert self._mock_api.query_job_updates.mock_calls == [ + call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] + assert self._mock_api.rollback_job_update.mock_calls == [call(UPDATE_KEY, None)] + + assert self._fake_context.get_out() == [] + assert self._fake_context.get_err() == ["Failed to rollback update due to error:", "\tWhoops"] + + def test_rollback_invalid_api_response(self): + # Mimic the API returning two active updates for one job, which should be impossible. 
+ self._mock_api.query_job_updates.return_value = get_status_query_response(count=2) + self._mock_api.rollback_job_update.return_value = self.create_error_response() + with pytest.raises(Context.CommandError) as error: + self._command.execute(self._fake_context) + assert error.message == ( + 'scheduler returned multiple active updates for this job.') + + assert self._mock_api.query_job_updates.mock_calls == [ + call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] + assert self._mock_api.rollback_job_update.mock_calls == [] + assert self._fake_context.get_out() == [] + assert self._fake_context.get_err() == [] + + def test_rollback_and_wait_success(self): + self._mock_options.wait = True + + updating_response = get_status_query_response(status=JobUpdateStatus.ROLLING_FORWARD) + updated_response = get_status_query_response(status=JobUpdateStatus.ROLLED_BACK) + + self._mock_api.query_job_updates.side_effect = [updating_response, updated_response] + self._mock_api.rollback_job_update.return_value = self.create_simple_success_response() + + assert self._command.execute(self._fake_context) == EXIT_OK + assert self._mock_api.rollback_job_update.mock_calls == [call(UPDATE_KEY, None)] + assert self._fake_context.get_err() == [] + + def test_rollback_and_wait_rolled_forward(self): + self._mock_options.wait = True + + updating_response = get_status_query_response(status=JobUpdateStatus.ROLLING_FORWARD) + updated_response = get_status_query_response(status=JobUpdateStatus.ROLLED_FORWARD) + + self._mock_api.query_job_updates.side_effect = [updating_response, updated_response] + + self._mock_api.rollback_job_update.return_value = self.create_simple_success_response() + assert self._command.execute(self._fake_context) == EXIT_COMMAND_FAILURE + + def test_rollback_and_wait_error(self): + self._mock_options.wait = True + updating_response = get_status_query_response(status=JobUpdateStatus.ROLLING_FORWARD) + failed_response = 
get_status_query_response(status=JobUpdateStatus.ERROR) + + self._mock_api.query_job_updates.side_effect = [updating_response, failed_response] + + self._mock_api.rollback_job_update.return_value = self.create_simple_success_response() + assert self._command.execute(self._fake_context) == EXIT_UNKNOWN_ERROR
