Remove the client-side updater. Bugs closed: AURORA-785
Reviewed at https://reviews.apache.org/r/41368/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3e1f8235 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3e1f8235 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3e1f8235 Branch: refs/heads/master Commit: 3e1f823597d45026814a649216ab3dfd5400a13a Parents: fb8155d Author: Bill Farner <[email protected]> Authored: Wed Dec 16 12:38:44 2015 -0800 Committer: Bill Farner <[email protected]> Committed: Wed Dec 16 12:38:44 2015 -0800 ---------------------------------------------------------------------- NEWS | 4 + docs/client-commands.md | 32 - docs/configuration-tutorial.md | 2 +- docs/hooks.md | 2 - docs/tutorial.md | 2 +- docs/user-guide.md | 2 +- .../python/apache/aurora/client/api/__init__.py | 24 - .../python/apache/aurora/client/api/updater.py | 720 --------------- src/main/python/apache/aurora/client/base.py | 19 - .../python/apache/aurora/client/cli/jobs.py | 115 --- .../apache/aurora/client/hooks/hooked_api.py | 14 - src/test/python/apache/aurora/client/api/BUILD | 12 - .../apache/aurora/client/api/test_updater.py | 899 ------------------- src/test/python/apache/aurora/client/cli/BUILD | 13 - .../aurora/client/cli/test_cancel_update.py | 62 -- .../apache/aurora/client/cli/test_update.py | 528 ----------- .../aurora/client/hooks/test_hooked_api.py | 5 +- .../aurora/client/hooks/test_non_hooked_api.py | 15 +- .../python/apache/aurora/client/test_base.py | 8 - .../sh/org/apache/aurora/e2e/test_end_to_end.sh | 8 - 20 files changed, 14 insertions(+), 2472 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/NEWS ---------------------------------------------------------------------- diff --git a/NEWS b/NEWS index 99531a3..7a80f32 100644 --- a/NEWS +++ b/NEWS @@ -1,6 +1,10 @@ 0.11.0 ------ - Upgraded Mesos to 0.24.1. +- The client-side updater has been removed, along with the CLI commands that used it: + 'aurora job update' and 'aurora job cancel-update'. Users are encouraged to take + advantage of scheduler-driven updates (see 'aurora update -h' for usage), which has been a + stable feature for several releases. 0.10.0 ------ http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/docs/client-commands.md ---------------------------------------------------------------------- diff --git a/docs/client-commands.md b/docs/client-commands.md index 9977b49..67faaa3 100644 --- a/docs/client-commands.md +++ b/docs/client-commands.md @@ -11,7 +11,6 @@ Aurora Client Commands - [Killing a Job](#killing-a-job) - [Updating a Job](#updating-a-job) - [Coordinated job updates](#user-content-coordinated-job-updates) - - [Client-orchestrated updates (deprecated)](#user-content-client-orchestrated-updates-deprecated) - [Renaming a Job](#renaming-a-job) - [Restarting Jobs](#restarting-jobs) - [Cron Jobs](#cron-jobs) @@ -123,11 +122,9 @@ the machine executing Aurora commands. Hooks can be associated with these Aurora Client commands. - - `job cancel-update` - `job create` - `job kill` - `job restart` - - `job update` The process for writing and activating them is complex enough that we explain it in a devoted document, [Hooks for Aurora Client API](hooks.md). @@ -213,35 +210,6 @@ progress until the first pulse arrives. However, a paused update (`ROLL_FORWARD_ `ROLL_BACK_PAUSED`) is still considered active and upon resuming will immediately make progress provided the pulse interval has not expired. -#### Client-orchestrated updates (deprecated) - -*Note: This feature is deprecated and will be removed in 0.9.0. -Please use aurora update instead.* - - aurora job update CLUSTER/ROLE/ENV/NAME[/INSTANCES] <configuration file> - aurora job cancel-update CLUSTER/ROLE/ENV/NAME - -Given a running job, does a rolling update to reflect a new -configuration version. Only updates Tasks in the Job with a changed -configuration. You can further restrict the operated on Tasks by specifying -specific instances that should be updated. - -You may want to run `aurora job diff` beforehand to validate which Tasks -have different configurations. - -Updating jobs are locked to be sure the update finishes without -disruption. If the update abnormally terminates, the lock may stay -around and cause failure of subsequent update attempts. - `aurora job cancel-update `unlocks the Job specified by -its `job_key` argument. Be sure you don't issue `job cancel-update` when -another user is working with the specified Job. - -The `<configuration file>` argument for `job cancel-update` is optional. Use -it only if it contains hook definitions and activations that affect the -`cancel_update` command. The `<configuration file>` argument for -`update` is required, but in addition to a new configuration it can be -used to define and activate hooks for `job update`. - ### Renaming a Job Renaming is a tricky operation as downstream clients must be informed of http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/docs/configuration-tutorial.md ---------------------------------------------------------------------- diff --git a/docs/configuration-tutorial.md b/docs/configuration-tutorial.md index e665817..97664f3 100644 --- a/docs/configuration-tutorial.md +++ b/docs/configuration-tutorial.md @@ -4,7 +4,7 @@ Aurora Configuration Tutorial How to write Aurora configuration files, including feature descriptions and best practices. When writing a configuration file, make use of `aurora job inspect`. It takes the same job key and configuration file -arguments as `aurora job create` or `aurora job update`. It first ensures the +arguments as `aurora job create` or `aurora update start`. It first ensures the configuration parses, then outputs it in human-readable form. You should read this after going through the general [Aurora Tutorial](tutorial.md). http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/docs/hooks.md ---------------------------------------------------------------------- diff --git a/docs/hooks.md b/docs/hooks.md index 63dbdb9..28307ab 100644 --- a/docs/hooks.md +++ b/docs/hooks.md @@ -86,10 +86,8 @@ You can associate `pre_`, `post_`, and `err_` hooks with the following methods. Aurora Client API Method | Client API Method Argument Signature | Aurora Command Line Command -------------------------| ------------------------------------- | --------------------------- - ```cancel_update``` | ```self```, ```job_key``` | ```job cancel-update``` ```create_job``` | ```self```, ```config``` | ```job create```, <code>runtask ```restart``` | ```self```, ```job_key```, ```shards```, ```update_config```, ```health_check_interval_seconds``` | ```job restart``` - ```update_job``` | ```self```, ```config```, ```health_check_interval_seconds=3```, ```shards=None``` | ```job update``` ```kill_job``` | ```self```, ```job_key```, ```shards=None``` | ```job kill``` ```start_cronjob``` | ```self```, ```job_key``` | ```cron start``` ```start_job_update``` | ```self```, ```config```, ```instances=None``` | ```update start``` http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/docs/tutorial.md ---------------------------------------------------------------------- diff --git a/docs/tutorial.md b/docs/tutorial.md index 812a5cc..1bdc1ca 100644 --- a/docs/tutorial.md +++ b/docs/tutorial.md @@ -226,7 +226,7 @@ It looks like we made a typo in our Python script. We wanted `xrange`, not `xrang`. Edit the `hello_world.py` script to use the correct function and we will try again. - aurora job update devcluster/www-data/devel/hello_world /vagrant/hello_world.aurora + aurora update start devcluster/www-data/devel/hello_world /vagrant/hello_world.aurora This time, the task comes up, we inspect the page, and see that the `hello_world` process is running. http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/docs/user-guide.md ---------------------------------------------------------------------- diff --git a/docs/user-guide.md b/docs/user-guide.md index e608500..1a78d30 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -45,7 +45,7 @@ request 1 core of cpu, 1 GB of RAM, and 1 GB of disk space as specified in the configuration file `hello_world.aurora`. I want to update it so it requests 2 GB of RAM instead of 1. I create a new configuration file to do that called `new_hello_world.aurora` and -issue a `aurora job update <job_key_value>/0-1 new_hello_world.aurora` +issue a `aurora update start <job_key_value>/0-1 new_hello_world.aurora` command. This results in instances 0 and 1 having 1 cpu, 2 GB of RAM, and 1 GB of disk space, http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/src/main/python/apache/aurora/client/api/__init__.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py index 1514394..a638158 100644 --- a/src/main/python/apache/aurora/client/api/__init__.py +++ b/src/main/python/apache/aurora/client/api/__init__.py @@ -16,14 +16,12 @@ from __future__ import print_function from twitter.common import log -from apache.aurora.client.base import combine_messages from apache.aurora.common.aurora_job_key import AuroraJobKey from apache.aurora.common.cluster import Cluster from .restarter import Restarter from .scheduler_client import SchedulerProxy from .sla import Sla -from .updater import Updater from .updater_util import UpdaterConfig from gen.apache.aurora.api.constants import LIVE_STATES @@ -34,7 +32,6 @@ from gen.apache.aurora.api.ttypes import ( JobUpdateRequest, Lock, ResourceAggregate, - ResponseCode, TaskQuery ) @@ -136,16 +133,6 @@ class AuroraClientAPI(object): except SchedulerProxy.ThriftInternalError as e: raise self.ThriftInternalError(e.args[0]) - def update_job(self, config, health_check_interval_seconds=3, instances=None): - """Run a job update for a given config, for the specified instances. If - instances is left unspecified, update all instances. Returns whether or not - the update was successful.""" - - log.info("Updating job: %s" % config.name()) - updater = Updater(config, health_check_interval_seconds, self._scheduler_proxy) - - return updater.update(instances) - def _job_update_request(self, config, instances=None): try: settings = UpdaterConfig(**config.update_config().get()).to_thrift_update_settings(instances) @@ -259,17 +246,6 @@ class AuroraClientAPI(object): % (key, JobUpdateKey.__name__, key.__class__.__name__)) return self._scheduler_proxy.getJobUpdateDetails(key) - def cancel_update(self, job_key): - """Cancel the update represented by job_key. Returns whether or not the cancellation was - successful.""" - self._assert_valid_job_key(job_key) - - log.info("Canceling update on job %s" % job_key) - resp = Updater.cancel_update(self._scheduler_proxy, job_key) - if resp.responseCode != ResponseCode.OK: - log.error('Error cancelling the update: %s' % combine_messages(resp)) - return resp - def restart(self, job_key, instances, updater_config, health_check_interval_seconds): """Perform a rolling restart of the job. http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/src/main/python/apache/aurora/client/api/updater.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/updater.py b/src/main/python/apache/aurora/client/api/updater.py deleted file mode 100644 index acbce21..0000000 --- a/src/main/python/apache/aurora/client/api/updater.py +++ /dev/null @@ -1,720 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import json -import signal -from collections import namedtuple -from difflib import unified_diff -from threading import Lock as threading_lock -from threading import Event - -from thrift.protocol import TJSONProtocol -from thrift.TSerialization import serialize -from twitter.common import log -from twitter.common.quantity import Amount, Time - -from apache.aurora.client.base import combine_messages, format_response - -from .error_handling_thread import ExecutionError, spawn_worker -from .instance_watcher import InstanceWatcher -from .job_monitor import JobMonitor -from .quota_check import CapacityRequest, QuotaCheck -from .scheduler_client import SchedulerProxy -from .scheduler_mux import SchedulerMux -from .updater_util import FailureThreshold, UpdaterConfig - -from gen.apache.aurora.api.constants import ACTIVE_STATES -from gen.apache.aurora.api.ttypes import ( - AddInstancesConfig, - JobKey, - Lock, - LockKey, - LockValidation, - Response, - ResponseCode, - ResponseDetail, - TaskQuery -) - -try: - from Queue import Queue, Empty -except ImportError: - from queue import Queue, Empty - - -class Updater(object): - """Performs an update command using a collection of parallel threads. - The number of parallel threads used is determined by the UpdateConfig.batch_size.""" - - class Error(Exception): - """Updater error wrapper.""" - pass - - RPC_COMPLETION_TIMEOUT_SECS = Amount(120, Time.SECONDS) - - OPERATION_CONFIGS = namedtuple('OperationConfigs', ['from_config', 'to_config']) - INSTANCE_CONFIGS = namedtuple( - 'InstanceConfigs', - ['remote_config_map', 'local_config_map', 'instances_to_process'] - ) - - INSTANCE_DATA = namedtuple('InstanceData', ['instance_id', 'operation_configs']) - - def __init__(self, - config, - health_check_interval_seconds, - scheduler=None, - instance_watcher=None, - quota_check=None, - job_monitor=None, - scheduler_mux=None, - rpc_completion_timeout=RPC_COMPLETION_TIMEOUT_SECS): - self._config = config - self._job_key = JobKey(role=config.role(), environment=config.environment(), name=config.name()) - self._health_check_interval_seconds = health_check_interval_seconds - self._scheduler = scheduler or SchedulerProxy(config.cluster()) - self._quota_check = quota_check or QuotaCheck(self._scheduler) - self._scheduler_mux = scheduler_mux or SchedulerMux() - self._job_monitor = job_monitor or JobMonitor( - self._scheduler, - self._config.job_key(), - scheduler_mux=self._scheduler_mux) - self._rpc_completion_timeout = rpc_completion_timeout - try: - self._update_config = UpdaterConfig(**config.update_config().get()) - except ValueError as e: - raise self.Error(str(e)) - if self._update_config.pulse_interval_secs: - raise self.Error('Pulse interval seconds is not supported by the client updater.') - self._lock = None - self._thread_lock = threading_lock() - self._batch_wait_event = Event() - self._batch_completion_queue = Queue() - self.failure_threshold = FailureThreshold( - self._update_config.max_per_instance_failures, - self._update_config.max_total_failures - ) - self._watcher = instance_watcher or InstanceWatcher( - self._scheduler, - self._job_key, - self._update_config.restart_threshold, - self._update_config.watch_secs, - self._health_check_interval_seconds, - scheduler_mux=self._scheduler_mux) - self._terminating = False - - def _start(self): - """Starts an update by applying an exclusive lock on a job being updated. - - Returns Response instance from the scheduler call. - """ - resp = self._scheduler.acquireLock(LockKey(job=self._job_key)) - if resp.responseCode == ResponseCode.OK: - self._lock = resp.result.acquireLockResult.lock - return resp - - def _finish(self): - """Finishes an update by removing an exclusive lock on an updated job. - - Returns Response instance from the scheduler call. - """ - resp = self._scheduler.releaseLock(self._lock, LockValidation.CHECKED) - - if resp.responseCode == ResponseCode.OK: - self._lock = None - else: - log.error('There was an error finalizing the update: %s' % combine_messages(resp)) - return resp - - def int_handler(self, *args): - """Ensures keyboard interrupt exception is raised on a main thread.""" - raise KeyboardInterrupt() - - def _update(self, instance_configs): - """Drives execution of the update logic. - - Performs instance updates in parallel using a number of threads bound by - the batch_size config option. - - Arguments: - instance_configs -- list of instance update configurations to go through. - - Returns the set of instances that failed to update. - """ - # Register signal handler to ensure KeyboardInterrupt is received by a main thread. - signal.signal(signal.SIGINT, self.int_handler) - - instances_to_update = [ - self.INSTANCE_DATA( - instance_id, - self.OPERATION_CONFIGS( - from_config=instance_configs.remote_config_map, - to_config=instance_configs.local_config_map)) - for instance_id in instance_configs.instances_to_process - ] - - log.info('Instances to update: %s' % instance_configs.instances_to_process) - update_queue = self._update_instances_in_parallel(self._update_instance, instances_to_update) - - if self._is_failed_update(quiet=False): - if not self._update_config.rollback_on_failure: - log.info('Rollback on failure is disabled in config. Aborting rollback') - return - - rollback_ids = self._get_rollback_ids(instance_configs.instances_to_process, update_queue) - instances_to_revert = [ - self.INSTANCE_DATA( - instance_id, - self.OPERATION_CONFIGS( - from_config=instance_configs.local_config_map, - to_config=instance_configs.remote_config_map)) - for instance_id in rollback_ids - ] - - log.info('Reverting update for: %s' % rollback_ids) - self._update_instances_in_parallel(self._revert_instance, instances_to_revert) - - return not self._is_failed_update() - - def _update_instances_in_parallel(self, target, instances_to_update): - """Processes instance updates in parallel and waits for completion. - - Arguments: - target -- target method to handle instance update. - instances_to_update -- list of InstanceData with update details. - - Returns Queue with non-updated instance data. - """ - log.info('Processing in parallel with %s worker thread(s)' % self._update_config.batch_size) - instance_queue = Queue() - for instance_to_update in instances_to_update: - instance_queue.put(instance_to_update) - - try: - threads = [] - for _ in range(self._update_config.batch_size): - threads.append(spawn_worker(target, kwargs={'instance_queue': instance_queue})) - - for thread in threads: - thread.join_and_raise() - except Exception as e: - log.debug('Caught unhandled exception: %s' % e) - self._terminate() - raise - - return instance_queue - - def _try_reset_batch_wait_event(self, instance_id, instance_queue): - """Resets batch_wait_event in case the current batch is filled up. - - This is a helper method that separates thread locked logic. Called from - _wait_for_batch_completion_if_needed() when a given instance update completes. - Resumes worker threads if all batch instances are updated. - - Arguments: - instance_id -- Instance ID being processed. - instance_queue -- Instance update work queue. - """ - with self._thread_lock: - log.debug("Instance ID %s: Completion queue size %s" % - (instance_id, self._batch_completion_queue.qsize())) - log.debug("Instance ID %s: Instance queue size %s" % - (instance_id, instance_queue.qsize())) - self._batch_completion_queue.put(instance_id) - filled_up = self._batch_completion_queue.qsize() % self._update_config.batch_size == 0 - all_done = instance_queue.qsize() == 0 - if filled_up or all_done: - # Required batch size of completed instances has filled up -> unlock waiting threads. - log.debug('Instance %s completes the batch wait.' % instance_id) - self._batch_wait_event.set() - self._batch_wait_event.clear() - return True - - return False - - def _wait_for_batch_completion_if_needed(self, instance_id, instance_queue): - """Waits for batch completion if wait_for_batch_completion flag is set. - - Arguments: - instance_id -- Instance ID. - instance_queue -- Instance update work queue. - """ - if not self._update_config.wait_for_batch_completion: - return - - if not self._try_reset_batch_wait_event(instance_id, instance_queue): - # The current batch has not filled up -> block the work thread. - log.debug('Instance %s is done. Waiting for batch to complete.' % instance_id) - self._batch_wait_event.wait() - - def _terminate(self): - """Attempts to terminate all outstanding activities.""" - if not self._terminating: - log.info('Cleaning up') - self._terminating = True - self._scheduler.terminate() - self._job_monitor.terminate() - self._scheduler_mux.terminate() - self._watcher.terminate() - self._batch_wait_event.set() - - def _update_instance(self, instance_queue): - """Works through the instance_queue and performs instance updates (one at a time). - - Arguments: - instance_queue -- Queue of InstanceData to update. - """ - while not self._terminating and not self._is_failed_update(): - try: - instance_data = instance_queue.get_nowait() - except Empty: - return - - update = True - restart = False - while update or restart and not self._terminating and not self._is_failed_update(): - instances_to_watch = [] - if update: - instances_to_watch += self._kill_and_add_instance(instance_data) - update = False - else: - instances_to_watch += self._request_restart_instance(instance_data) - - if instances_to_watch: - failed_instances = self._watcher.watch(instances_to_watch) - restart = self._is_restart_needed(failed_instances) - - self._wait_for_batch_completion_if_needed(instance_data.instance_id, instance_queue) - - def _revert_instance(self, instance_queue): - """Works through the instance_queue and performs instance rollbacks (one at a time). - - Arguments: - instance_queue -- Queue of InstanceData to revert. - """ - while not self._terminating: - try: - instance_data = instance_queue.get_nowait() - except Empty: - return - - log.info('Reverting instance: %s' % instance_data.instance_id) - instances_to_watch = self._kill_and_add_instance(instance_data) - if instances_to_watch and self._watcher.watch(instances_to_watch): - log.error('Rollback failed for instance: %s' % instance_data.instance_id) - - def _kill_and_add_instance(self, instance_data): - """Acquires update instructions and performs required kill/add/kill+add sequence. - - Arguments: - instance_data -- InstanceData to update. - - Returns added instance ID. - """ - log.info('Examining instance: %s' % instance_data.instance_id) - to_kill, to_add = self._create_kill_add_lists( - [instance_data.instance_id], - instance_data.operation_configs) - if not to_kill and not to_add: - log.info('Skipping unchanged instance: %s' % instance_data.instance_id) - return to_add - - if to_kill: - self._request_kill_instance(instance_data) - if to_add: - self._request_add_instance(instance_data) - - return to_add - - def _request_kill_instance(self, instance_data): - """Instructs the scheduler to kill instance and waits for completion. - - Arguments: - instance_data -- InstanceData to kill. - """ - log.info('Killing instance: %s' % instance_data.instance_id) - self._enqueue_and_wait(instance_data, self._kill_instances) - result = self._job_monitor.wait_until( - JobMonitor.terminal, - [instance_data.instance_id], - with_timeout=True) - - if not result: - raise self.Error('Instance %s was not killed in time' % instance_data.instance_id) - log.info('Killed: %s' % instance_data.instance_id) - - def _request_add_instance(self, instance_data): - """Instructs the scheduler to add instance. - - Arguments: - instance_data -- InstanceData to add. - """ - log.info('Adding instance: %s' % instance_data.instance_id) - self._enqueue_and_wait(instance_data, self._add_instances) - log.info('Added: %s' % instance_data.instance_id) - - def _request_restart_instance(self, instance_data): - """Instructs the scheduler to restart instance. - - Arguments: - instance_data -- InstanceData to restart. - - Returns restarted instance ID. - """ - log.info('Restarting instance: %s' % instance_data.instance_id) - self._enqueue_and_wait(instance_data, self._restart_instances) - log.info('Restarted: %s' % instance_data.instance_id) - return [instance_data.instance_id] - - def _enqueue_and_wait(self, instance_data, command): - """Queues up the scheduler call and waits for completion. - - Arguments: - instance_data -- InstanceData to query scheduler for. - command -- scheduler command to run. - """ - try: - self._scheduler_mux.enqueue_and_wait( - command, - instance_data, - timeout=self._rpc_completion_timeout) - except SchedulerMux.Error as e: - raise self.Error('Failed to complete instance %s operation. Reason: %s' - % (instance_data.instance_id, e)) - - def _is_failed_update(self, quiet=True): - """Verifies the update status in a thread-safe manner. - - Arguments: - quiet -- Whether the logging should be suppressed in case of a failed update. Default True. - - Returns True if update failed, False otherwise. - """ - with self._thread_lock: - return self.failure_threshold.is_failed_update(log_errors=not quiet) - - def _is_restart_needed(self, failed_instances): - """Checks if there are any failed instances recoverable via restart. - - Arguments: - failed_instances -- Failed instance IDs. - - Returns True if restart is allowed, False otherwise (i.e. update failed). - """ - if not failed_instances: - return False - - log.info('Failed instances: %s' % failed_instances) - - with self._thread_lock: - unretryable_instances = self.failure_threshold.update_failure_counts(failed_instances) - if unretryable_instances: - log.warn('Not restarting failed instances %s, which exceeded ' - 'maximum allowed instance failure limit of %s' % - (unretryable_instances, self._update_config.max_per_instance_failures)) - return False if unretryable_instances else True - - def _get_rollback_ids(self, update_list, update_queue): - """Gets a list of instance ids to rollback. - - Arguments: - update_list -- original list of instances intended for update. - update_queue -- untouched instances not processed during update. - - Returns sorted list of instance IDs to rollback. - """ - untouched_ids = [] - while not update_queue.empty(): - untouched_ids.append(update_queue.get_nowait().instance_id) - - return sorted(list(set(update_list) - set(untouched_ids)), reverse=True) - - def _hashable(self, element): - if isinstance(element, (list, set)): - return tuple(sorted(self._hashable(item) for item in element)) - elif isinstance(element, dict): - return tuple( - sorted((self._hashable(key), self._hashable(value)) for (key, value) in element.items()) - ) - return element - - def _thrift_to_json(self, config): - return json.loads( - serialize(config, protocol_factory=TJSONProtocol.TSimpleJSONProtocolFactory())) - - def _diff_configs(self, from_config, to_config): - # Thrift objects do not correctly compare against each other due to the unhashable nature - # of python sets. That results in occasional diff failures with the following symptoms: - # - Sets are not equal even though their reprs are identical; - # - Items are reordered within thrift structs; - # - Items are reordered within sets; - # To overcome all the above, thrift objects are converted into JSON dicts to flatten out - # thrift type hierarchy. Next, JSONs are recursively converted into nested tuples to - # ensure proper ordering on compare. - return ''.join(unified_diff(repr(self._hashable(self._thrift_to_json(from_config))), - repr(self._hashable(self._thrift_to_json(to_config))))) - - def _create_kill_add_lists(self, instance_ids, operation_configs): - """Determines a particular action (kill or add) to use for every instance in instance_ids. - - Arguments: - instance_ids -- current batch of IDs to process. - operation_configs -- OperationConfigs with update details. - - Returns lists of instances to kill and to add. - """ - to_kill = [] - to_add = [] - for instance_id in instance_ids: - from_config = operation_configs.from_config.get(instance_id) - to_config = operation_configs.to_config.get(instance_id) - - if from_config and to_config: - diff_output = self._diff_configs(from_config, to_config) - if diff_output: - log.debug('Task configuration changed for instance [%s]:\n%s' - % (instance_id, diff_output)) - to_kill.append(instance_id) - to_add.append(instance_id) - elif from_config and not to_config: - to_kill.append(instance_id) - elif not from_config and to_config: - to_add.append(instance_id) - else: - raise self.Error('Instance %s is outside of supported range' % instance_id) - - return to_kill, to_add - - def _kill_instances(self, instance_data): - """Instructs the scheduler to batch-kill instances and waits for completion. - - Arguments: - instance_data -- list of InstanceData to kill. - """ - instance_ids = [data.instance_id for data in instance_data] - log.debug('Batch killing instances: %s' % instance_ids) - query = self._create_task_query(instanceIds=frozenset(int(s) for s in instance_ids)) - self._check_and_log_response(self._scheduler.killTasks(query, self._lock)) - log.debug('Done batch killing instances: %s' % instance_ids) - - def _add_instances(self, instance_data): - """Instructs the scheduler to batch-add instances. - - Arguments: - instance_data -- list of InstanceData to add. - """ - instance_ids = [data.instance_id for data in instance_data] - to_config = instance_data[0].operation_configs.to_config - - log.debug('Batch adding instances: %s' % instance_ids) - add_config = AddInstancesConfig( - key=self._job_key, - taskConfig=to_config[instance_ids[0]], # instance_ids will always have at least 1 item. - instanceIds=frozenset(int(s) for s in instance_ids)) - self._check_and_log_response(self._scheduler.addInstances(add_config, self._lock)) - log.debug('Done batch adding instances: %s' % instance_ids) - - def _restart_instances(self, instance_data): - """Instructs the scheduler to batch-restart instances. - - Arguments: - instance_data -- list of InstanceData to restart. - """ - instance_ids = [data.instance_id for data in instance_data] - log.debug('Batch restarting instances: %s' % instance_ids) - resp = self._scheduler.restartShards(self._job_key, instance_ids, self._lock) - self._check_and_log_response(resp) - log.debug('Done batch restarting instances: %s' % instance_ids) - - def _validate_quota(self, instance_configs): - """Validates job update will not exceed quota for production tasks. - Arguments: - instance_configs -- InstanceConfig with update details. - - Returns Response.OK if quota check was successful. - """ - instance_operation = self.OPERATION_CONFIGS( - from_config=instance_configs.remote_config_map, - to_config=instance_configs.local_config_map - ) - - def _aggregate_quota(ops_list, config_map): - request = CapacityRequest() - for instance in ops_list: - task = config_map[instance] - if task.production: - request += CapacityRequest.from_task(task) - - return request - - to_kill, to_add = self._create_kill_add_lists( - instance_configs.instances_to_process, - instance_operation) - - return self._quota_check.validate_quota_from_requested( - self._job_key, - self._config.job().taskConfig.production, - _aggregate_quota(to_kill, instance_operation.from_config), - _aggregate_quota(to_add, instance_operation.to_config)) - - def _get_update_instructions(self, instances=None): - """Loads, validates and populates update working set. - - Arguments: - instances -- (optional) set of instances to update. - - Returns: - InstanceConfigs with the following data: - remote_config_map -- dictionary of {key:instance_id, value:task_config} from scheduler. - local_config_map -- dictionary of {key:instance_id, value:task_config} with local - task configs validated and populated with default values. - instances_to_process -- list of instance IDs to go over in update. - """ - # Load existing tasks and populate remote config map and instance list. - assigned_tasks = self._get_existing_tasks() - remote_config_map = {} - remote_instances = [] - for assigned_task in assigned_tasks: - remote_config_map[assigned_task.instanceId] = assigned_task.task - remote_instances.append(assigned_task.instanceId) - - # Validate local job config and populate local task config. - local_task_config = self._validate_and_populate_local_config() - - # Union of local and remote instance IDs. - job_config_instances = list(range(self._config.instances())) - instance_superset = sorted(list(set(remote_instances) | set(job_config_instances))) - - # Calculate the update working set. - if instances is None: - # Full job update -> union of remote and local instances - instances_to_process = instance_superset - else: - # Partial job update -> validate all instances are recognized - instances_to_process = instances - unrecognized = list(set(instances) - set(instance_superset)) - if unrecognized: - raise self.Error('Instances %s are outside of supported range' % unrecognized) - - # Populate local config map - local_config_map = dict.fromkeys(job_config_instances, local_task_config) - - return self.INSTANCE_CONFIGS(remote_config_map, local_config_map, instances_to_process) - - def _get_existing_tasks(self): - """Loads all existing tasks from the scheduler. - - Returns a list of AssignedTasks. - """ - resp = self._scheduler.getTasksStatus(self._create_task_query()) - self._check_and_log_response(resp) - return [t.assignedTask for t in resp.result.scheduleStatusResult.tasks] - - def _validate_and_populate_local_config(self): - """Validates local job configuration and populates local task config with default values. - - Returns a TaskConfig populated with default values. - """ - resp = self._scheduler.populateJobConfig(self._config.job()) - self._check_and_log_response(resp) - return resp.result.populateJobResult.taskConfig - - def _replace_template_if_cron(self): - """Checks if the provided job config represents a cron job and if so, replaces it. - - Returns True if job is cron and False otherwise. - """ - if self._config.job().cronSchedule: - resp = self._scheduler.replaceCronTemplate(self._config.job(), self._lock) - self._check_and_log_response(resp) - return True - else: - return False - - def _create_task_query(self, instanceIds=None): - return TaskQuery(jobKeys=[self._job_key], statuses=ACTIVE_STATES, instanceIds=instanceIds) - - def _failed_response(self, message): - # TODO(wfarner): Avoid synthesizing scheduler responses, consider using an exception instead. - return Response(responseCode=ResponseCode.ERROR, details=[ResponseDetail(message=message)]) - - def update(self, instances=None): - """Performs the job update, blocking until it completes. - - A rollback will be performed if the update was considered a failure based on the - update configuration. - - Arguments: - instances -- (optional) instances to update. If not specified, all instances will be updated. - - Returns a response object with update result status. - """ - try: - resp = self._start() - if resp.responseCode != ResponseCode.OK: - return resp - - try: - # Handle cron jobs separately from other jobs. - if self._replace_template_if_cron(): - log.info('Cron template updated, next run will reflect changes') - return self._finish() - else: - try: - instance_configs = self._get_update_instructions(instances) - self._check_and_log_response(self._validate_quota(instance_configs)) - except self.Error as e: - # Safe to release the lock acquired above as no job mutation has happened yet. - self._finish() - return self._failed_response('Unable to start job update: %s' % e) - - if not self._update(instance_configs): - log.warn('Update failures threshold reached') - self._finish() - return self._failed_response('Update reverted') - else: - log.info('Update successful') - return self._finish() - except (self.Error, ExecutionError, Exception) as e: - return self._failed_response('Aborting update without rollback! Fatal error: %s' % e) - finally: - self._scheduler_mux.terminate() - - @classmethod - def cancel_update(cls, scheduler, job_key): - """Cancels an update process by removing an exclusive lock on a provided job. - - Arguments: - scheduler -- scheduler instance to use. - job_key -- job key to cancel update for. - - Returns a response object with cancel update result status. - """ - return scheduler.releaseLock( - Lock(key=LockKey(job=job_key.to_thrift())), - LockValidation.UNCHECKED) - - def _check_and_log_response(self, resp): - """Checks scheduler return status, raises Error in case of unexpected response. - - Arguments: - resp -- scheduler response object. - - Raises Error in case of unexpected response status. - """ - message = format_response(resp) - if resp.responseCode == ResponseCode.OK: - log.debug(message) - else: - raise self.Error(message) http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/src/main/python/apache/aurora/client/base.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/base.py b/src/main/python/apache/aurora/client/base.py index 91c276b..901335b 100644 --- a/src/main/python/apache/aurora/client/base.py +++ b/src/main/python/apache/aurora/client/base.py @@ -25,18 +25,6 @@ from apache.aurora.common.pex_version import UnknownVersion, pex_version from gen.apache.aurora.api.ttypes import ResponseCode -LOCKED_WARNING = """ -Note: if the scheduler detects that a job update is in progress (or was not -properly completed) it will reject subsequent updates. This is because your -job is likely in a partially-updated state. You should only begin another -update if you are confident that nobody is updating this job, and that -the job is in a state suitable for an update. - -After checking on the above, you may release the update lock on the job by -invoking cancel_update. -""" - - def die(msg): log.fatal(msg) sys.exit(1) @@ -58,16 +46,9 @@ def format_response(resp): def check_and_log_response(resp): log.info(format_response(resp)) if resp.responseCode != ResponseCode.OK: - if resp.responseCode == ResponseCode.LOCK_ERROR: - log.info(LOCKED_WARNING) sys.exit(1) -def check_and_log_locked_response(resp): - if resp.responseCode == ResponseCode.LOCK_ERROR: - log.info(LOCKED_WARNING) - - class requires(object): # noqa @classmethod def wrap_function(cls, fn, fnargs, comparator): http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/src/main/python/apache/aurora/client/cli/jobs.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/cli/jobs.py b/src/main/python/apache/aurora/client/cli/jobs.py index 4694849..ccc52c8 100644 --- a/src/main/python/apache/aurora/client/cli/jobs.py +++ b/src/main/python/apache/aurora/client/cli/jobs.py @@ -20,7 +20,6 @@ import os import pprint import subprocess import textwrap -import time import webbrowser from collections import namedtuple from copy import deepcopy @@ -752,118 +751,6 @@ class StatusCommand(Verb): return self._print_jobs_not_found(context) -CLIENT_UPDATER_DEPRECATION = """\ -WARNING: The command you are using is deprecated, and will be removed in -Aurora 0.9.0. -Please see the new commands at 'aurora update -h'.""" - - -class CancelUpdateCommand(Verb): - @property - def name(self): - return "cancel-update" - - @property - def help(self): - return ("%s\n\nCancel an in-progress update operation, releasing the update lock" - % CLIENT_UPDATER_DEPRECATION) - - def get_options(self): - return [JSON_READ_OPTION, - BIND_OPTION, - CONFIG_OPTION, - JOBSPEC_ARGUMENT] - - def execute(self, context): - context.print_err(CLIENT_UPDATER_DEPRECATION) - api = context.get_api(context.options.jobspec.cluster) - config = context.get_job_config_optional(context.options.jobspec, context.options.config) - resp = api.cancel_update(context.options.jobspec, config=config) - context.log_response_and_raise(resp) - return EXIT_OK - - -class UpdateCommand(Verb): - @property - def name(self): - return "update" - - def get_options(self): - return [FORCE_OPTION, BIND_OPTION, JSON_READ_OPTION, HEALTHCHECK_OPTION, - INSTANCES_SPEC_ARGUMENT, STRICT_OPTION, CONFIG_ARGUMENT] - - @property - def help(self): - return textwrap.dedent("""\ - %s - - Perform a rolling upgrade on a running job, using the update configuration - within the config file as a control for update velocity and failure tolerance. - - Updates are fully controlled client-side, so aborting an update halts the - update and leaves the job in a 'locked' state on the scheduler. - Subsequent update attempts will fail until the update is 'unlocked' using the - 'cancel_update' command. - - The updater only takes action on instances in a job that have changed, meaning - that changing a single instance will only induce a restart on the changed task instance. - - You may want to consider using the 'diff' subcommand before updating, - to preview what changes will take effect. - """ % CLIENT_UPDATER_DEPRECATION) - - def warn_if_dangerous_change(self, context, api, job_spec, config): - # Get the current job status, so that we can check if there's anything - # dangerous about this update. - resp = api.query_no_configs(api.build_query(config.role(), config.name(), - env=config.environment(), statuses=ACTIVE_STATES)) - context.log_response_and_raise(resp, err_msg="Server could not find running job to update") - remote_tasks = [t.assignedTask.task for t in resp.result.scheduleStatusResult.tasks] - # for determining if an update is dangerous, we estimate the scope of the change - # by comparing number of instances to be updated, with the number of - # instances running in the cluster. - # If the user passed an instance count, then we select the *smaller* of the - # number of instances being updated, and the total number running on the server. - # So updating 20 instances out of 500 isn't a large change: even though 20 < 500/4; - # but updating 20 instances when there are only 4 running is a large change. - if context.options.instance_spec.instance == ALL_INSTANCES: - local_task_count = config.instances() - remote_task_count = len(remote_tasks) - else: - local_task_count = len(context.options.instance_spec.instance) - remote_task_count = min(len(remote_tasks), local_task_count) - - # Dangerous if it's more than a factor-of-four change in number of instances. - if (local_task_count >= 4 * remote_task_count or - 4 * local_task_count <= remote_task_count or - local_task_count == 0): - context.print_out("Warning: this update is a large change. ") - context.print_out("Press ^c within five seconds to abort.") - time.sleep(5) - - def execute(self, context): - context.print_err(CLIENT_UPDATER_DEPRECATION) - job = context.options.instance_spec.jobkey - instances = (None if context.options.instance_spec.instance == ALL_INSTANCES else - context.options.instance_spec.instance) - config = context.get_job_config(job, context.options.config_file) - - if not config.job().taskConfig.isService and not config.job().cronSchedule: - raise context.CommandError( - EXIT_COMMAND_FAILURE, - "Only service and cron jobs may be updated this way, " - "please kill and re-create your job instead.") - - api = context.get_api(config.cluster()) - if not context.options.force: - self.warn_if_dangerous_change(context, api, job, config) - resp = api.update_job(config, context.options.healthcheck_interval_seconds, instances) - context.log_response_and_raise(resp, err_code=EXIT_COMMAND_FAILURE, - err_msg="Update failed due to error:") - context.print_out("Update completed successfully") - return EXIT_OK - - class Job(Noun): @property def name(self): @@ -879,7 +766,6 @@ class Job(Noun): def __init__(self): super(Job, self).__init__() - self.register_verb(CancelUpdateCommand()) self.register_verb(CreateJobCommand()) self.register_verb(DiffCommand()) self.register_verb(InspectCommand()) @@ -889,4 +775,3 @@ class Job(Noun): self.register_verb(OpenCommand()) self.register_verb(RestartCommand()) self.register_verb(StatusCommand()) - self.register_verb(UpdateCommand()) http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/src/main/python/apache/aurora/client/hooks/hooked_api.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/hooks/hooked_api.py b/src/main/python/apache/aurora/client/hooks/hooked_api.py index 7fc0b71..6410908 100644 --- a/src/main/python/apache/aurora/client/hooks/hooked_api.py +++ b/src/main/python/apache/aurora/client/hooks/hooked_api.py @@ -49,9 +49,6 @@ class NonHookedAuroraClientAPI(AuroraClientAPI): * is thus available to API methods in subclasses """ - def cancel_update(self, job_key, config=None): - return super(NonHookedAuroraClientAPI, self).cancel_update(job_key) - def kill_job(self, job_key, instances=None, lock=None, config=None): return super(NonHookedAuroraClientAPI, self).kill_job(job_key, instances=instances, lock=lock) @@ -158,11 +155,6 @@ class HookedAuroraClientAPI(NonHookedAuroraClientAPI): return self._hooked_call(config, None, _partial(super(HookedAuroraClientAPI, self).create_job, config, lock)) - def cancel_update(self, job_key, config=None): - return self._hooked_call(config, job_key, - _partial(super(HookedAuroraClientAPI, self).cancel_update, - job_key, config=config)) - def kill_job(self, job_key, instances=None, lock=None, config=None): return self._hooked_call(config, job_key, _partial(super(HookedAuroraClientAPI, self).kill_job, @@ -178,12 +170,6 @@ class HookedAuroraClientAPI(NonHookedAuroraClientAPI): _partial(super(HookedAuroraClientAPI, self).start_cronjob, job_key, config=config)) - def update_job(self, config, health_check_interval_seconds=3, instances=None): - return self._hooked_call(config, None, - _partial(super(HookedAuroraClientAPI, self).update_job, - config, health_check_interval_seconds=health_check_interval_seconds, - instances=instances)) - def start_job_update(self, config, message, instances=None): return self._hooked_call(config, None, _partial(super(HookedAuroraClientAPI, self).start_job_update, http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/src/test/python/apache/aurora/client/api/BUILD ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/api/BUILD b/src/test/python/apache/aurora/client/api/BUILD index 2756912..2a55cec 100644 --- a/src/test/python/apache/aurora/client/api/BUILD +++ b/src/test/python/apache/aurora/client/api/BUILD @@ -23,7 +23,6 @@ target(name = 'all', ':scheduler_client', ':sla', ':task_util', - ':updater', ':updater_util' ], ) @@ -111,17 +110,6 @@ python_tests(name = 'task_util', ] ) -python_tests(name = 'updater', - sources = ['test_updater.py'], - dependencies = [ - '3rdparty/python:mox', - 'src/main/python/apache/aurora/common', - 'src/main/python/apache/aurora/client', - 'api/src/main/thrift/org/apache/aurora/gen', - 'src/test/python/apache/aurora/client:fake_scheduler_proxy', - ] -) - python_tests(name = 'updater_util', sources = ['test_updater_util.py'], dependencies = [ http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/src/test/python/apache/aurora/client/api/test_updater.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/api/test_updater.py b/src/test/python/apache/aurora/client/api/test_updater.py deleted file mode 100644 index 5f54d22..0000000 --- a/src/test/python/apache/aurora/client/api/test_updater.py +++ /dev/null @@ -1,899 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from copy import deepcopy -from os import environ -from unittest import TestCase - -from mox import MockObject, Replay, Verify -from pytest import raises - -from apache.aurora.client.api.instance_watcher import InstanceWatcher -from apache.aurora.client.api.job_monitor import JobMonitor -from apache.aurora.client.api.quota_check import CapacityRequest, QuotaCheck -from apache.aurora.client.api.scheduler_mux import SchedulerMux -from apache.aurora.client.api.updater import Updater -from apache.aurora.common.aurora_job_key import AuroraJobKey -from apache.aurora.common.cluster import Cluster - -from ..fake_scheduler_proxy import FakeSchedulerProxy - -from gen.apache.aurora.api.AuroraSchedulerManager import Client as scheduler_client -from gen.apache.aurora.api.constants import ACTIVE_STATES, THRIFT_API_VERSION -from gen.apache.aurora.api.ttypes import ( - AcquireLockResult, - AddInstancesConfig, - AssignedTask, - Constraint, - ExecutorConfig, - Identity, - JobConfiguration, - JobKey, - LimitConstraint, - LockKey, - LockValidation, - Metadata, - PopulateJobResult, - ResourceAggregate, - Response, - ResponseCode, - ResponseDetail, - Result, - ScheduledTask, - ScheduleStatusResult, - ServerInfo, - TaskConfig, - TaskConstraint, - TaskQuery, - ValueConstraint -) - -# Debug output helper -> enables log.* in source. -if 'UPDATER_DEBUG' in environ: - from twitter.common import log - from twitter.common.log.options import LogOptions - LogOptions.set_disk_log_level('NONE') - LogOptions.set_stderr_log_level('DEBUG') - log.init('test_updater') - -SERVER_INFO = ServerInfo(thriftAPIVersion=THRIFT_API_VERSION) - - -def make_response(code, msg='test'): - return Response( - responseCode=code, - serverInfo=SERVER_INFO, - details=[ResponseDetail(message=msg)]) - - -class FakeConfig(object): - def __init__(self, role, name, env, update_config): - self._role = role - self._env = env - self._name = name - self._update_config = update_config - self.job_config = None - - def role(self): - return self._role - - def name(self): - return self._name - - def update_config(self): - class Anon(object): - def get(_): - return self._update_config - return Anon() - - def has_health_port(self): - return False - - def cluster(self): - return 'test' - - def environment(self): - return self._env - - def job(self): - return self.job_config - - def job_key(self): - return AuroraJobKey(self.cluster(), self.role(), self.environment(), self.name()) - - def instances(self): - return self.job_config.instanceCount - - -class FakeSchedulerMux(object): - def __init__(self): - self._raise_error = False - - def enqueue_and_wait(self, command, data, timeout=None): - command([data]) - if self._raise_error: - raise SchedulerMux.Error("expected") - - def terminate(self): - pass - - def raise_error(self): - self._raise_error = True - - -class UpdaterTest(TestCase): - UPDATE_CONFIG = { - 'batch_size': 1, - 'restart_threshold': 50, - 'watch_secs': 50, - 'max_per_shard_failures': 0, - 'max_total_failures': 0, - 'rollback_on_failure': True, - 'wait_for_batch_completion': False, - } - - def setUp(self): - self._role = 'mesos' - self._name = 'jimbob' - self._env = 'test' - self._job_key = JobKey(name=self._name, environment=self._env, role=self._role) - self._lock = 'test_lock' - self._instance_watcher = MockObject(InstanceWatcher) - self._job_monitor = MockObject(JobMonitor) - self._scheduler_mux = FakeSchedulerMux() - self._scheduler = MockObject(scheduler_client) - self._scheduler_proxy = FakeSchedulerProxy(Cluster(name='test-cluster'), self._scheduler) - self._quota_check = MockObject(QuotaCheck) - self.init_updater(deepcopy(self.UPDATE_CONFIG)) - self._num_cpus = 1.0 - self._num_ram = 1 - self._num_disk = 1 - - def replay_mocks(self): - Replay(self._scheduler) - Replay(self._instance_watcher) - Replay(self._quota_check) - Replay(self._job_monitor) - - def verify_mocks(self): - Verify(self._scheduler) - Verify(self._instance_watcher) - Verify(self._quota_check) - Verify(self._job_monitor) - - def init_updater(self, update_config): - self._config = FakeConfig(self._role, self._name, self._env, update_config) - self._updater = Updater( - self._config, - 3, - self._scheduler_proxy, - self._instance_watcher, - self._quota_check, - self._job_monitor, - self._scheduler_mux) - - def expect_terminate(self): - self._job_monitor.terminate() - self._instance_watcher.terminate() - - def expect_watch_instances(self, instance_ids, failed_instances=[]): - for i in instance_ids: - failed = [i] if i in failed_instances else [] - self._instance_watcher.watch(instance_ids).InAnyOrder().AndReturn(set(failed)) - - def expect_populate(self, job_config, response_code=ResponseCode.OK): - resp = make_response(response_code) - config = deepcopy(job_config.taskConfig) - resp.result = Result(populateJobResult=PopulateJobResult(taskConfig=config)) - - self._scheduler.populateJobConfig(job_config).AndReturn(resp) - - def expect_get_tasks(self, tasks, ignore_ids=None, response_code=ResponseCode.OK): - scheduled = [] - for index, task in enumerate(tasks): - if not ignore_ids or index not in ignore_ids: - scheduled.append(ScheduledTask(assignedTask=AssignedTask(task=task, instanceId=index))) - response = make_response(response_code) - response.result = Result(scheduleStatusResult=ScheduleStatusResult(tasks=scheduled)) - query = TaskQuery(jobKeys=[self._job_key], statuses=ACTIVE_STATES) - self._scheduler.getTasksStatus(query).AndReturn(response) - - def expect_cron_replace(self, job_config, response_code=ResponseCode.OK): - resp = make_response(response_code) - self._scheduler.replaceCronTemplate(job_config, self._lock).AndReturn(resp) - - def expect_restart(self, instance_ids, response_code=None): - for i in instance_ids: - response_code = ResponseCode.OK if response_code is None else response_code - response = make_response(response_code) - self._scheduler.restartShards( - self._job_key, - [i], - self._lock).AndReturn(response) - - def expect_kill(self, - instance_ids, - response_code=ResponseCode.OK, - monitor_result=True, - skip_monitor=False): - for i in instance_ids: - query = TaskQuery(jobKeys=[self._job_key], - statuses=ACTIVE_STATES, - instanceIds=frozenset([int(i)])) - self._scheduler.killTasks( - query, - self._lock).InAnyOrder().AndReturn(make_response(response_code)) - - self.expect_job_monitor(response_code, instance_ids, monitor_result, skip_monitor) - - def expect_job_monitor(self, response_code, instance_ids, monitor_result=True, skip=False): - if skip or response_code != ResponseCode.OK: - return - - self._job_monitor.wait_until( - JobMonitor.terminal, - instance_ids, - with_timeout=True).InAnyOrder().AndReturn(monitor_result) - - def expect_add(self, instance_ids, task_config, response_code=ResponseCode.OK): - for i in instance_ids: - add_config = AddInstancesConfig( - key=self._job_key, - taskConfig=task_config, - instanceIds=frozenset([int(i)])) - self._scheduler.addInstances( - add_config, - self._lock).InAnyOrder().AndReturn(make_response(response_code)) - - def expect_update_instances(self, instance_ids, task_config): - for i in instance_ids: - self.expect_kill([i]) - self.expect_add([i], task_config) - self.expect_watch_instances([i]) - - def expect_add_instances(self, instance_ids, task_config): - for i in instance_ids: - self.expect_add([i], task_config) - self.expect_watch_instances([i]) - - def expect_kill_instances(self, instance_ids): - for i in instance_ids: - self.expect_kill([i]) - - def expect_start(self, response_code=ResponseCode.OK): - response = make_response(response_code) - response.result = Result(acquireLockResult=AcquireLockResult(lock=self._lock)) - self._scheduler.acquireLock(LockKey(job=self._job_key)).AndReturn(response) - - def expect_finish(self, response_code=ResponseCode.OK): - self._scheduler.releaseLock( - self._lock, - LockValidation.CHECKED).AndReturn(make_response(response_code)) - - def expect_quota_check(self, - num_released, - num_acquired, - response_code=ResponseCode.OK, - prod=True): - released = CapacityRequest(ResourceAggregate( - numCpus=num_released * self._num_cpus, - ramMb=num_released * self._num_ram, - diskMb=num_released * self._num_disk)) - acquired = CapacityRequest(ResourceAggregate( - numCpus=num_acquired * self._num_cpus, - ramMb=num_acquired * self._num_ram, - diskMb=num_acquired * self._num_disk)) - - self._quota_check.validate_quota_from_requested( - self._job_key, prod, released, acquired).AndReturn(make_response(response_code)) - - def make_task_configs(self, count=1, prod=True): - return [TaskConfig( - owner=Identity(role=self._job_key.role), - environment=self._job_key.environment, - jobName=self._job_key.name, - numCpus=self._num_cpus, - ramMb=self._num_ram, - diskMb=self._num_disk, - priority=0, - maxTaskFailures=1, - production=prod, - taskLinks={'task': 'link'}, - contactEmail='[email protected]', - executorConfig=ExecutorConfig(name='test', data='test data') - # Not setting any set()-related properties as that throws off mox verification. - )] * count - - def make_job_config(self, task_config, instance_count, cron_schedule=None): - return JobConfiguration( - key=self._job_key, - owner=Identity(role=self._job_key.role), - cronSchedule=cron_schedule, - taskConfig=deepcopy(task_config), - instanceCount=instance_count - ) - - def update_and_expect_ok(self, instances=None): - self.update_and_expect_response(ResponseCode.OK, instances) - - def update_and_expect_response(self, expected_code, instances=None, message=None): - resp = self._updater.update(instances) - assert expected_code == resp.responseCode, ( - 'Expected response:%s Actual response:%s' % (expected_code, resp.responseCode)) - - if message: - assert len(resp.details) == 1, ( - 'Unexpected error count:%s' % len(resp.details)) - - assert message in resp.details[0].message, ( - 'Expected %s message not found in: %s' % (message, resp.details[0].message)) - - def test_pulse_interval_not_supported(self): - update_config = self.UPDATE_CONFIG.copy() - update_config.update(pulse_interval_secs=60) - - with raises(Updater.Error) as e: - self.init_updater(update_config) - assert 'Pulse interval seconds is not supported in client updater.' in e.message - - def test_grow(self): - """Adds instances to the existing job.""" - old_configs = self.make_task_configs(3) - new_config = old_configs[0] - job_config = self.make_job_config(new_config, 7) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(0, 4) - self.expect_add_instances([3, 4, 5, 6], new_config) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok() - self.verify_mocks() - - def test_grow_fails_quota_check(self): - """Adds instances to the existing job fails due to not enough quota.""" - old_configs = self.make_task_configs(3) - new_config = old_configs[0] - job_config = self.make_job_config(new_config, 7) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(0, 4, response_code=ResponseCode.INVALID_REQUEST) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_response(expected_code=ResponseCode.ERROR) - self.verify_mocks() - - def test_non_to_prod_fails_quota_check(self): - """Update with shrinking with non->prod transition fails quota check.""" - old_configs = self.make_task_configs(4, prod=False) - new_config = deepcopy(old_configs[0]) - new_config.production = True - job_config = self.make_job_config(new_config, 2) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(0, 2, response_code=ResponseCode.INVALID_REQUEST) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_response(expected_code=ResponseCode.ERROR) - self.verify_mocks() - - def test_prod_to_non_always_passes_quota_check(self): - """Update with growth with prod->non transition always passes.""" - old_configs = self.make_task_configs(1, prod=True) - new_config = deepcopy(old_configs[0]) - new_config.production = False - job_config = self.make_job_config(new_config, 3) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(1, 0, prod=False) - self.expect_kill([0]) - self.expect_add_instances([0, 1, 2], new_config) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok() - self.verify_mocks() - - def test_shrink(self): - """Reduces the number of instances of the job.""" - old_configs = self.make_task_configs(10) - new_config = old_configs[0] - job_config = self.make_job_config(new_config, 3) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(7, 0) - self.expect_kill_instances([3, 4, 5, 6, 7, 8, 9]) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok() - self.verify_mocks() - - def test_update_and_grow(self): - """Updates existing instances and adds new ones.""" - old_configs = self.make_task_configs(3) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 7) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(3, 7) - self.expect_update_instances([0, 1, 2], new_config) - self.expect_add_instances([3, 4, 5, 6], new_config) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok() - self.verify_mocks() - - def test_update_and_shrink(self): - """Updates some existing instances and reduce the instance count.""" - old_configs = self.make_task_configs(10) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 1) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(10, 1) - self.expect_update_instances([0], new_config) - self.expect_kill_instances([1, 2, 3, 4, 5, 6, 7, 8, 9]) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok() - self.verify_mocks() - - def test_update_instances(self): - """Update existing instances.""" - old_configs = self.make_task_configs(5) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 5) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(5, 5) - self.expect_update_instances([0, 1, 2, 3, 4], new_config) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok() - self.verify_mocks() - - def test_grow_with_instance_option(self): - """Adding instances by providing an optional list of instance IDs.""" - old_configs = self.make_task_configs(3) - new_config = old_configs[0] - job_config = self.make_job_config(new_config, 5) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(0, 2) - self.expect_add_instances([3, 4], new_config) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok(instances=[3, 4]) - self.verify_mocks() - - def test_shrink_with_instance_option(self): - """Reducing instance count by providing an optional list of instance IDs.""" - old_configs = self.make_task_configs(10) - new_config = old_configs[0] - job_config = self.make_job_config(new_config, 4) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(6, 0) - self.expect_kill_instances([4, 5, 6, 7, 8, 9]) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok(instances=[4, 5, 6, 7, 8, 9]) - self.verify_mocks() - - def test_update_with_instance_option(self): - """Updating existing instances by providing an optional list of instance IDs.""" - old_configs = self.make_task_configs(10) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 10) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(3, 3) - self.expect_update_instances([2, 3, 4], new_config) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok(instances=[2, 3, 4]) - self.verify_mocks() - - def test_patch_hole_with_instance_option(self): - """Patching an instance ID gap created by a terminated update.""" - old_configs = self.make_task_configs(8) - new_config = old_configs[0] - job_config = self.make_job_config(new_config, 10) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs, [2, 3]) - self.expect_populate(job_config) - self.expect_quota_check(0, 2) - self.expect_add_instances([2, 3], new_config) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok([2, 3]) - self.verify_mocks() - - def test_noop_update(self): - """No update calls happen if task configs are in sync.""" - old_configs = self.make_task_configs(5) - new_config = old_configs[0] - job_config = self.make_job_config(new_config, 5) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(0, 0) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok() - self.verify_mocks() - - def test_update_rollback(self): - """Update process failures exceed total allowable count and update is rolled back.""" - update_config = self.UPDATE_CONFIG.copy() - update_config.update(max_per_shard_failures=1) - self.init_updater(update_config) - - old_configs = self.make_task_configs(10) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 10) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(10, 10) - self.expect_update_instances([0, 1], new_config) - self.expect_kill([2]) - self.expect_add([2], new_config) - self.expect_watch_instances([2], failed_instances=[2]) - self.expect_restart([2]) - self.expect_watch_instances([2], failed_instances=[2]) - self.expect_update_instances([2, 1, 0], old_configs[0]) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_response(ResponseCode.ERROR) - self.verify_mocks() - - def test_update_after_restart(self): - """Update succeeds after failed instances are restarted.""" - update_config = self.UPDATE_CONFIG.copy() - update_config.update(max_total_failures=2, max_per_shard_failures=1) - self.init_updater(update_config) - - old_configs = self.make_task_configs(6) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 6) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(6, 6) - self.expect_update_instances([0, 1], new_config) - self.expect_kill([2]) - self.expect_add([2], new_config) - self.expect_watch_instances([2], failed_instances=[2]) - self.expect_restart([2]) - self.expect_watch_instances([2]) - self.expect_update_instances([3, 4, 5], new_config) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok() - self.verify_mocks() - - def test_update_cron_job(self): - """Updating cron job.""" - new_config = self.make_task_configs(1)[0] - job_config = self.make_job_config(new_config, 1, cron_schedule='cron') - self._config.job_config = job_config - self.expect_start() - self.expect_cron_replace(job_config) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok() - self.verify_mocks() - - def test_start_invalid_response(self): - """The acquireLock call fails.""" - self.expect_start(response_code=ResponseCode.INVALID_REQUEST) - self.replay_mocks() - - self.update_and_expect_response(ResponseCode.INVALID_REQUEST) - self.verify_mocks() - - def test_finish_invalid_response(self): - """The releaseLock call fails.""" - new_config = self.make_task_configs(1)[0] - job_config = self.make_job_config(new_config, 1, cron_schedule='cron') - self._config.job_config = job_config - self.expect_start() - self.expect_cron_replace(job_config) - self.expect_finish(response_code=ResponseCode.INVALID_REQUEST) - self.replay_mocks() - - self.update_and_expect_response(ResponseCode.INVALID_REQUEST) - self.verify_mocks() - - def test_invalid_batch_size(self): - """Test for out of range error for batch size.""" - update_config = self.UPDATE_CONFIG.copy() - update_config.update(batch_size=0) - with raises(Updater.Error): - self.init_updater(update_config) - - def test_invalid_restart_threshold(self): - """Test for out of range error for restart threshold.""" - update_config = self.UPDATE_CONFIG.copy() - update_config.update(restart_threshold=0) - with raises(Updater.Error): - self.init_updater(update_config) - - def test_invalid_watch_secs(self): - """Test for out of range error for watch secs.""" - update_config = self.UPDATE_CONFIG.copy() - update_config.update(watch_secs=0) - with raises(Updater.Error): - self.init_updater(update_config) - - def test_update_invalid_response(self): - """A response code other than success is returned by a scheduler RPC.""" - old_configs = self.make_task_configs(5) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 5) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(5, 5) - self._scheduler_mux.raise_error() - self.expect_kill([0], skip_monitor=True) - self.expect_terminate() - self.replay_mocks() - - self.update_and_expect_response(ResponseCode.ERROR) - self.verify_mocks() - - def test_update_kill_timeout(self): - """Test job monitor timeout while waiting for tasks killed.""" - old_configs = self.make_task_configs(5) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 5) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(5, 5) - self.expect_kill([0], monitor_result=False) - self.expect_terminate() - self.replay_mocks() - - self.update_and_expect_response(ResponseCode.ERROR) - self.verify_mocks() - - def test_failed_update_populates_error_details(self): - """Test failed update populates Response.details.""" - old_configs = self.make_task_configs(5) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 5) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(5, 5) - self.expect_kill([0], monitor_result=False) - self.expect_terminate() - self.replay_mocks() - - self.update_and_expect_response(ResponseCode.ERROR, message="Aborting update without rollback") - self.verify_mocks() - - def test_job_does_not_exist(self): - """Unable to update a job that does not exist.""" - old_configs = self.make_task_configs(5) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 5) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs, response_code=ResponseCode.INVALID_REQUEST) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_response(ResponseCode.ERROR) - self.verify_mocks() - - def test_instances_outside_range(self): - """Provided optional instance IDs are outside of remote | local scope.""" - old_configs = self.make_task_configs(3) - new_config = old_configs[0] - job_config = self.make_job_config(new_config, 3) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_response(ResponseCode.ERROR, instances=[3, 4]) - self.verify_mocks() - - def test_update_skips_unretryable(self): - """Update process skips instances exceeding max_per_shard_failures""" - update_config = self.UPDATE_CONFIG.copy() - update_config.update(max_total_failures=1, max_per_shard_failures=2) - self.init_updater(update_config) - - old_configs = self.make_task_configs(10) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 10) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(10, 10) - self.expect_update_instances([0, 1], new_config) - self.expect_kill([2]) - self.expect_add([2], new_config) - self.expect_watch_instances([2], failed_instances=[2]) - self.expect_restart([2]) - self.expect_watch_instances([2], failed_instances=[2]) - self.expect_restart([2]) - self.expect_watch_instances([2], failed_instances=[2]) - self.expect_update_instances([3, 4, 5, 6, 7, 8, 9], new_config) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok() - self.verify_mocks() - - def test_diff_unordered_configs(self): - """Diff between two config objects with different repr but identical content works ok.""" - from_config = self.make_task_configs()[0] - from_config.constraints = set([ - Constraint(name='value', constraint=ValueConstraint(values=set(['1', '2']))), - Constraint(name='limit', constraint=TaskConstraint(limit=LimitConstraint(limit=int(10))))]) - from_config.taskLinks = {'task1': 'link1', 'task2': 'link2'} - from_config.metadata = set([ - Metadata(key='k2', value='v2'), - Metadata(key='k1', value='v1')]) - from_config.executorConfig = ExecutorConfig(name='test', data='test data') - from_config.requestedPorts = set(['3424', '142', '45235']) - - # Deepcopy() almost guarantees from_config != to_config due to a different sequence of - # dict insertions. That in turn generates unequal json objects. The ideal here would be to - # assert to_config != from_config but that would produce a flaky test as I have observed - # the opposite on rare occasions as the ordering is not stable between test runs. - to_config = deepcopy(from_config) - - diff_result = self._updater._diff_configs(from_config, to_config) - assert diff_result == "", ( - 'diff result must be empty but was: %s' % diff_result) - - def test_update_no_rollback(self): - """Update process failures exceed total allowable count and update is not rolled back.""" - update_config = self.UPDATE_CONFIG.copy() - update_config.update(max_total_failures=1, max_per_shard_failures=1, rollback_on_failure=False) - self.init_updater(update_config) - - old_configs = self.make_task_configs(10) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 10) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(10, 10) - self.expect_kill([0]) - self.expect_add([0], new_config) - self.expect_watch_instances([0], failed_instances=[0]) - self.expect_restart([0]) - self.expect_watch_instances([0], failed_instances=[0]) - self.expect_kill([1]) - self.expect_add([1], new_config) - self.expect_watch_instances([1], failed_instances=[1]) - self.expect_restart([1]) - self.expect_watch_instances([1], failed_instances=[1]) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_response(ResponseCode.ERROR) - self.verify_mocks() - - def test_update_instances_wait_for_batch_completion_filled_batch(self): - """Update existing instances with wait_for_batch_completion flag set.""" - update_config = self.UPDATE_CONFIG.copy() - update_config.update(wait_for_batch_completion=True, batch_size=2) - self.init_updater(update_config) - - old_configs = self.make_task_configs(6) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 6) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(6, 6) - self.expect_update_instances([0, 1, 2, 3, 4, 5], new_config) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok() - self.verify_mocks() - - def test_update_instances_wait_for_batch_completion_partially_filled_batch(self): - """Update existing instances with wait_for_batch_completion flag set.""" - update_config = self.UPDATE_CONFIG.copy() - update_config.update(wait_for_batch_completion=True, batch_size=3) - self.init_updater(update_config) - - old_configs = self.make_task_configs(5) - new_config = deepcopy(old_configs[0]) - new_config.priority = 5 - job_config = self.make_job_config(new_config, 5) - self._config.job_config = job_config - self.expect_start() - self.expect_get_tasks(old_configs) - self.expect_populate(job_config) - self.expect_quota_check(5, 5) - self.expect_update_instances([0, 1, 2, 3, 4], new_config) - self.expect_finish() - self.replay_mocks() - - self.update_and_expect_ok() - self.verify_mocks() http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/src/test/python/apache/aurora/client/cli/BUILD ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/cli/BUILD b/src/test/python/apache/aurora/client/cli/BUILD index 1b14e8c..6d4d5e9 100644 --- a/src/test/python/apache/aurora/client/cli/BUILD +++ b/src/test/python/apache/aurora/client/cli/BUILD @@ -29,7 +29,6 @@ target( ':sla', ':task', ':supdate', - ':update', ':version', ] ) @@ -101,20 +100,8 @@ python_tests( ) python_tests( - name='update', - sources=['test_update.py'], - dependencies = [ - ':util', - '3rdparty/python:mock', - '3rdparty/python:twitter.common.contextutil', - 'src/main/python/apache/aurora/client', - ] -) - -python_tests( name = 'job', sources = [ - 'test_cancel_update.py', 'test_create.py', 'test_diff.py', 'test_kill.py', http://git-wip-us.apache.org/repos/asf/aurora/blob/3e1f8235/src/test/python/apache/aurora/client/cli/test_cancel_update.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/cli/test_cancel_update.py b/src/test/python/apache/aurora/client/cli/test_cancel_update.py deleted file mode 100644 index d4fc049..0000000 --- a/src/test/python/apache/aurora/client/cli/test_cancel_update.py +++ /dev/null @@ -1,62 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from mock import call, patch - -from apache.aurora.client.cli.client import AuroraCommandLine - -from .util import AuroraClientCommandTest, FakeAuroraCommandContext - -from gen.apache.aurora.api.ttypes import JobKey, Lock, LockKey, LockValidation, TaskQuery - - -class TestClientCancelUpdateCommand(AuroraClientCommandTest): - def test_simple_successful_cancel_update(self): - """Run a test of the "kill" command against a mocked-out API: - Verifies that the kill command sends the right API RPCs, and performs the correct - tests on the result.""" - mock_context = FakeAuroraCommandContext() - mock_api = mock_context.get_api('west') - mock_api.cancel_update.return_value = self.create_simple_success_response() - with patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context): - cmd = AuroraCommandLine() - cmd.execute(['job', 'cancel-update', self.TEST_JOBSPEC]) - assert mock_api.cancel_update.mock_calls == [call(self.TEST_JOBKEY, config=None)] - - @classmethod - def get_expected_task_query(cls, shards=None): - instance_ids = frozenset(shards) if shards is not None else None - # Helper to create the query that will be a parameter to job kill. - return TaskQuery( - taskIds=None, - instanceIds=instance_ids, - jobKeys=[JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=cls.TEST_JOB)]) - - @classmethod - def get_release_lock_response(cls): - """Set up the response to a startUpdate API call.""" - return cls.create_simple_success_response() - - def test_cancel_update_api_level(self): - """Test kill client-side API logic.""" - (mock_api, mock_scheduler_proxy) = self.create_mock_api() - mock_scheduler_proxy.releaseLock.return_value = self.get_release_lock_response() - with patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy): - cmd = AuroraCommandLine() - cmd.execute(['job', 'cancel-update', self.TEST_JOBSPEC]) - - # All that cancel_update really does is release the update lock. - # So that's all we really need to check. - assert mock_scheduler_proxy.releaseLock.mock_calls == [ - call(Lock(key=LockKey(job=self.TEST_JOBKEY.to_thrift())), LockValidation.UNCHECKED)]
