Repository: aurora Updated Branches: refs/heads/master 633948ab0 -> 5069f93be
Introduce UpdateMetadata fields in JobUpdateRequest. Bugs closed: AURORA-1711 Reviewed at https://reviews.apache.org/r/51384/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/5069f93b Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/5069f93b Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/5069f93b Branch: refs/heads/master Commit: 5069f93be36c0a8720415c073e662fe430ca06f1 Parents: 633948a Author: Santhosh Kumar Shanmugham <[email protected]> Authored: Tue Sep 13 16:55:20 2016 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Tue Sep 13 16:55:20 2016 -0700 ---------------------------------------------------------------------- RELEASE-NOTES.md | 1 + .../thrift/org/apache/aurora/gen/api.thrift | 9 ++ .../org/apache/aurora/benchmark/JobUpdates.java | 15 +++- .../aurora/benchmark/UpdateStoreBenchmarks.java | 40 +++++++++ .../scheduler/storage/db/DbJobUpdateStore.java | 7 ++ .../storage/db/JobUpdateDetailsMapper.java | 11 +++ .../V008_CreateUpdateMetadataTable.java | 45 ++++++++++ .../thrift/SchedulerThriftInterface.java | 13 ++- .../updater/JobUpdateControllerImpl.java | 12 ++- .../updater/UpdateInProgressException.java | 32 +++++++ .../python/apache/aurora/client/api/__init__.py | 11 ++- .../python/apache/aurora/client/cli/update.py | 42 +++++++++- .../apache/aurora/client/hooks/hooked_api.py | 4 +- .../storage/db/JobUpdateDetailsMapper.xml | 33 ++++++++ .../aurora/scheduler/storage/db/schema.sql | 7 ++ .../storage/db/DbJobUpdateStoreTest.java | 25 +++++- .../thrift/ReadOnlySchedulerImplTest.java | 7 +- .../thrift/SchedulerThriftInterfaceTest.java | 48 +++++++++-- .../aurora/scheduler/updater/JobUpdaterIT.java | 33 +++++++- .../apache/aurora/client/cli/test_supdate.py | 87 ++++++++++++++++++-- .../python/apache/aurora/client/cli/util.py | 10 +++ 21 files changed, 462 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 23e8168..ad2c68a 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -39,6 +39,7 @@ - Add a new MTTS (Median Time To Starting) metric in addition to MTTA and MTTR. - In addition to CPU resources, RAM resources can now be treated as revocable via the scheduler commandline flag `-enable_revocable_ram`. +- Introduce UpdateMetadata fields in JobUpdateRequest to allow clients to store metadata on update. ### Deprecations and removals: http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/api/src/main/thrift/org/apache/aurora/gen/api.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift index ca014f3..a045a21 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift @@ -770,6 +770,9 @@ struct JobUpdateSummary { /** Current job update state. */ 4: JobUpdateState state + + /** Update metadata supplied by the client. */ + 6: optional set<Metadata> metadata } /** Update configuration and setting details. */ @@ -814,6 +817,9 @@ struct JobUpdateRequest { /** Update settings and limits. */ 3: JobUpdateSettings settings + + /** Update metadata supplied by the client issuing the JobUpdateRequest. */ + 4: optional set<Metadata> metadata } /** @@ -887,6 +893,9 @@ struct GetPendingReasonResult { struct StartJobUpdateResult { /** Unique identifier for the job update. */ 1: JobUpdateKey key + + /** Summary of the update that is in progress for the given JobKey. */ + 2: optional JobUpdateSummary updateSummary } /** Result of the getJobUpdateSummaries call. */ http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java index f4f8d00..cedddf4 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java +++ b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java @@ -35,6 +35,7 @@ import org.apache.aurora.gen.JobUpdateStatus; import org.apache.aurora.gen.JobUpdateSummary; import org.apache.aurora.gen.Lock; import org.apache.aurora.gen.LockKey; +import org.apache.aurora.gen.Metadata; import org.apache.aurora.gen.Range; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.base.TaskTestUtil; @@ -96,6 +97,7 @@ final class JobUpdates { private int numEvents = 1; private int numInstanceEvents = 5000; private int numInstanceOverrides = 1; + private int numUpdateMetadata = 10; Builder setNumEvents(int newCount) { numEvents = newCount; @@ -112,6 +114,11 @@ final class JobUpdates { return this; } + Builder setNumUpdateMetadata(int newCount) { + numUpdateMetadata = newCount; + return this; + } + Set<IJobUpdateDetails> build(int count) { ImmutableSet.Builder<IJobUpdateDetails> result = ImmutableSet.builder(); for (int i = 0; i < count; i++) { @@ -121,10 +128,16 @@ final class JobUpdates { TaskConfig task = TaskTestUtil.makeConfig(IJobKey.build(job)).newBuilder(); task.getExecutorConfig().setData(string(10000)); + ImmutableSet.Builder<Metadata> metadata = ImmutableSet.builder(); + for (int k = 0; k < numUpdateMetadata; k++) { + metadata.add(new Metadata("key-" + k, "value=" + k)); + } + JobUpdate update = new JobUpdate() .setSummary(new JobUpdateSummary() .setKey(key) - .setUser(USER)) + .setUser(USER) + .setMetadata(metadata.build())) .setInstructions(new JobUpdateInstructions() .setSettings(new JobUpdateSettings() .setUpdateGroupSize(100) http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java index e5228ae..cac02a5 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java @@ -118,4 +118,44 @@ public class UpdateStoreBenchmarks { Iterables.getOnlyElement(keys)).get()); } } + + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) + @Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) + @Fork(1) + @State(Scope.Thread) + public static class JobUpdateMetadataBenchmark { + private Storage storage; + private Set<IJobUpdateKey> keys; + + @Param({"10", "100", "1000", "10000"}) + private int metadata; + + @Setup(Level.Trial) + public void setUp() { + storage = DbUtil.createStorage(); + } + + @Setup(Level.Iteration) + public void setUpIteration() { + keys = JobUpdates.saveUpdates( + storage, + new JobUpdates.Builder().setNumUpdateMetadata(metadata).build(1)); + } + + @TearDown(Level.Iteration) + public void tearDownIteration() { + storage.write((NoResult.Quiet) storeProvider -> { + storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents(); + storeProvider.getLockStore().deleteLocks(); + }); + } + + @Benchmark + public IJobUpdateDetails run() throws TException { + return storage.read(store -> store.getJobUpdateStore().fetchJobUpdateDetails( + Iterables.getOnlyElement(keys)).get()); + } + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java index d2673e6..e0698a3 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java @@ -41,6 +41,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; +import org.apache.aurora.scheduler.storage.entities.IMetadata; import org.apache.aurora.scheduler.storage.entities.IRange; import static java.util.Objects.requireNonNull; @@ -94,6 +95,12 @@ public class DbJobUpdateStore implements JobUpdateStore.Mutable { detailsMapper.insertLockToken(key, lockToken.get()); } + if (!update.getSummary().getMetadata().isEmpty()) { + detailsMapper.insertJobUpdateMetadata( + key, + IMetadata.toBuildersSet(update.getSummary().getMetadata())); + } + // Insert optional instance update overrides. Set<IRange> instanceOverrides = update.getInstructions().getSettings().getUpdateOnlyTheseInstances(); http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java index a3b0494..222ac2d 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java @@ -22,6 +22,7 @@ import org.apache.aurora.gen.JobInstanceUpdateEvent; import org.apache.aurora.gen.JobUpdate; import org.apache.aurora.gen.JobUpdateQuery; import org.apache.aurora.gen.JobUpdateSummary; +import org.apache.aurora.gen.Metadata; import org.apache.aurora.gen.Range; import org.apache.aurora.scheduler.storage.db.views.DbJobUpdate; import org.apache.aurora.scheduler.storage.db.views.DbJobUpdateInstructions; @@ -68,6 +69,16 @@ interface JobUpdateDetailsMapper { @Param("result") InsertResult result); /** + * Inserts the job update metadata entries for an update. + * + * @param key Update to insert task configs for. + * @param metadata Set of metadata (key, value) pairs. + */ + void insertJobUpdateMetadata( + @Param("key") IJobUpdateKey key, + @Param("metadata") Set<Metadata> metadata); + + /** * Maps inserted task config with a set of associated instance ranges. * * @param configId ID of the task config stored. http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/main/java/org/apache/aurora/scheduler/storage/db/migration/V008_CreateUpdateMetadataTable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/migration/V008_CreateUpdateMetadataTable.java b/src/main/java/org/apache/aurora/scheduler/storage/db/migration/V008_CreateUpdateMetadataTable.java new file mode 100644 index 0000000..bc86271 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/migration/V008_CreateUpdateMetadataTable.java @@ -0,0 +1,45 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.db.migration; + +import java.math.BigDecimal; + +import org.apache.ibatis.migration.MigrationScript; + +public class V008_CreateUpdateMetadataTable implements MigrationScript { + @Override + public BigDecimal getId() { + return BigDecimal.valueOf(8L); + } + + @Override + public String getDescription() { + return "Create the job_update_metadata table."; + } + + @Override + public String getUpScript() { + return "CREATE TABLE IF NOT EXISTS job_update_metadata(" + + "id IDENTITY," + + "update_row_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE," + + "key VARCHAR NOT NULL," + + "value VARCHAR NOT NULL" + + ");"; + } + + @Override + public String getDownScript() { + return "DROP TABLE IF EXISTS job_update_metadata;"; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java index 26c45fd..16b1b52 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java @@ -108,6 +108,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest; import org.apache.aurora.scheduler.storage.entities.IJobUpdateSettings; import org.apache.aurora.scheduler.storage.entities.ILockKey; +import org.apache.aurora.scheduler.storage.entities.IMetadata; import org.apache.aurora.scheduler.storage.entities.IRange; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; @@ -117,6 +118,7 @@ import org.apache.aurora.scheduler.thrift.auth.DecoratedThrift; import org.apache.aurora.scheduler.updater.JobDiff; import org.apache.aurora.scheduler.updater.JobUpdateController; import org.apache.aurora.scheduler.updater.JobUpdateController.AuditData; +import org.apache.aurora.scheduler.updater.UpdateInProgressException; import org.apache.aurora.scheduler.updater.UpdateStateException; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -938,7 +940,8 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { IJobUpdate update = IJobUpdate.build(new JobUpdate() .setSummary(new JobUpdateSummary() .setKey(new JobUpdateKey(job.newBuilder(), updateId)) - .setUser(remoteUserName)) + .setUser(remoteUserName) + .setMetadata(IMetadata.toBuildersSet(request.getMetadata()))) .setInstructions(instructions)); Response response = empty(); @@ -953,7 +956,13 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { new AuditData(remoteUserName, Optional.fromNullable(message))); return response.setResponseCode(OK) .setResult(Result.startJobUpdateResult( - new StartJobUpdateResult(update.getSummary().getKey().newBuilder()))); + new StartJobUpdateResult(update.getSummary().getKey().newBuilder()) + .setUpdateSummary(update.getSummary().newBuilder()))); + } catch (UpdateInProgressException e) { + return error(INVALID_REQUEST, e) + .setResult(Result.startJobUpdateResult( + new StartJobUpdateResult(e.getInProgressUpdateSummary().getKey().newBuilder()) + .setUpdateSummary(e.getInProgressUpdateSummary().newBuilder()))); } catch (UpdateStateException | TaskValidationException e) { return error(INVALID_REQUEST, e); } http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java index 594bb62..ef6253e 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java @@ -168,9 +168,17 @@ class JobUpdateControllerImpl implements JobUpdateController { List<IJobUpdateSummary> activeJobUpdates = storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(queryActiveByJob(job)); if (!activeJobUpdates.isEmpty()) { - throw new UpdateStateException("An active update already exists for this job, " + if (activeJobUpdates.size() > 1) { + LOG.error("Multiple active updates exist for this job. {}", activeJobUpdates); + throw new UpdateStateException( + String.format("Multiple active updates exist for this job. %s", activeJobUpdates)); + } + + IJobUpdateSummary activeJobUpdate = activeJobUpdates.get(0); + throw new UpdateInProgressException("An active update already exists for this job, " + "please terminate it before starting another. " - + "Active updates are those in states " + Updates.ACTIVE_JOB_UPDATE_STATES); + + "Active updates are those in states " + Updates.ACTIVE_JOB_UPDATE_STATES, + activeJobUpdate); } LOG.info("Starting update for job " + job); http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/main/java/org/apache/aurora/scheduler/updater/UpdateInProgressException.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateInProgressException.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateInProgressException.java new file mode 100644 index 0000000..5759585 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateInProgressException.java @@ -0,0 +1,32 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.updater; + +import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; + +/** + * Thrown when there is an in-progress update for the job already. + */ +public class UpdateInProgressException extends UpdateStateException { + private final IJobUpdateSummary inProgressUpdateSummary; + + public UpdateInProgressException(String message, IJobUpdateSummary inProgressUpdateSummary) { + super(message); + this.inProgressUpdateSummary = inProgressUpdateSummary; + } + + public IJobUpdateSummary getInProgressUpdateSummary() { + return inProgressUpdateSummary; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/main/python/apache/aurora/client/api/__init__.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py index f8b9690..e1dde63 100644 --- a/src/main/python/apache/aurora/client/api/__init__.py +++ b/src/main/python/apache/aurora/client/api/__init__.py @@ -32,6 +32,7 @@ from gen.apache.aurora.api.ttypes import ( JobUpdateKey, JobUpdateQuery, JobUpdateRequest, + Metadata, Resource, ResourceAggregate, TaskQuery @@ -140,7 +141,7 @@ class AuroraClientAPI(object): except SchedulerProxy.ThriftInternalError as e: raise self.ThriftInternalError(e.args[0]) - def _job_update_request(self, config, instances=None): + def _job_update_request(self, config, instances=None, metadata=None): try: settings = UpdaterConfig(**config.update_config().get()).to_thrift_update_settings(instances) except ValueError as e: @@ -149,20 +150,22 @@ class AuroraClientAPI(object): return JobUpdateRequest( instanceCount=config.instances(), settings=settings, - taskConfig=config.job().taskConfig + taskConfig=config.job().taskConfig, + metadata={Metadata(k, v) for k, v in metadata.items()} if metadata else None ) - def start_job_update(self, config, message, instances=None): + def start_job_update(self, config, message, instances=None, metadata=None): """Requests Scheduler to start job update process. Arguments: config -- AuroraConfig instance with update details. message -- Audit message to include with the change. instances -- Optional list of instances to restrict update to. + metadata -- Optional set of metadata (key, value) to associate with the update. Returns response object with update ID and acquired job lock. """ - request = self._job_update_request(config, instances) + request = self._job_update_request(config, instances, metadata) log.info("Starting update for: %s" % config.name()) return self._scheduler_proxy.startJobUpdate(request, message) http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/main/python/apache/aurora/client/cli/update.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/cli/update.py b/src/main/python/apache/aurora/client/cli/update.py index a634c44..c701196 100644 --- a/src/main/python/apache/aurora/client/cli/update.py +++ b/src/main/python/apache/aurora/client/cli/update.py @@ -18,6 +18,7 @@ import datetime import json import textwrap import time +import uuid from collections import namedtuple from apache.aurora.client.api import AuroraClientAPI @@ -50,7 +51,12 @@ from apache.aurora.client.cli.options import ( from apache.aurora.common.aurora_job_key import AuroraJobKey from gen.apache.aurora.api.constants import ACTIVE_JOB_UPDATE_STATES -from gen.apache.aurora.api.ttypes import JobUpdateAction, JobUpdateKey, JobUpdateStatus +from gen.apache.aurora.api.ttypes import ( + JobUpdateAction, + JobUpdateKey, + JobUpdateStatus, + ResponseCode +) class UpdateController(object): @@ -129,11 +135,15 @@ WAIT_OPTION = lambda help_msg: CommandOption( action='store_true', help=help_msg) +CLIENT_UPDATE_ID = 'org.apache.aurora.client.update_id' + class StartUpdate(Verb): UPDATE_MSG_TEMPLATE = "Job update has started. View your update progress at %s" + FAILED_TO_START_UPDATE_ERROR_MSG = """Failed to start update due to error:""" + def __init__(self, clock=time): self._clock = clock @@ -171,6 +181,7 @@ class StartUpdate(Verb): job = context.options.instance_spec.jobkey instances = (None if context.options.instance_spec.instance == ALL_INSTANCES else context.options.instance_spec.instance) + update_id = str(uuid.uuid4()) config = context.get_job_config(job, context.options.config_file) if config.raw().has_cron_schedule(): raise context.CommandError( @@ -180,13 +191,16 @@ class StartUpdate(Verb): api = context.get_api(config.cluster()) formatter = DiffFormatter(context, config) formatter.show_job_update_diff(instances) + try: - resp = api.start_job_update(config, context.options.message, instances) + resp = api.start_job_update(config, context.options.message, instances, + {CLIENT_UPDATE_ID: update_id}) except AuroraClientAPI.UpdateConfigError as e: raise context.CommandError(EXIT_INVALID_CONFIGURATION, e.message) - context.log_response_and_raise(resp, err_code=EXIT_API_ERROR, - err_msg="Failed to start update due to error:") + if not self._is_update_already_in_progress(resp, update_id): + context.log_response_and_raise(resp, err_code=EXIT_API_ERROR, + err_msg=self.FAILED_TO_START_UPDATE_ERROR_MSG) if resp.result: update_key = resp.result.startJobUpdateResult.key @@ -203,6 +217,26 @@ class StartUpdate(Verb): return EXIT_OK + def _is_update_already_in_progress(self, resp, update_id): + """Returns True if the response indicates that there was an active update + already in progress that matches the requested update. + + This can happen, if the client and scheduler have diverged state, where the scheduler + already accepted the update, while the client thinks it has not been accepted by + the scheduler and retries.""" + if resp.responseCode == ResponseCode.INVALID_REQUEST and resp.result: + start_update_result = resp.result.startJobUpdateResult + if start_update_result and start_update_result.updateSummary: + in_progress_update_summary = start_update_result.updateSummary + if in_progress_update_summary.metadata: + active_update_id = [x.value for x in in_progress_update_summary.metadata if x.key == + CLIENT_UPDATE_ID] + if active_update_id and len(active_update_id) == 1: + if active_update_id[0] == update_id: + return True + + return False + def rollback_state_to_err_code(state): return (EXIT_OK if state == JobUpdateStatus.ROLLED_BACK else http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/main/python/apache/aurora/client/hooks/hooked_api.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/hooks/hooked_api.py b/src/main/python/apache/aurora/client/hooks/hooked_api.py index 542f165..86c3a09 100644 --- a/src/main/python/apache/aurora/client/hooks/hooked_api.py +++ b/src/main/python/apache/aurora/client/hooks/hooked_api.py @@ -181,7 +181,7 @@ class HookedAuroraClientAPI(NonHookedAuroraClientAPI): _partial(super(HookedAuroraClientAPI, self).start_cronjob, job_key, config=config)) - def start_job_update(self, config, message, instances=None): + def start_job_update(self, config, message, instances=None, metadata=None): return self._hooked_call(config, None, _partial(super(HookedAuroraClientAPI, self).start_job_update, - config, message, instances=instances)) + config, message, instances=instances, metadata=metadata)) http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml ---------------------------------------------------------------------- diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml index 8563630..d91c650 100644 --- a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml +++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml @@ -95,6 +95,19 @@ ) </insert> + <insert id="insertJobUpdateMetadata"> + INSERT INTO job_update_metadata ( + update_row_id, + key, + value + ) VALUES + <foreach item="element" collection="metadata" open="(" separator="),(" close=")"> + <include refid="select_update_row_id"/>, + #{element.key}, + #{element.value} + </foreach> + </insert> + <sql id="insert_instance_ranges"> <foreach item="element" collection="ranges" open="(" separator="),(" close=")"> <include refid="select_update_row_id"/>, @@ -149,6 +162,11 @@ <association property="state" resultMap="jobUpdateStateMap" columnPrefix="just_" /> + <collection property="metadata" + select="selectJobUpdateMetadata" + column="id" + foreignColumn="update_row_id"> + </collection> </resultMap> <resultMap id="rangeMap" type="org.apache.aurora.gen.Range"> @@ -193,6 +211,10 @@ <association property="instructions" resultMap="jobUpdateInstructionMap" columnPrefix="jui_"/> </resultMap> + <resultMap id="metadataMap" type="org.apache.aurora.gen.Metadata"> + <id column="id" /> + </resultMap> + <resultMap id="jobInstanceUpdateMap" type="org.apache.aurora.gen.JobInstanceUpdateEvent"> <id column="id" /> <result property="action" @@ -308,6 +330,7 @@ <select id="selectSummaries" resultMap="jobUpdateSummaryMap"> SELECT + u.id AS id, u.update_id AS update_id, u.user AS user, max_status.status AS just_status, @@ -328,6 +351,7 @@ For example: jusm_just_status maps to JobUpdateSummary/JobUpdateState/status field.--> <sql id="job_update_columns"> u.id AS u_id, + u.id AS jusm_id, u.update_id AS jusm_update_id, u.user AS jusm_user, max_status.status AS jusm_just_status, @@ -490,6 +514,15 @@ ORDER BY e.timestamp_ms </select> + <select id="selectJobUpdateMetadata" resultMap="metadataMap"> + SELECT + m.id, + m.key, + m.value + FROM job_update_metadata as m + WHERE update_row_id = #{id} + </select> + <delete id="truncate"> DELETE FROM job_updates; </delete> http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql ---------------------------------------------------------------------- diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql b/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql index a40830f..e943c64 100644 --- a/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql +++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql @@ -309,6 +309,13 @@ CREATE TABLE job_updates( UNIQUE(update_id, job_key_id) ); +CREATE TABLE job_update_metadata( + id IDENTITY, + update_row_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, + key VARCHAR NOT NULL, + value VARCHAR NOT NULL +); + CREATE TABLE job_update_locks( id IDENTITY, update_row_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java index 0853039..d01dc3f 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java @@ -42,6 +42,7 @@ import org.apache.aurora.gen.JobUpdateStatus; import org.apache.aurora.gen.JobUpdateSummary; import org.apache.aurora.gen.Lock; import org.apache.aurora.gen.LockKey; +import org.apache.aurora.gen.Metadata; import org.apache.aurora.gen.Range; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.storage.StoredJobUpdateDetails; @@ -92,6 +93,8 @@ public class DbJobUpdateStoreTest { private static final long CREATED_MS = 111L; private static final IJobUpdateEvent FIRST_EVENT = makeJobUpdateEvent(ROLLING_FORWARD, CREATED_MS); + private static final ImmutableSet<Metadata> METADATA = + ImmutableSet.of(new Metadata("k1", "v1"), new Metadata("k2", "v2"), new Metadata("k3", "v3")); private Storage storage; private FakeStatsProvider stats; @@ -603,6 +606,22 @@ public class DbJobUpdateStoreTest { } @Test + public void testSaveJobUpdateWithDuplicateMetadataKeys() { + IJobUpdateKey updateId = makeKey(JobKeys.from("role", "env", "name1"), "u1"); + + ImmutableSet<Metadata> duplicatedMetadata = + ImmutableSet.of(new Metadata("k1", "v1"), new Metadata("k1", "v2")); + JobUpdate builder = makeJobUpdate(updateId).newBuilder(); + builder.getSummary().setMetadata(duplicatedMetadata); + IJobUpdate update = IJobUpdate.build(builder); + + assertEquals(Optional.absent(), getUpdate(updateId)); + + saveUpdate(update, Optional.of("lock1")); + assertUpdate(update); + } + + @Test public void testLockCleared() { IJobUpdate update = makeJobUpdate(makeKey("update1")); saveUpdate(update, Optional.of("lock1")); @@ -987,7 +1006,8 @@ public class DbJobUpdateStoreTest { private static IJobUpdateSummary makeSummary(IJobUpdateKey key, String user) { return IJobUpdateSummary.build(new JobUpdateSummary() .setKey(key.newBuilder()) - .setUser(user)); + .setUser(user) + .setMetadata(METADATA)); } private IJobUpdateSummary saveSummary( @@ -999,7 +1019,8 @@ public class DbJobUpdateStoreTest { IJobUpdateSummary summary = IJobUpdateSummary.build(new JobUpdateSummary() .setKey(key.newBuilder()) - .setUser(user)); + .setUser(user) + .setMetadata(METADATA)); IJobUpdate update = makeJobUpdate(summary); saveUpdate(update, lockToken); http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java index 7935432..6d0e9bc 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java @@ -50,6 +50,7 @@ import org.apache.aurora.gen.JobUpdateQuery; import org.apache.aurora.gen.JobUpdateRequest; import org.apache.aurora.gen.JobUpdateSettings; import org.apache.aurora.gen.JobUpdateSummary; +import org.apache.aurora.gen.Metadata; import org.apache.aurora.gen.PendingReason; import org.apache.aurora.gen.PopulateJobResult; import org.apache.aurora.gen.Range; @@ -121,6 +122,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class ReadOnlySchedulerImplTest extends EasyMockTest { + private static final ImmutableSet<Metadata> METADATA = + ImmutableSet.of(new Metadata("k1", "v1"), new Metadata("k2", "v2"), new Metadata("k3", "v3")); + private StorageTestUtil storageUtil; private NearestFit nearestFit; private CronPredictor cronPredictor; @@ -630,7 +634,8 @@ public class ReadOnlySchedulerImplTest extends EasyMockTest { for (int i = 0; i < count; i++) { builder.add(new JobUpdateSummary() .setKey(new JobUpdateKey(JOB_KEY.newBuilder(), "id" + 1)) - .setUser(USER)); + .setUser(USER) + .setMetadata(METADATA)); } return builder.build(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java index fa32750..b28cd24 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java @@ -56,6 +56,7 @@ import org.apache.aurora.gen.ListBackupsResult; import org.apache.aurora.gen.LockKey; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.MesosContainer; +import org.apache.aurora.gen.Metadata; import org.apache.aurora.gen.PulseJobUpdateResult; import org.apache.aurora.gen.QueryRecoveryResult; import org.apache.aurora.gen.Range; @@ -100,6 +101,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IJobUpdate; import org.apache.aurora.scheduler.storage.entities.ILockKey; +import org.apache.aurora.scheduler.storage.entities.IMetadata; import org.apache.aurora.scheduler.storage.entities.IRange; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -107,6 +109,7 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.updater.JobUpdateController; import org.apache.aurora.scheduler.updater.JobUpdateController.AuditData; +import org.apache.aurora.scheduler.updater.UpdateInProgressException; import org.apache.aurora.scheduler.updater.UpdateStateException; import org.apache.thrift.TException; import org.easymock.EasyMock; @@ -171,6 +174,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { private static final AuditData AUDIT = new AuditData(USER, Optional.of(AUDIT_MESSAGE)); private static final Thresholds THRESHOLDS = new Thresholds(1000, 2000); private static final String EXECUTOR_NAME = apiConstants.AURORA_EXECUTOR_NAME; + private static final ImmutableSet<Metadata> METADATA = + ImmutableSet.of(new Metadata("k1", "v1"), new Metadata("k2", "v2")); private StorageTestUtil storageUtil; private LockManager lockManager; @@ -1482,8 +1487,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { Response response = assertOkResponse(thrift.startJobUpdate(buildJobUpdateRequest(update), AUDIT_MESSAGE)); assertEquals( - new StartJobUpdateResult(UPDATE_KEY.newBuilder()), - response.getResult().getStartJobUpdateResult()); + new StartJobUpdateResult(UPDATE_KEY.newBuilder()).setUpdateSummary( + update.getSummary().newBuilder()), response.getResult().getStartJobUpdateResult()); } private void expectJobUpdateQuotaCheck(QuotaCheckResult result) { @@ -1530,7 +1535,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { Response response = assertOkResponse( thrift.startJobUpdate(buildJobUpdateRequest(update), AUDIT_MESSAGE)); assertEquals( - new StartJobUpdateResult(UPDATE_KEY.newBuilder()), + new StartJobUpdateResult(UPDATE_KEY.newBuilder()).setUpdateSummary(expected.getSummary()), response.getResult().getStartJobUpdateResult()); } @@ -1809,6 +1814,37 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { } @Test + public void testStartUpdateFailsInControllerWhenUpdateInProgress() throws Exception { + expectGetRemoteUser(); + expectNoCronJob(); + + IScheduledTask oldTask = buildTaskForJobUpdate(0, "old"); + ITaskConfig newTask = buildTaskForJobUpdate(0, "new").getAssignedTask().getTask(); + + IJobUpdate update = buildJobUpdate( + 1, + newTask, + ImmutableMap.of(oldTask.getAssignedTask().getTask(), ImmutableSet.of(new Range(0, 0)))); + + expect(uuidGenerator.createNew()).andReturn(UU_ID); + expect(taskIdGenerator.generate(newTask, 1)).andReturn(TASK_ID); + expectJobUpdateQuotaCheck(ENOUGH_QUOTA); + + storageUtil.expectTaskFetch(Query.unscoped().byJob(JOB_KEY).active(), oldTask); + jobUpdateController.start(update, AUDIT); + expectLastCall().andThrow(new UpdateInProgressException("failed", update.getSummary())); + + control.replay(); + + Response response = thrift.startJobUpdate(buildJobUpdateRequest(update), AUDIT_MESSAGE); + + assertResponse(INVALID_REQUEST, response); + assertEquals( + new StartJobUpdateResult(UPDATE_KEY.newBuilder()).setUpdateSummary( + update.getSummary().newBuilder()), response.getResult().getStartJobUpdateResult()); + } + + @Test public void testPauseJobUpdateByCoordinator() throws Exception { expectGetRemoteUser(); jobUpdateController.pause(UPDATE_KEY, AUDIT); @@ -2011,7 +2047,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { .setInstanceCount(rangesToInstanceCount( update.getInstructions().getDesiredState().getInstances())) .setSettings(update.getInstructions().getSettings().newBuilder()) - .setTaskConfig(update.getInstructions().getDesiredState().getTask().newBuilder()); + .setTaskConfig(update.getInstructions().getDesiredState().getTask().newBuilder()) + .setMetadata(IMetadata.toBuildersSet(update.getSummary().getMetadata())); } private static IJobUpdate buildJobUpdate( @@ -2027,7 +2064,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { return IJobUpdate.build(new JobUpdate() .setSummary(new JobUpdateSummary() .setKey(UPDATE_KEY.newBuilder()) - .setUser(IDENTITY.getUser())) + .setUser(IDENTITY.getUser()) + .setMetadata(METADATA)) .setInstructions(new JobUpdateInstructions() .setSettings(buildJobUpdateSettings()) .setDesiredState(new InstanceTaskConfig() http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java index 196df47..f879827 100644 --- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java @@ -51,9 +51,11 @@ import org.apache.aurora.gen.JobUpdateInstructions; import org.apache.aurora.gen.JobUpdateKey; import org.apache.aurora.gen.JobUpdatePulseStatus; import org.apache.aurora.gen.JobUpdateSettings; +import org.apache.aurora.gen.JobUpdateState; import org.apache.aurora.gen.JobUpdateStatus; import org.apache.aurora.gen.JobUpdateSummary; import org.apache.aurora.gen.LockKey; +import org.apache.aurora.gen.Metadata; import org.apache.aurora.gen.Range; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; @@ -142,6 +144,8 @@ public class JobUpdaterIT extends EasyMockTest { setExecutorData(TaskTestUtil.makeConfig(JOB), "olddata"); private static final ITaskConfig NEW_CONFIG = setExecutorData(OLD_CONFIG, "newdata"); private static final long PULSE_TIMEOUT_MS = 10000; + private static final ImmutableSet<Metadata> METADATA = ImmutableSet.of( + new Metadata("k1", "v1"), new Metadata("k2", "v2")); private FakeScheduledExecutor clock; private JobUpdateController updater; @@ -306,6 +310,11 @@ public class JobUpdaterIT extends EasyMockTest { stateManager.insertPendingTasks(storeProvider, task, instanceIds)); } + private ILock insertInProgressUpdate(IJobUpdate update) { + return storage.write( + storeProvider -> saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLLING_FORWARD)); + } + private void insertInitialTasks(IJobUpdate update) { storage.write((NoResult.Quiet) storeProvider -> { for (IInstanceTaskConfig config : update.getInstructions().getInitialState()) { @@ -1577,6 +1586,28 @@ public class JobUpdaterIT extends EasyMockTest { ImmutableMap.of(0, NEW_CONFIG, 1, OLD_CONFIG, 2, OLD_CONFIG)); } + @Test + public void testInProgressUpdate() throws Exception { + control.replay(); + + IJobUpdate inProgress = makeJobUpdate(); + ILock lock = insertInProgressUpdate(inProgress); + + IJobUpdate anotherUpdate = makeJobUpdate(); + try { + updater.start(anotherUpdate, AUDIT); + fail("update cannot start when another is in-progress"); + } catch (UpdateInProgressException e) { + // Expected. + assertEquals( + inProgress.getSummary().newBuilder().setState(new JobUpdateState(ROLLING_FORWARD, 0, 0)), + e.getInProgressUpdateSummary().newBuilder()); + assertEquals(ImmutableList.of(lock), ImmutableList.copyOf(lockManager.getLocks())); + } finally { + lockManager.releaseLock(lock); + } + } + private static IJobUpdateSummary makeUpdateSummary(IJobUpdateKey key) { return IJobUpdateSummary.build(new JobUpdateSummary() .setUser("user") @@ -1585,7 +1616,7 @@ public class JobUpdaterIT extends EasyMockTest { private static IJobUpdate makeJobUpdate(IInstanceTaskConfig... configs) { JobUpdate builder = new JobUpdate() - .setSummary(makeUpdateSummary(UPDATE_ID).newBuilder()) + .setSummary(makeUpdateSummary(UPDATE_ID).newBuilder().setMetadata(METADATA)) .setInstructions(new JobUpdateInstructions() .setDesiredState(new InstanceTaskConfig() .setTask(NEW_CONFIG.newBuilder()) http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/test/python/apache/aurora/client/cli/test_supdate.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/cli/test_supdate.py b/src/test/python/apache/aurora/client/cli/test_supdate.py index 2dfee1f..92ab375 100644 --- a/src/test/python/apache/aurora/client/cli/test_supdate.py +++ b/src/test/python/apache/aurora/client/cli/test_supdate.py @@ -30,6 +30,7 @@ from apache.aurora.client.cli import ( from apache.aurora.client.cli.diff_formatter import DiffFormatter from apache.aurora.client.cli.options import TaskInstanceKey from apache.aurora.client.cli.update import ( + CLIENT_UPDATE_ID, AbortUpdate, ListUpdates, PauseUpdate, @@ -60,6 +61,7 @@ from gen.apache.aurora.api.ttypes import ( JobUpdateState, JobUpdateStatus, JobUpdateSummary, + Metadata, Response, ResponseCode, ResponseDetail, @@ -122,10 +124,83 @@ class TestStartUpdate(AuroraClientCommandTest): self._command.execute(self._fake_context) assert self._mock_api.start_job_update.mock_calls == [ - call(mock_config, None, self._mock_options.instance_spec.instance) + call(mock_config, None, self._mock_options.instance_spec.instance, ANY) ] self.assert_lock_message(self._fake_context) + def test_start_update_invalid_request(self): + mock_config = self.create_mock_config() + self._fake_context.get_job_config = Mock(return_value=mock_config) + + err_msg = "Error." + self._mock_api.start_job_update.return_value = AuroraClientCommandTest.create_blank_response( + ResponseCode.INVALID_REQUEST, + err_msg) + + with patch('apache.aurora.client.cli.update.DiffFormatter'): + with pytest.raises(Context.CommandError): + self._command.execute(self._fake_context) + + assert self._mock_api.start_job_update.mock_calls == [ + call(mock_config, None, self._mock_options.instance_spec.instance, ANY) + ] + assert self._fake_context.get_err() == [ + StartUpdate.FAILED_TO_START_UPDATE_ERROR_MSG, "\t%s" % err_msg + ] + + def test_start_update_already_inprogress(self): + mock_config = self.create_mock_config() + self._fake_context.get_job_config = Mock(return_value=mock_config) + + update_id = 'some-mocked-uuid' + + err_msg = "Active updates exist for this job." + return_value = AuroraClientCommandTest.create_start_job_update_result( + ResponseCode.INVALID_REQUEST, err_msg, UPDATE_KEY, {Metadata(CLIENT_UPDATE_ID, update_id)}) + self._mock_api.start_job_update.return_value = return_value + + with patch('apache.aurora.client.cli.update.DiffFormatter'): + with patch('apache.aurora.client.cli.update.uuid') as mock_uuid: + mock_uuid.uuid4.return_value = update_id + self._command.execute(self._fake_context) + + assert self._mock_api.start_job_update.mock_calls == [ + call(mock_config, None, self._mock_options.instance_spec.instance, + {CLIENT_UPDATE_ID: update_id}) + ] + assert self._fake_context.get_out() == [ + StartUpdate.UPDATE_MSG_TEMPLATE % + ('http://something_or_other/scheduler/bozo/test/hello/update/update_id'), + ] + assert self._fake_context.get_err() == [] + + def test_start_update_different_update_inprogress(self): + mock_config = self.create_mock_config() + self._fake_context.get_job_config = Mock(return_value=mock_config) + + update_id = 'some-mocked-uuid' + update_id_2 = 'some-other-mocked-uuid' + + err_msg = "Active updates exist for this job." + return_value = AuroraClientCommandTest.create_start_job_update_result( + ResponseCode.INVALID_REQUEST, err_msg, UPDATE_KEY, {Metadata(CLIENT_UPDATE_ID, update_id)}) + self._mock_api.start_job_update.return_value = return_value + + with patch('apache.aurora.client.cli.update.DiffFormatter'): + with pytest.raises(Context.CommandError): + with patch('apache.aurora.client.cli.update.uuid') as mock_uuid: + mock_uuid.uuid4.return_value = update_id_2 + self._command.execute(self._fake_context) + + assert self._mock_api.start_job_update.mock_calls == [ + call(mock_config, None, self._mock_options.instance_spec.instance, + {CLIENT_UPDATE_ID: update_id_2}) + ] + assert self._fake_context.get_out() == [] + assert self._fake_context.get_err() == [ + StartUpdate.FAILED_TO_START_UPDATE_ERROR_MSG, "\t%s" % err_msg + ] + def test_update_cron_job_fails(self): mock_config = self.create_mock_config(is_cron=True) self._fake_context.get_job_config = Mock(return_value=mock_config) @@ -149,7 +224,7 @@ class TestStartUpdate(AuroraClientCommandTest): call(self._mock_options.instance_spec.instance) ] assert self._mock_api.start_job_update.mock_calls == [ - call(mock_config, None, self._mock_options.instance_spec.instance) + call(mock_config, None, self._mock_options.instance_spec.instance, ANY) ] def test_start_update_command_line_succeeds(self): @@ -170,7 +245,7 @@ class TestStartUpdate(AuroraClientCommandTest): call(self._mock_options.instance_spec.instance) ] assert self._mock_api.start_job_update.mock_calls == [ - call(ANY, 'hello', None) + call(ANY, 'hello', None, ANY) ] assert self._fake_context.get_out() == [ StartUpdate.UPDATE_MSG_TEMPLATE % @@ -198,7 +273,7 @@ class TestStartUpdate(AuroraClientCommandTest): assert self._formatter.show_job_update_diff.mock_calls == [ call(self._mock_options.instance_spec.instance) ] - assert self._mock_api.start_job_update.mock_calls == [call(ANY, None, None)] + assert self._mock_api.start_job_update.mock_calls == [call(ANY, None, None, ANY)] assert self._mock_api.query_job_updates.mock_calls == [ call(update_key=resp.result.startJobUpdateResult.key) ] @@ -256,7 +331,7 @@ class TestStartUpdate(AuroraClientCommandTest): assert self._formatter.show_job_update_diff.mock_calls == [ call(self._mock_options.instance_spec.instance) ] - assert self._mock_api.start_job_update.mock_calls == [call(ANY, None, None)] + assert self._mock_api.start_job_update.mock_calls == [call(ANY, None, None, ANY)] assert self._fake_context.get_out() == [ "Noop update." ] @@ -274,7 +349,7 @@ class TestStartUpdate(AuroraClientCommandTest): self._command.execute(self._fake_context) assert e.value == error - assert self._mock_api.start_job_update.mock_calls == [call(ANY, None, None)] + assert self._mock_api.start_job_update.mock_calls == [call(ANY, None, None, ANY)] class TestListUpdates(AuroraClientCommandTest): http://git-wip-us.apache.org/repos/asf/aurora/blob/5069f93b/src/test/python/apache/aurora/client/cli/util.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/cli/util.py b/src/test/python/apache/aurora/client/cli/util.py index aac9f9c..c65ae75 100644 --- a/src/test/python/apache/aurora/client/cli/util.py +++ b/src/test/python/apache/aurora/client/cli/util.py @@ -31,6 +31,7 @@ from gen.apache.aurora.api.ttypes import ( ExecutorConfig, GetTierConfigResult, JobKey, + JobUpdateSummary, Response, ResponseCode, ResponseDetail, @@ -38,6 +39,7 @@ from gen.apache.aurora.api.ttypes import ( ScheduledTask, ScheduleStatus, ScheduleStatusResult, + StartJobUpdateResult, TaskConfig, TaskEvent, TaskQuery, @@ -191,6 +193,14 @@ class AuroraClientCommandTest(unittest.TestCase): return status_response @classmethod + def create_start_job_update_result(cls, code, msg, key, metadata): + resp = cls.create_blank_response(code, msg) + resp.result = Result( + startJobUpdateResult=StartJobUpdateResult(key=key, updateSummary=JobUpdateSummary( + metadata=metadata))) + return resp + + @classmethod def create_empty_task_result(cls): status_response = cls.create_simple_success_response() status_response.result = Result(scheduleStatusResult=ScheduleStatusResult(tasks=[]))
