Repository: aurora
Updated Branches:
  refs/heads/master 288f00be2 -> c34e50c79
Add update wait and update start --wait flag. Bugs closed: AURORA-1239 Reviewed at https://reviews.apache.org/r/33959/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c34e50c7 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c34e50c7 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c34e50c7 Branch: refs/heads/master Commit: c34e50c799a8b77e65a3902e173129eee6df49fa Parents: 288f00b Author: Bill Farner <[email protected]> Authored: Fri May 8 14:34:15 2015 -0700 Committer: Bill Farner <[email protected]> Committed: Fri May 8 14:34:15 2015 -0700 ---------------------------------------------------------------------- .../python/apache/aurora/client/api/__init__.py | 13 +- .../python/apache/aurora/client/cli/update.py | 91 ++++++++++-- .../apache/aurora/client/cli/test_supdate.py | 141 +++++++++++++++---- .../sh/org/apache/aurora/e2e/test_end_to_end.sh | 22 +-- 4 files changed, 201 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/c34e50c7/src/main/python/apache/aurora/client/api/__init__.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py index 0ae1d9f..4b9c48e 100644 --- a/src/main/python/apache/aurora/client/api/__init__.py +++ b/src/main/python/apache/aurora/client/api/__init__.py @@ -206,7 +206,13 @@ class AuroraClientAPI(object): """ return self._scheduler_proxy.abortJobUpdate(update_key, message) - def query_job_updates(self, role=None, job_key=None, user=None, update_statuses=None): + def query_job_updates( + self, + role=None, + job_key=None, + user=None, + update_statuses=None, + update_key=None): """Returns all job updates matching the query. Arguments: @@ -214,15 +220,18 @@ class AuroraClientAPI(object): job_key -- job key. 
user -- user who initiated an update. update_statuses -- set of JobUpdateStatus to match. + update_key -- JobUpdateKey to match. Returns response object with all matching job update summaries. """ + # TODO(wfarner): Consider accepting JobUpdateQuery in this function instead of kwargs. return self._scheduler_proxy.getJobUpdateSummaries( JobUpdateQuery( role=role, jobKey=job_key.to_thrift() if job_key else None, user=user, - updateStatuses=update_statuses)) + updateStatuses=update_statuses, + key=update_key)) def get_job_update_details(self, key): """Gets JobUpdateDetails for the specified job update ID. http://git-wip-us.apache.org/repos/asf/aurora/blob/c34e50c7/src/main/python/apache/aurora/client/cli/update.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/cli/update.py b/src/main/python/apache/aurora/client/cli/update.py index 7bd1eb5..58f0db0 100644 --- a/src/main/python/apache/aurora/client/cli/update.py +++ b/src/main/python/apache/aurora/client/cli/update.py @@ -17,6 +17,7 @@ from __future__ import print_function import datetime import json import textwrap +import time from collections import namedtuple from apache.aurora.client.api import AuroraClientAPI @@ -117,6 +118,15 @@ class StartUpdate(Verb): UPDATE_MSG_TEMPLATE = "Job update has started. 
View your update progress at %s" + WAIT_OPTION = CommandOption( + '--wait', + default=False, + action='store_true', + help='Wait until the update completes') + + def __init__(self, clock=time): + self._clock = clock + @property def name(self): return 'start' @@ -130,7 +140,8 @@ class StartUpdate(Verb): MESSAGE_OPTION, STRICT_OPTION, INSTANCES_SPEC_ARGUMENT, - CONFIG_ARGUMENT + CONFIG_ARGUMENT, + self.WAIT_OPTION ] @property @@ -166,13 +177,73 @@ class StartUpdate(Verb): err_msg="Failed to start update due to error:") if resp.result: + update_key = resp.result.startJobUpdateResult.key url = get_update_page( api, - AuroraJobKey.from_thrift(config.cluster(), resp.result.startJobUpdateResult.key.job), + AuroraJobKey.from_thrift(config.cluster(), update_key.job), resp.result.startJobUpdateResult.key.id) context.print_out(self.UPDATE_MSG_TEMPLATE % url) + + if context.options.wait: + wait_for_update(context, self._clock, api, update_key) else: context.print_out(combine_messages(resp)) + + return EXIT_OK + + +def wait_for_update(context, clock, api, update_key): + cur_state = None + + while True: + resp = api.query_job_updates(update_key=update_key) + context.log_response_and_raise(resp) + summaries = resp.result.getJobUpdateSummariesResult.updateSummaries + if len(summaries) == 1: + new_state = summaries[0].state.status + if new_state != cur_state: + cur_state = new_state + context.print_out('Current state %s' % JobUpdateStatus._VALUES_TO_NAMES[cur_state]) + if cur_state not in ACTIVE_JOB_UPDATE_STATES: + break + clock.sleep(5) + elif len(summaries) == 0: + raise context.CommandError(EXIT_INVALID_PARAMETER, 'Job update not found.') + else: + raise context.CommandError( + EXIT_API_ERROR, + 'Scheduler returned multiple updates: %s' % summaries) + + +UPDATE_ID_ARGUMENT = CommandOption( + 'id', + type=str, + nargs='?', + metavar='ID', + help='Update identifier provided by the scheduler when an update was started.') + + +class UpdateWait(Verb): + def __init__(self, 
clock=time): + self._clock = clock + + @property + def name(self): + return 'wait' + + def get_options(self): + return [JOBSPEC_ARGUMENT, UPDATE_ID_ARGUMENT] + + @property + def help(self): + return 'Block until an update has entered a terminal state.' + + def execute(self, context): + wait_for_update( + context, + self._clock, + context.get_api(context.options.jobspec.cluster), + JobUpdateKey(job=context.options.jobspec.to_thrift(), id=context.options.id)) return EXIT_OK @@ -186,7 +257,7 @@ class PauseUpdate(Verb): @property def help(self): - return """Pause an update.""" + return 'Pause an update.' def execute(self, context): job_key = context.options.jobspec @@ -205,7 +276,7 @@ class ResumeUpdate(Verb): @property def help(self): - return """Resume an update.""" + return 'Resume an update.' def execute(self, context): job_key = context.options.jobspec @@ -224,7 +295,7 @@ class AbortUpdate(Verb): @property def help(self): - return """Abort an in-progress update.""" + return 'Abort an in-progress update.' 
def execute(self, context): job_key = context.options.jobspec @@ -365,19 +436,12 @@ updates matching any of the specified statuses will be included."""), class UpdateInfo(Verb): - UPDATE_ID_ARGUMENT = CommandOption( - 'id', - type=str, - nargs='?', - metavar='ID', - help='Update identifier provided by the scheduler when an update was started.') - @property def name(self): return 'info' def get_options(self): - return [JSON_WRITE_OPTION, JOBSPEC_ARGUMENT, self.UPDATE_ID_ARGUMENT] + return [JSON_WRITE_OPTION, JOBSPEC_ARGUMENT, UPDATE_ID_ARGUMENT] @property def help(self): @@ -484,3 +548,4 @@ class Update(Noun): self.register_verb(AbortUpdate()) self.register_verb(ListUpdates()) self.register_verb(UpdateInfo()) + self.register_verb(UpdateWait()) http://git-wip-us.apache.org/repos/asf/aurora/blob/c34e50c7/src/test/python/apache/aurora/client/cli/test_supdate.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/cli/test_supdate.py b/src/test/python/apache/aurora/client/cli/test_supdate.py index 968d456..158d330 100644 --- a/src/test/python/apache/aurora/client/cli/test_supdate.py +++ b/src/test/python/apache/aurora/client/cli/test_supdate.py @@ -12,12 +12,14 @@ # limitations under the License. 
# import json +import time +import mock import pytest from mock import ANY, call, create_autospec, Mock from pystachio import Empty -from apache.aurora.client.cli import Context, EXIT_INVALID_PARAMETER, EXIT_OK +from apache.aurora.client.cli import Context, EXIT_API_ERROR, EXIT_INVALID_PARAMETER, EXIT_OK from apache.aurora.client.cli.options import TaskInstanceKey from apache.aurora.client.cli.update import ( AbortUpdate, @@ -26,7 +28,8 @@ from apache.aurora.client.cli.update import ( ResumeUpdate, StartUpdate, UpdateFilter, - UpdateInfo + UpdateInfo, + UpdateWait ) from apache.aurora.common.aurora_job_key import AuroraJobKey from apache.aurora.config import AuroraConfig @@ -58,7 +61,7 @@ from gen.apache.aurora.api.ttypes import ( UPDATE_KEY = JobUpdateKey(job=AuroraClientCommandTest.TEST_JOBKEY.to_thrift(), id="update_id") -def get_status_query_response(count=1): +def get_status_query_response(count=1, status=JobUpdateStatus.ROLLED_FORWARD): return Response( responseCode=ResponseCode.OK, result=Result( @@ -68,7 +71,7 @@ def get_status_query_response(count=1): key=UPDATE_KEY, user="me", state=JobUpdateState( - status=JobUpdateStatus.ROLLED_FORWARD, + status=status, createdTimestampMs=1411404927, lastModifiedTimestampMs=14114056030)) for i in range(count) ] @@ -78,12 +81,12 @@ def get_status_query_response(count=1): class TestStartUpdate(AuroraClientCommandTest): - def setUp(self): self._command = StartUpdate() self._job_key = AuroraJobKey.from_thrift("cluster", UPDATE_KEY.job) self._mock_options = mock_verb_options(self._command) self._mock_options.instance_spec = TaskInstanceKey(self._job_key, None) + self._mock_options.wait = False self._fake_context = FakeAuroraCommandContext() self._fake_context.set_options(self._mock_options) self._mock_api = self._fake_context.get_api('UNUSED') @@ -144,13 +147,38 @@ class TestStartUpdate(AuroraClientCommandTest): self._mock_options.message = 'hello' assert self._command.execute(self._fake_context) == EXIT_OK - update_url_msg 
= StartUpdate.UPDATE_MSG_TEMPLATE % ( - 'http://something_or_other/scheduler/role/env/name/id') - assert self._mock_api.start_job_update.mock_calls == [ - call(ANY, 'hello', None) + call(ANY, 'hello', None) + ] + assert self._fake_context.get_out() == [ + StartUpdate.UPDATE_MSG_TEMPLATE % ('http://something_or_other/scheduler/role/env/name/id') + ] + assert self._fake_context.get_err() == [] + + def test_start_update_and_wait(self): + mock_config = self.create_mock_config() + self._fake_context.get_job_config = Mock(return_value=mock_config) + self._mock_options.wait = True + + resp = self.create_simple_success_response() + resp.result = Result(startJobUpdateResult=StartJobUpdateResult( + key=JobUpdateKey(job=JobKey(role="role", environment="env", name="name"), id="id"))) + self._mock_api.start_job_update.return_value = resp + self._mock_api.query_job_updates.side_effect = [ + get_status_query_response(status=JobUpdateStatus.ROLLED_FORWARD) + ] + + assert self._command.execute(self._fake_context) == EXIT_OK + + assert self._mock_api.start_job_update.mock_calls == [call(ANY, None, None)] + assert self._mock_api.query_job_updates.mock_calls == [ + call(update_key=resp.result.startJobUpdateResult.key) + ] + + assert self._fake_context.get_out() == [ + StartUpdate.UPDATE_MSG_TEMPLATE % ('http://something_or_other/scheduler/role/env/name/id'), + 'Current state ROLLED_FORWARD' ] - assert self._fake_context.get_out() == [update_url_msg] assert self._fake_context.get_err() == [] def test_start_update_command_line_succeeds_noop_update(self): @@ -162,9 +190,7 @@ class TestStartUpdate(AuroraClientCommandTest): result = self._command.execute(self._fake_context) assert result == EXIT_OK - assert self._mock_api.start_job_update.mock_calls == [ - call(ANY, None, None) - ] + assert self._mock_api.start_job_update.mock_calls == [call(ANY, None, None)] assert self._fake_context.get_out() == ["Noop update."] assert self._fake_context.get_err() == [] @@ -179,9 +205,7 @@ class 
TestStartUpdate(AuroraClientCommandTest): self._command.execute(self._fake_context) assert e.value == error - assert self._mock_api.start_job_update.mock_calls == [ - call(ANY, None, None) - ] + assert self._mock_api.start_job_update.mock_calls == [call(ANY, None, None)] class TestListUpdates(AuroraClientCommandTest): @@ -202,7 +226,7 @@ class TestListUpdates(AuroraClientCommandTest): assert self._command.execute(self._fake_context) == EXIT_OK assert self._mock_api.query_job_updates.mock_calls == [ - call(role=None, user="me", job_key=None, update_statuses=None) + call(role=None, user="me", job_key=None, update_statuses=None) ] # Ideally we would use a resource file for this, but i was unable to find a way to make that @@ -222,11 +246,11 @@ west/bozo/test/hello update_id self._mock_api.query_job_updates.return_value = get_status_query_response(count=3) assert self._command.execute(self._fake_context) == EXIT_OK assert self._mock_api.query_job_updates.mock_calls == [ - call( - role=None, - user=None, - job_key=None, - update_statuses=set(JobUpdateStatus._VALUES_TO_NAMES.keys())) + call( + role=None, + user=None, + job_key=None, + update_statuses=set(JobUpdateStatus._VALUES_TO_NAMES.keys())) ] def test_list_updates_by_env(self): @@ -235,7 +259,7 @@ west/bozo/test/hello update_id self._mock_api.query_job_updates.return_value = get_status_query_response(count=3) assert self._command.execute(self._fake_context) == EXIT_OK assert self._mock_api.query_job_updates.mock_calls == [ - call(role="role", user=None, job_key=None, update_statuses=None) + call(role="role", user=None, job_key=None, update_statuses=None) ] # None of the returned values matched the env filter, so there is no output. 
assert self._fake_context.get_out_str() == '' @@ -274,7 +298,6 @@ west/bozo/test/hello update_id class TestUpdateStatus(AuroraClientCommandTest): - def setUp(self): self._command = UpdateInfo() self._mock_options = mock_verb_options(self._command) @@ -310,7 +333,7 @@ class TestPauseUpdate(AuroraClientCommandTest): self._mock_options.message = 'hello' assert self._command.execute(self._fake_context) == EXIT_OK assert self._mock_api.query_job_updates.mock_calls == [ - call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] + call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] assert self._mock_api.pause_job_update.mock_calls == [call(UPDATE_KEY, 'hello')] assert self._fake_context.get_out() == ["Update has been paused."] assert self._fake_context.get_err() == [] @@ -344,7 +367,7 @@ class TestAbortUpdate(AuroraClientCommandTest): assert self._command.execute(self._fake_context) == EXIT_OK assert self._mock_api.query_job_updates.mock_calls == [ - call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] + call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] assert self._mock_api.abort_job_update.mock_calls == [call(UPDATE_KEY, 'hello')] assert self._fake_context.get_out() == ["Update has been aborted."] assert self._fake_context.get_err() == [] @@ -356,7 +379,7 @@ class TestAbortUpdate(AuroraClientCommandTest): with pytest.raises(Context.CommandError): self._command.execute(self._fake_context) assert self._mock_api.query_job_updates.mock_calls == [ - call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] + call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] assert self._mock_api.abort_job_update.mock_calls == [call(UPDATE_KEY, None)] assert self._fake_context.get_out() == [] assert self._fake_context.get_err() == ["Failed to abort update due to error:", "\tWhoops"] @@ -371,7 +394,7 @@ class TestAbortUpdate(AuroraClientCommandTest): 'scheduler returned 
multiple active updates for this job.') assert self._mock_api.query_job_updates.mock_calls == [ - call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] + call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] assert self._mock_api.abort_job_update.mock_calls == [] assert self._fake_context.get_out() == [] assert self._fake_context.get_err() == [] @@ -392,7 +415,7 @@ class TestResumeUpdate(AuroraClientCommandTest): self._mock_options.message = 'hello' assert self._command.execute(self._fake_context) == EXIT_OK assert self._mock_api.query_job_updates.mock_calls == [ - call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] + call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] assert self._mock_api.resume_job_update.mock_calls == [call(UPDATE_KEY, 'hello')] assert self._fake_context.get_out() == ["Update has been resumed."] assert self._fake_context.get_err() == [] @@ -403,7 +426,7 @@ class TestResumeUpdate(AuroraClientCommandTest): with pytest.raises(Context.CommandError): self._command.execute(self._fake_context) assert self._mock_api.query_job_updates.mock_calls == [ - call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] + call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)] assert self._mock_api.resume_job_update.mock_calls == [call(UPDATE_KEY, None)] assert self._fake_context.get_out() == [] assert self._fake_context.get_err() == ["Failed to resume update due to error:", "\tWhoops"] @@ -549,3 +572,61 @@ Instance events: } ] } + + +class TestUpdateWait(AuroraClientCommandTest): + def setUp(self): + self._command = UpdateWait(clock=mock.create_autospec(spec=time)) + self._mock_options = mock_verb_options(self._command) + self._mock_options.jobspec = self.TEST_JOBKEY + self._mock_options.id = 'update_id' + self._fake_context = FakeAuroraCommandContext() + self._fake_context.set_options(self._mock_options) + self._mock_api = 
self._fake_context.get_api('UNUSED') + self._fetch_call = call( + update_key=JobUpdateKey(job=self.TEST_JOBKEY.to_thrift(), id=self._mock_options.id)) + + def test_wait(self): + updating_response = get_status_query_response(status=JobUpdateStatus.ROLLING_FORWARD) + updated_response = get_status_query_response(status=JobUpdateStatus.ROLLED_FORWARD) + + self._mock_api.query_job_updates.side_effect = [updating_response, updated_response] + + assert self._command.execute(self._fake_context) == EXIT_OK + assert self._fake_context.get_out() == ['Current state ROLLING_FORWARD', + 'Current state ROLLED_FORWARD'] + + assert self._mock_api.query_job_updates.mock_calls == [self._fetch_call, self._fetch_call] + + def test_wait_non_ok_response(self): + self._mock_api.query_job_updates.return_value = Response( + responseCode=ResponseCode.INVALID_REQUEST) + + with pytest.raises(Context.CommandError) as e: + self._command.execute(self._fake_context) + assert e.value.code == EXIT_API_ERROR + + assert self._fake_context.get_out() == [] + assert self._mock_api.query_job_updates.mock_calls == [self._fetch_call] + + def test_update_wait_not_found(self): + self._mock_api.query_job_updates.return_value = Response( + responseCode=ResponseCode.OK, + result=Result(getJobUpdateSummariesResult=GetJobUpdateSummariesResult(updateSummaries=[]))) + + with pytest.raises(Context.CommandError) as e: + self._command.execute(self._fake_context) + assert e.value.code == EXIT_INVALID_PARAMETER + + assert self._fake_context.get_out() == [] + assert self._mock_api.query_job_updates.mock_calls == [self._fetch_call] + + def test_wait_scheduler_returns_multiple_summaries(self): + self._mock_api.query_job_updates.return_value = get_status_query_response(count=2) + + with pytest.raises(Context.CommandError) as e: + self._command.execute(self._fake_context) + assert e.value.code == EXIT_API_ERROR + + assert self._fake_context.get_out() == [] + assert self._mock_api.query_job_updates.mock_calls == 
[self._fetch_call] http://git-wip-us.apache.org/repos/asf/aurora/blob/c34e50c7/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh ---------------------------------------------------------------------- diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh index 501d111..018efbe 100755 --- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh +++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh @@ -146,25 +146,6 @@ assert_update_state() { fi } -await_update_finished() { - local _jobkey=$1 - - local _matched=0 - for i in $(seq 1 120); do - if [[ $(aurora update list $_jobkey --status active | wc -l) -eq 0 ]]; then - _matched=1 - break - else - sleep 1 - fi - done - - if [[ $_matched -ne 1 ]]; then - echo "Timed out while waiting for update $_jobkey" - exit 1 - fi -} - test_update() { local _jobkey=$1 _config=$2 _cluster=$3 @@ -178,8 +159,7 @@ test_update() { assert_update_state $_jobkey 'ROLL_FORWARD_PAUSED' aurora update resume $_jobkey assert_update_state $_jobkey 'ROLLING_FORWARD' - - await_update_finished $_jobkey + aurora update wait $_jobkey $_update_id # Check that the update ended in ROLLED_FORWARD state. Assumes the status is the last column. local status=$(aurora update info $_jobkey $_update_id | grep 'Current status' | awk '{print $NF}')
