Repository: aurora Updated Branches: refs/heads/master ff6e05f05 -> 534c4694c
Deprecating --restart-threshold option in 'aurora job restart' Bugs closed: AURORA-1631 Reviewed at https://reviews.apache.org/r/46587/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/534c4694 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/534c4694 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/534c4694 Branch: refs/heads/master Commit: 534c4694c052baa687b3cc938024802593c3e49b Parents: ff6e05f Author: Maxim Khutornenko <[email protected]> Authored: Mon Apr 25 16:22:16 2016 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Mon Apr 25 16:22:16 2016 -0700 ---------------------------------------------------------------------- RELEASE-NOTES.md | 5 + .../aurora/client/api/error_handling_thread.py | 75 ------------ .../apache/aurora/client/api/health_check.py | 25 ++-- .../aurora/client/api/instance_watcher.py | 36 ++---- .../apache/aurora/client/api/job_monitor.py | 7 +- .../apache/aurora/client/api/restarter.py | 3 - .../apache/aurora/client/api/scheduler_mux.py | 121 ------------------- .../apache/aurora/client/api/task_util.py | 63 +--------- .../python/apache/aurora/client/cli/jobs.py | 11 +- .../aurora/client/api/test_health_check.py | 26 ++-- .../aurora/client/api/test_instance_watcher.py | 55 ++------- .../apache/aurora/client/api/test_restarter.py | 1 - .../aurora/client/api/test_scheduler_mux.py | 72 ----------- .../apache/aurora/client/api/test_task_util.py | 33 ++--- .../apache/aurora/client/cli/test_restart.py | 58 ++------- 15 files changed, 74 insertions(+), 517 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 4b810f2..3f2c54c 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -9,6 +9,11 @@ role's user. 
- Upgraded Mesos to 0.27.2 +### Deprecations and removals: + +- Deprecated `--restart-threshold` option in the `aurora job restart` command to match the job + updater behavior. This option no longer has any effect and will be removed in a future release. + 0.13.0 ------ http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/error_handling_thread.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/error_handling_thread.py b/src/main/python/apache/aurora/client/api/error_handling_thread.py deleted file mode 100644 index 530715a..0000000 --- a/src/main/python/apache/aurora/client/api/error_handling_thread.py +++ /dev/null @@ -1,75 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import sys -import traceback -from threading import Thread - -from twitter.common.decorators import identify_thread - -try: - from Queue import Queue -except ImportError: - from queue import Queue - - -class ExecutionError(Exception): - """Unhandled thread error wrapper.
Raised on the calling thread.""" - pass - - -class ErrorHandlingThread(Thread): - """A thread that helps with unhandled exceptions by re-raising errors - with the parent thread upon completion.""" - - def __init__(self, *args, **kw): - super(ErrorHandlingThread, self).__init__(*args, **kw) - self.__real_run, self.run = self.run, self._excepting_run - self.__errors = Queue() - - @identify_thread - def _excepting_run(self, *args, **kw): - try: - self.__real_run(*args, **kw) - self.__errors.put(None) - except Exception: - try: - e_type, e_val, e_tb = sys.exc_info() - self.__errors.put(ExecutionError( - 'Unhandled error while running worker thread. ' - 'Original error details: %s' % traceback.format_exception(e_type, e_val, e_tb))) - except: # noqa - # This appears to be the only way to avoid nasty "interpreter shutdown" errors when - # dealing with daemon threads. While not ideal, there is nothing else we could do here - # if the sys.exc_info() call fails. - pass - - def join_and_raise(self): - """Waits for completion and re-raises any exception on a caller thread.""" - error = self.__errors.get(timeout=sys.maxint) # Timeout for interruptibility. - if error is not None: - raise error - - -def spawn_worker(target, *args, **kwargs): - """Creates and starts a new daemon worker thread. - - Arguments: - target -- target method. - - Returns thread handle. 
- """ - thread = ErrorHandlingThread(target=target, *args, **kwargs) - thread.daemon = True - thread.start() - return thread http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/health_check.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/health_check.py b/src/main/python/apache/aurora/client/api/health_check.py index 0503c0b..b3d8331 100644 --- a/src/main/python/apache/aurora/client/api/health_check.py +++ b/src/main/python/apache/aurora/client/api/health_check.py @@ -23,7 +23,7 @@ from gen.apache.aurora.api.ttypes import ScheduleStatus class HealthCheck(Interface): @abstractmethod def health(self, task): - """Checks health of the task and returns a (healthy, retriable) pair.""" + """Checks health of the task and returns a True or False.""" class HealthStatus(object): @@ -35,22 +35,11 @@ class HealthStatus(object): def dead(cls): return cls(False).health() - def __init__(self, retry, health): - self._retry = retry + def __init__(self, health): self._health = health def health(self): - return (self._health, self._retry) - - -class NotRetriable(HealthStatus): - def __init__(self, health): - super(NotRetriable, self).__init__(False, health) - - -class Retriable(HealthStatus): - def __init__(self, health): - super(Retriable, self).__init__(True, health) + return self._health class StatusHealthCheck(HealthCheck): @@ -70,12 +59,12 @@ class StatusHealthCheck(HealthCheck): if status == ScheduleStatus.RUNNING: if instance_id in self._task_ids: if task_id == self._task_ids.get(instance_id): - return Retriable.alive() + return HealthStatus.alive() else: - return NotRetriable.dead() + return HealthStatus.dead() else: log.info('Detected RUNNING instance %s' % instance_id) self._task_ids[instance_id] = task_id - return Retriable.alive() + return HealthStatus.alive() else: - return Retriable.dead() + return HealthStatus.dead() 
http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/instance_watcher.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/instance_watcher.py b/src/main/python/apache/aurora/client/api/instance_watcher.py index 6ed8154..a35fb22 100644 --- a/src/main/python/apache/aurora/client/api/instance_watcher.py +++ b/src/main/python/apache/aurora/client/api/instance_watcher.py @@ -18,7 +18,7 @@ from threading import Event from twitter.common import log from .health_check import StatusHealthCheck -from .task_util import StatusMuxHelper +from .task_util import StatusHelper from gen.apache.aurora.api.ttypes import ScheduleStatus, TaskQuery @@ -41,21 +41,18 @@ class InstanceWatcher(object): def __init__(self, scheduler, job_key, - restart_threshold, watch_secs, health_check_interval_seconds, clock=time, - terminating_event=None, - scheduler_mux=None): + terminating_event=None): self._scheduler = scheduler self._job_key = job_key - self._restart_threshold = restart_threshold self._watch_secs = watch_secs self._health_check_interval_seconds = health_check_interval_seconds self._clock = clock self._terminating = terminating_event or Event() - self._status_helper = StatusMuxHelper(self._scheduler, self._create_query, scheduler_mux) + self._status_helper = StatusHelper(self._scheduler, self._create_query) def watch(self, instance_ids, health_check=None): """Watches a set of instances and detects failures based on a delegated health check. 
@@ -68,9 +65,6 @@ class InstanceWatcher(object): log.info('Watching instances: %s' % instance_ids) instance_ids = set(instance_ids) health_check = health_check or StatusHealthCheck() - now = self._clock.time() - expected_healthy_by = now + self._restart_threshold - max_time = now + self._restart_threshold + self._watch_secs instance_states = {} @@ -86,15 +80,13 @@ class InstanceWatcher(object): instance_id, self._watch_secs)) instance.set_healthy(True) - def maybe_set_instance_unhealthy(instance_id, retriable): - # An instance that was previously healthy and currently unhealthy has failed. + def set_instance_unhealthy(instance_id): + log.info('Instance %s is unhealthy' % instance_id) if instance_id in instance_states: - log.info('Instance %s is unhealthy' % instance_id) + # An instance that was previously healthy and currently unhealthy has failed. instance_states[instance_id].set_healthy(False) - # If the restart threshold has expired or if the instance cannot be retried it is unhealthy. - elif now > expected_healthy_by or not retriable: - log.info('Instance %s was not reported healthy within %d seconds' % ( - instance_id, self._restart_threshold)) + else: + # An instance never passed a health check (e.g.: failed before the first health check). instance_states[instance_id] = Instance(finished=True) while not self._terminating.is_set(): @@ -105,14 +97,11 @@ class InstanceWatcher(object): if instance_id not in finished_instances(): running_task = tasks_by_instance.get(instance_id) if running_task is not None: - task_healthy, retriable = health_check.health(running_task) + task_healthy = health_check.health(running_task) if task_healthy: set_instance_healthy(instance_id, now) else: - maybe_set_instance_unhealthy(instance_id, retriable) - else: - # Set retriable=True since an instance should be retried if it has not been healthy. 
- maybe_set_instance_unhealthy(instance_id, retriable=True) + set_instance_unhealthy(instance_id) log.debug('Instances health: %s' % ['%s: %s' % val for val in instance_states.items()]) @@ -120,11 +109,6 @@ class InstanceWatcher(object): if set(finished_instances().keys()) == instance_ids: return set([s_id for s_id, s in instance_states.items() if not s.healthy]) - # Return if time is up. - if now > max_time: - return set([s_id for s_id in instance_ids if s_id not in instance_states - or not instance_states[s_id].healthy]) - self._terminating.wait(self._health_check_interval_seconds) def terminate(self): http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/job_monitor.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/job_monitor.py b/src/main/python/apache/aurora/client/api/job_monitor.py index aa7e976..d420a96 100644 --- a/src/main/python/apache/aurora/client/api/job_monitor.py +++ b/src/main/python/apache/aurora/client/api/job_monitor.py @@ -16,7 +16,7 @@ from threading import Event from twitter.common.quantity import Amount, Time -from .task_util import StatusMuxHelper +from .task_util import StatusHelper from gen.apache.aurora.api.constants import LIVE_STATES, TERMINAL_STATES from gen.apache.aurora.api.ttypes import JobKey, TaskQuery @@ -35,14 +35,13 @@ class JobMonitor(object): return status in TERMINAL_STATES def __init__(self, scheduler, job_key, terminating_event=None, - min_poll_interval=MIN_POLL_INTERVAL, max_poll_interval=MAX_POLL_INTERVAL, - scheduler_mux=None): + min_poll_interval=MIN_POLL_INTERVAL, max_poll_interval=MAX_POLL_INTERVAL): self._scheduler = scheduler self._job_key = job_key self._min_poll_interval = min_poll_interval self._max_poll_interval = max_poll_interval self._terminating = terminating_event or Event() - self._status_helper = StatusMuxHelper(self._scheduler, self.create_query, scheduler_mux) + 
self._status_helper = StatusHelper(self._scheduler, self.create_query) def iter_tasks(self, instances): tasks = self._status_helper.get_tasks(instances) http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/restarter.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/restarter.py b/src/main/python/apache/aurora/client/api/restarter.py index fbe8f20..6600c6b 100644 --- a/src/main/python/apache/aurora/client/api/restarter.py +++ b/src/main/python/apache/aurora/client/api/restarter.py @@ -26,14 +26,12 @@ from gen.apache.aurora.api.ttypes import ResponseCode class RestartSettings(object): def __init__(self, batch_size, - restart_threshold, max_per_instance_failures, max_total_failures, watch_secs, health_check_interval_seconds): self.batch_size = batch_size - self.restart_threshold = restart_threshold self.max_per_instance_failures = max_per_instance_failures self.max_total_failures = max_total_failures self.watch_secs = watch_secs @@ -55,7 +53,6 @@ class Restarter(object): self._instance_watcher = instance_watcher or InstanceWatcher( scheduler, job_key.to_thrift(), - restart_settings.restart_threshold, restart_settings.watch_secs, restart_settings.health_check_interval_seconds) http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/scheduler_mux.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/scheduler_mux.py b/src/main/python/apache/aurora/client/api/scheduler_mux.py deleted file mode 100644 index 0832a13..0000000 --- a/src/main/python/apache/aurora/client/api/scheduler_mux.py +++ /dev/null @@ -1,121 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import threading -from collections import defaultdict, namedtuple - -from twitter.common.quantity import Amount, Time - -from .error_handling_thread import spawn_worker - -try: - from Queue import Queue, Empty -except ImportError: - from queue import Queue, Empty - - -class SchedulerMux(object): - """Multiplexes scheduler RPC requests on a dedicated worker thread.""" - - class Error(Exception): - """Call error wrapper.""" - pass - - OK_RESULT = 1 - DEFAULT_WAIT_TIMEOUT = Amount(1, Time.SECONDS) - DEFAULT_JOIN_TIMEOUT = Amount(5, Time.SECONDS) - DEFAULT_RPC_TIMEOUT = Amount(120, Time.SECONDS) - WORK_ITEM = namedtuple('WorkItem', ['completion_queue', 'command', 'data', 'aggregator']) - - def __init__(self, wait_timeout=DEFAULT_WAIT_TIMEOUT): - self.__queue = Queue() - self.__terminating = threading.Event() - self.__wait_timeout = wait_timeout - self.__worker = spawn_worker(self.__monitor) - - def __monitor(self): - """Main body of the multiplexer thread. - - This method repeatedly polls the worker queue for new calls, and then - dispatches them in batches to the scheduler. 
- Callers are notified when their requests complete.""" - - requests_by_command = defaultdict(list) - while not self.__terminating.is_set(): - try: - work_item = self.__queue.get(timeout=self.__wait_timeout.as_(Time.SECONDS)) - requests_by_command[work_item.command].append(work_item) - except Empty: - self.__call_and_notify(requests_by_command) - requests_by_command = defaultdict(list) - - def __call_and_notify(self, requests_by_command): - """Batch executes scheduler requests and notifies on completion. - - Takes a set of RPC requests grouped by command type, dispatches them to the scheduler, - and then waits for the batched calls to complete. When a call is completed, its callers - will be notified via the completion queue.""" - - for command, work_items in requests_by_command.items(): - request = [item.data for item in work_items] - request = work_items[0].aggregator(request) if work_items[0].aggregator else request - result_status = self.OK_RESULT - result_data = None - try: - result_data = command(request) - except (self.Error, Exception) as e: - result_status = e - - for work_item in work_items: - work_item.completion_queue.put((result_status, result_data)) - - def _enqueue(self, completion_queue, command, data, aggregator): - """Queues up a scheduler call for a delayed (batched) completion. - - Arguments: - completion_queue -- completion queue to notify caller on completion. - command -- callback signature accepting a list of data. - data -- single request data object to be batched with other similar requests. - aggregator -- callback function for data aggregation. 
- """ - self.__queue.put(self.WORK_ITEM(completion_queue, command, data, aggregator)) - - def terminate(self): - """Requests the SchedulerMux to terminate.""" - self.__terminating.set() - self.__worker.join(timeout=self.DEFAULT_JOIN_TIMEOUT.as_(Time.SECONDS)) - - def enqueue_and_wait(self, command, data, aggregator=None, timeout=DEFAULT_RPC_TIMEOUT): - """Queues up the scheduler call and waits for completion. - - Arguments: - command -- scheduler command to run. - data -- data to query scheduler for. - aggregator -- callback function for data aggregation. - timeout -- amount of time to wait for completion. - - Returns the aggregated command call response. Response data decomposition is up to the caller. - """ - try: - completion_queue = Queue() - self._enqueue(completion_queue, command, data, aggregator) - result = completion_queue.get(timeout=timeout.as_(Time.SECONDS)) - result_status = result[0] - if result_status != self.OK_RESULT and not self.__terminating.is_set(): - if isinstance(result_status, self.Error): - raise result_status - else: - raise self.Error('Unknown error: %s' % result_status) - return result[1] - except Empty: - raise self.Error('Failed to complete operation within %s' % timeout) http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/task_util.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/task_util.py b/src/main/python/apache/aurora/client/api/task_util.py index b5244ee..fb7c76f 100644 --- a/src/main/python/apache/aurora/client/api/task_util.py +++ b/src/main/python/apache/aurora/client/api/task_util.py @@ -11,27 +11,22 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -from itertools import chain - from twitter.common import log from apache.aurora.client.base import format_response -from .scheduler_mux import SchedulerMux - from gen.apache.aurora.api.ttypes import ResponseCode -class StatusMuxHelper(object): - """Handles mux/demux logic of the getTasksWithoutConfigs RPC.""" +class StatusHelper(object): + """Simple wrapper around getTasksWithoutConfigs RPC call.""" - def __init__(self, scheduler, query_factory, scheduler_mux=None): + def __init__(self, scheduler, query_factory): self._scheduler = scheduler self._query_factory = query_factory - self._scheduler_mux = scheduler_mux def get_tasks(self, instance_ids=None): - """Routes call to either immediate direct or multiplexed threaded execution. + """Gets tasks from the scheduler. Arguments: instance_ids -- optional list of instance IDs to query for. @@ -39,46 +34,8 @@ class StatusMuxHelper(object): Returns a list of tasks. """ log.debug('Querying instance statuses: %s' % instance_ids) - - if self._scheduler_mux is not None: - return self._get_tasks_multiplexed(instance_ids) - else: - return self._get_tasks(self._query_factory(instance_ids)) - - def _get_tasks_multiplexed(self, instance_ids=None): - """Gets tasks via SchedulerMux. - - Arguments: - instance_ids -- optional list of instance IDs to query for. - - Returns a list of tasks. - """ - tasks = [] - include_ids = lambda id: id in instance_ids if instance_ids is not None else True - - log.debug('Batch getting task status: %s' % instance_ids) try: - unfiltered_tasks = self._scheduler_mux.enqueue_and_wait( - self._get_tasks, - instance_ids if instance_ids else [], - self._create_aggregated_query) - tasks = [task for task in unfiltered_tasks if include_ids(task.assignedTask.instanceId)] - except SchedulerMux.Error as e: - log.error('Failed to query status for instances %s. 
Reason: %s' % (instance_ids, e)) - - log.debug('Done batch getting task status: %s' % instance_ids) - return tasks - - def _get_tasks(self, query): - """Gets tasks directly via SchedulerProxy. - - Arguments: - query -- TaskQuery instance. - - Returns a list of tasks. - """ - try: - resp = self._scheduler.getTasksWithoutConfigs(query) + resp = self._scheduler.getTasksWithoutConfigs(self._query_factory(instance_ids)) except IOError as e: log.error('IO Exception during scheduler call: %s' % e) return [] @@ -89,13 +46,3 @@ class StatusMuxHelper(object): log.debug(format_response(resp)) return tasks - - def _create_aggregated_query(self, instance_id_lists): - """Aggregates multiple instance_id lists into a single list. - - Arguments: - instance_id_lists -- list of lists of int. - """ - instance_ids = list(chain.from_iterable(instance_id_lists)) - log.debug('Aggregated instance ids to query status: %s' % instance_ids) - return self._query_factory(instance_ids) http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/cli/jobs.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/cli/jobs.py b/src/main/python/apache/aurora/client/cli/jobs.py index 3cbd607..e8bc38a 100644 --- a/src/main/python/apache/aurora/client/cli/jobs.py +++ b/src/main/python/apache/aurora/client/cli/jobs.py @@ -615,9 +615,9 @@ class RestartCommand(Verb): CommandOption("--max-per-instance-failures", type=int, default=0, help="Maximum number of restarts per instance during restart. 
Increments total " "failure count when this limit is exceeded."), - CommandOption("--restart-threshold", type=int, default=60, - help="Maximum number of seconds before an instance must move into the RUNNING state " - "before considered a failure.")] + CommandOption("--restart-threshold", type=int, default=0, + help="This setting is DEPRECATED, will not have any effect if provided and will be " + "removed in the next release.")] @property def help(self): @@ -633,6 +633,10 @@ class RestartCommand(Verb): context.options.max_total_failures) return EXIT_INVALID_PARAMETER + if context.options.restart_threshold: + context.print_out("WARNING: '--restart-threshold' option is no longer supported and will be " + "removed in the next release.") + job = context.options.instance_spec.jobkey instances = (None if context.options.instance_spec.instance == ALL_INSTANCES else context.options.instance_spec.instance) @@ -642,7 +646,6 @@ class RestartCommand(Verb): config = context.get_job_config_optional(job, context.options.config) restart_settings = RestartSettings( batch_size=context.options.batch_size, - restart_threshold=context.options.restart_threshold, watch_secs=context.options.watch_secs, max_per_instance_failures=context.options.max_per_instance_failures, max_total_failures=context.options.max_total_failures, http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/test/python/apache/aurora/client/api/test_health_check.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/api/test_health_check.py b/src/test/python/apache/aurora/client/api/test_health_check.py index af005be..f1be827 100644 --- a/src/test/python/apache/aurora/client/api/test_health_check.py +++ b/src/test/python/apache/aurora/client/api/test_health_check.py @@ -16,12 +16,7 @@ import unittest import mox -from apache.aurora.client.api.health_check import ( - HealthCheck, - NotRetriable, - Retriable, - StatusHealthCheck -) +from 
apache.aurora.client.api.health_check import HealthCheck, StatusHealthCheck from gen.apache.aurora.api.ttypes import AssignedTask, ScheduledTask, ScheduleStatus, TaskConfig @@ -55,26 +50,19 @@ class HealthCheckTest(unittest.TestCase): """Verify that running instances are reported healthy""" task_a = self.create_task(0, 'a') task_b = self.create_task(1, 'b') - assert self._status_health_check.health(task_a) == Retriable.alive() - assert self._status_health_check.health(task_b) == Retriable.alive() + assert self._status_health_check.health(task_a) + assert self._status_health_check.health(task_b) def test_failed_status_health_check(self): """Verify that the health check fails for tasks in a state other than RUNNING""" pending_task = self.create_task(0, 'a', status=PENDING) failed_task = self.create_task(1, 'b', status=FAILED) - assert self._status_health_check.health(pending_task) == Retriable.dead() - assert self._status_health_check.health(failed_task) == Retriable.dead() + assert not self._status_health_check.health(pending_task) + assert not self._status_health_check.health(failed_task) def test_changed_task_id(self): """Verifes that an instance with a different task id causes the health check to fail""" task_a = self.create_task(0, 'a') task_b = self.create_task(0, 'b') - assert self._status_health_check.health(task_a) == Retriable.alive() - assert self._status_health_check.health(task_b) == NotRetriable.dead() - - def test_health_statuses(self): - """Verfies that the health status tuple (health, retry_status) are as expected""" - assert Retriable.alive() == (True, True) - assert Retriable.dead() == (False, True) - assert NotRetriable.alive() == (True, False) - assert NotRetriable.dead() == (False, False) + assert self._status_health_check.health(task_a) + assert not self._status_health_check.health(task_b) http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/test/python/apache/aurora/client/api/test_instance_watcher.py 
---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/api/test_instance_watcher.py b/src/test/python/apache/aurora/client/api/test_instance_watcher.py index 9efe1d4..8fd419f 100644 --- a/src/test/python/apache/aurora/client/api/test_instance_watcher.py +++ b/src/test/python/apache/aurora/client/api/test_instance_watcher.py @@ -68,7 +68,7 @@ def find_expected_cycles(period, sleep_secs): class InstanceWatcherTest(unittest.TestCase): WATCH_INSTANCES = range(3) - RESTART_THRESHOLD = WATCH_SECS = 50 + WATCH_SECS = 50 EXPECTED_CYCLES = find_expected_cycles(WATCH_SECS, 3.0) def setUp(self): @@ -79,7 +79,6 @@ class InstanceWatcherTest(unittest.TestCase): self._health_check = mox.MockObject(HealthCheck) self._watcher = InstanceWatcher(self._scheduler, self._job_key, - self.RESTART_THRESHOLD, self.WATCH_SECS, health_check_interval_seconds=3, clock=self._clock, @@ -114,12 +113,13 @@ class InstanceWatcherTest(unittest.TestCase): for _ in range(int(num_calls)): self._scheduler.getTasksWithoutConfigs(query).AndRaise(IOError('oops')) - def mock_health_check(self, task, status, retry): - self._health_check.health(task).InAnyOrder().AndReturn((status, retry)) + def mock_health_check(self, task, status): + self._health_check.health(task).InAnyOrder().AndReturn(status) - def expect_health_check(self, instance, status, retry=True, num_calls=EXPECTED_CYCLES): + def expect_health_check(self, instance, status, num_calls=EXPECTED_CYCLES): + num_calls = num_calls if status else 1 for _ in range(int(num_calls)): - self.mock_health_check(self.create_task(instance), status, retry) + self.mock_health_check(self.create_task(instance), status) def assert_watch_result(self, expected_failed_instances, instances_to_watch=WATCH_INSTANCES): instances_returned = self._watcher.watch(instances_to_watch, self._health_check) @@ -155,18 +155,9 @@ class InstanceWatcherTest(unittest.TestCase): self.assert_watch_result([0]) self.verify_mocks() 
- def test_io_failure(self): - """Check that IO errors (socket errors) communicating with the scheduler get handled - correctly""" - - self.expect_io_error_in_get_statuses() - self.replay_mocks() - self.assert_watch_result([0, 1, 2]) - self.verify_mocks() - def test_all_instance_failure(self): """All failed instance in a batch of instances""" - self.expect_get_statuses() + self.expect_get_statuses(num_calls=1) self.expect_health_check(0, False) self.expect_health_check(1, False) self.expect_health_check(2, False) @@ -174,39 +165,15 @@ class InstanceWatcherTest(unittest.TestCase): self.assert_watch_result([0, 1, 2]) self.verify_mocks() - def test_restart_threshold_fail_fast(self): - """Instances are reported unhealthy with retry set to False""" - self.expect_get_statuses(num_calls=1) - self.expect_health_check(0, False, retry=False, num_calls=1) - self.expect_health_check(1, False, retry=False, num_calls=1) - self.expect_health_check(2, False, retry=False, num_calls=1) - self.replay_mocks() - self.assert_watch_result([0, 1, 2]) - self.verify_mocks() - - def test_restart_threshold(self): - """Instances are reported healthy at the end of the restart_threshold""" - self.expect_get_statuses(num_calls=self.EXPECTED_CYCLES - 1) - self.expect_health_check(0, False, num_calls=self.EXPECTED_CYCLES - 1) - self.expect_health_check(1, False, num_calls=self.EXPECTED_CYCLES - 1) - self.expect_health_check(2, False, num_calls=self.EXPECTED_CYCLES - 1) - self.expect_get_statuses() - self.expect_health_check(0, True) - self.expect_health_check(1, True) - self.expect_health_check(2, True) - self.replay_mocks() - self.assert_watch_result([]) - self.verify_mocks() - def test_watch_period_failure(self): """Instances are reported unhealthy before watch_secs expires""" self.expect_get_statuses() self.expect_health_check(0, True, num_calls=self.EXPECTED_CYCLES - 1) self.expect_health_check(1, True, num_calls=self.EXPECTED_CYCLES - 1) self.expect_health_check(2, True, 
num_calls=self.EXPECTED_CYCLES - 1) - self.expect_health_check(0, False, num_calls=1) - self.expect_health_check(1, False, num_calls=1) - self.expect_health_check(2, False, num_calls=1) + self.expect_health_check(0, False) + self.expect_health_check(1, False) + self.expect_health_check(2, False) self.replay_mocks() self.assert_watch_result([0, 1, 2]) self.verify_mocks() @@ -217,7 +184,7 @@ class InstanceWatcherTest(unittest.TestCase): self.expect_health_check(0, True) self.expect_health_check(1, True) self.expect_health_check(2, True, num_calls=self.EXPECTED_CYCLES - 1) - self.expect_health_check(2, False, num_calls=1) + self.expect_health_check(2, False) self.replay_mocks() self.assert_watch_result([2]) self.verify_mocks() http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/test/python/apache/aurora/client/api/test_restarter.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/api/test_restarter.py b/src/test/python/apache/aurora/client/api/test_restarter.py index ff2002e..a81003e 100644 --- a/src/test/python/apache/aurora/client/api/test_restarter.py +++ b/src/test/python/apache/aurora/client/api/test_restarter.py @@ -39,7 +39,6 @@ CLUSTER = 'east' JOB = AuroraJobKey(CLUSTER, 'johndoe', 'test', 'test_job') RESTART_SETTINGS = RestartSettings( batch_size=2, - restart_threshold=23, watch_secs=45, max_per_instance_failures=0, max_total_failures=0, http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/test/python/apache/aurora/client/api/test_scheduler_mux.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_mux.py b/src/test/python/apache/aurora/client/api/test_scheduler_mux.py deleted file mode 100644 index 021175c..0000000 --- a/src/test/python/apache/aurora/client/api/test_scheduler_mux.py +++ /dev/null @@ -1,72 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the 
"License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import time -import unittest - -from twitter.common.quantity import Amount, Time - -from apache.aurora.client.api.scheduler_mux import SchedulerMux - - -class SchedulerMuxTest(unittest.TestCase): - - DATA = [1, 2, 3] - MUX = None - - @classmethod - def setUpClass(cls): - cls.MUX = SchedulerMux(wait_timeout=Amount(10, Time.MILLISECONDS)) - - @classmethod - def tearDownClass(cls): - cls.MUX.terminate() - - @classmethod - def error_command(cls, data): - raise SchedulerMux.Error('expected') - - @classmethod - def unknown_error_command(cls, data): - raise Exception('expected') - - @classmethod - def timeout_command(cls, data): - time.sleep(2) - - def test_success(self): - assert [self.DATA] == self.MUX.enqueue_and_wait(lambda d: d, self.DATA) - - def test_failure(self): - try: - self.MUX.enqueue_and_wait(self.error_command, self.DATA) - except SchedulerMux.Error as e: - assert 'expected' in e.message - else: - self.fail() - - def test_unknown_failure(self): - try: - self.MUX.enqueue_and_wait(self.unknown_error_command, self.DATA) - except SchedulerMux.Error as e: - assert 'Unknown error' in e.message - else: - self.fail() - - def test_timeout(self): - try: - self.MUX.enqueue_and_wait(self.timeout_command, self.DATA, timeout=Amount(1, Time.SECONDS)) - except SchedulerMux.Error as e: - 'Failed to complete operation' in e.message - else: - self.fail() http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/test/python/apache/aurora/client/api/test_task_util.py 
---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/api/test_task_util.py b/src/test/python/apache/aurora/client/api/test_task_util.py index eda326d..365ef59 100644 --- a/src/test/python/apache/aurora/client/api/test_task_util.py +++ b/src/test/python/apache/aurora/client/api/test_task_util.py @@ -14,10 +14,9 @@ import unittest -from mock import create_autospec +from mock import call, create_autospec -from apache.aurora.client.api.scheduler_mux import SchedulerMux -from apache.aurora.client.api.task_util import StatusMuxHelper +from apache.aurora.client.api.task_util import StatusHelper from ...api_util import SchedulerThriftApiSpec @@ -43,20 +42,14 @@ class TaskUtilTest(unittest.TestCase): return query @classmethod - def create_mux_helper(cls, scheduler, query, scheduler_mux=None): - return StatusMuxHelper(scheduler, query, scheduler_mux=scheduler_mux) + def create_helper(cls, scheduler, query): + return StatusHelper(scheduler, query) @classmethod def create_tasks(cls): return [ScheduledTask(assignedTask=AssignedTask(instanceId=index)) for index in cls.INSTANCES] @classmethod - def mock_mux(cls, tasks): - mux = create_autospec(spec=SchedulerMux, instance=True) - mux.enqueue_and_wait.return_value = tasks - return mux - - @classmethod def mock_scheduler(cls, response_code=None): scheduler = create_autospec(spec=SchedulerThriftApiSpec, instance=True) response_code = ResponseCode.OK if response_code is None else response_code @@ -65,22 +58,10 @@ class TaskUtilTest(unittest.TestCase): scheduler.getTasksWithoutConfigs.return_value = resp return scheduler - def test_no_mux_run(self): + def test_run(self): scheduler = self.mock_scheduler() - helper = self.create_mux_helper(scheduler, self.create_query) - tasks = helper.get_tasks(self.INSTANCES) - - scheduler.getTasksWithoutConfigs.assert_called_once_with(self.create_query(self.INSTANCES)) - assert 1 == len(tasks) - - def test_mux_run(self): - 
expected_tasks = self.create_tasks() - mux = self.mock_mux(expected_tasks) - helper = self.create_mux_helper(None, self.create_query, scheduler_mux=mux) + helper = self.create_helper(scheduler, self.create_query) tasks = helper.get_tasks(self.INSTANCES) - mux.enqueue_and_wait.assert_called_once_with( - helper._get_tasks, - self.INSTANCES, - helper._create_aggregated_query) + assert scheduler.getTasksWithoutConfigs.mock_calls == [call(self.create_query(self.INSTANCES))] assert 1 == len(tasks) http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/test/python/apache/aurora/client/cli/test_restart.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/cli/test_restart.py b/src/test/python/apache/aurora/client/cli/test_restart.py index 54de6cf..967d560 100644 --- a/src/test/python/apache/aurora/client/cli/test_restart.py +++ b/src/test/python/apache/aurora/client/cli/test_restart.py @@ -18,8 +18,7 @@ import pytest from mock import call, create_autospec, patch from twitter.common.contextutil import temporary_file -from apache.aurora.client.api.health_check import Retriable, StatusHealthCheck -from apache.aurora.client.api.restarter import RestartSettings +from apache.aurora.client.api.health_check import StatusHealthCheck from apache.aurora.client.cli import EXIT_API_ERROR, EXIT_INVALID_PARAMETER, Context from apache.aurora.client.cli.client import AuroraCommandLine from apache.aurora.client.cli.jobs import RestartCommand @@ -28,43 +27,10 @@ from apache.aurora.common.aurora_job_key import AuroraJobKey from .util import AuroraClientCommandTest, FakeAuroraCommandContext, IOMock, mock_verb_options -from gen.apache.aurora.api.ttypes import JobKey, PopulateJobResult, ResponseCode, Result, TaskConfig +from gen.apache.aurora.api.ttypes import JobKey, PopulateJobResult, Result, TaskConfig class TestRestartJobCommand(AuroraClientCommandTest): - - def test_restart_with_lock(self): - command = 
RestartCommand() - - jobkey = AuroraJobKey("cluster", "role", "env", "job") - mock_options = mock_verb_options(command) - mock_options.instance_spec = TaskInstanceKey(jobkey, []) - - fake_context = FakeAuroraCommandContext() - fake_context.set_options(mock_options) - - mock_api = fake_context.get_api("test") - mock_api.restart.return_value = AuroraClientCommandTest.create_blank_response( - ResponseCode.LOCK_ERROR, "Error.") - - with pytest.raises(Context.CommandError): - command.execute(fake_context) - - restart_settings = RestartSettings( - batch_size=mock_options.batch_size, - restart_threshold=mock_options.restart_threshold, - max_per_instance_failures=mock_options.max_per_instance_failures, - max_total_failures=mock_options.max_total_failures, - watch_secs=mock_options.watch_secs, - health_check_interval_seconds=mock_options.healthcheck_interval_seconds) - - mock_api.restart.assert_called_once_with( - jobkey, - mock_options.instance_spec.instance, - restart_settings, - config=None) - self.assert_lock_message(fake_context) - def test_restart_inactive_instance_spec(self): command = RestartCommand() @@ -122,15 +88,15 @@ class TestRestartCommand(AuroraClientCommandTest): return populate @classmethod - def setup_health_checks(cls, mock_api): + def setup_health_checks(cls): mock_health_check = create_autospec(spec=StatusHealthCheck, instance=True) - mock_health_check.health.return_value = Retriable.alive() + mock_health_check.health.return_value = True return mock_health_check def test_restart_simple(self): # Test the client-side restart logic in its simplest case: everything succeeds (mock_api, mock_scheduler_proxy) = self.create_mock_api() - mock_health_check = self.setup_health_checks(mock_api) + mock_health_check = self.setup_health_checks() self.setup_mock_scheduler_for_simple_restart(mock_api) with contextlib.nested( patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy), @@ -159,7 +125,7 @@ class 
TestRestartCommand(AuroraClientCommandTest): def test_restart_simple_no_config(self): # Test the client-side restart logic in its simplest case: everything succeeds (mock_api, mock_scheduler_proxy) = self.create_mock_api() - mock_health_check = self.setup_health_checks(mock_api) + mock_health_check = self.setup_health_checks() self.setup_mock_scheduler_for_simple_restart(mock_api) with contextlib.nested( patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy), @@ -180,7 +146,7 @@ class TestRestartCommand(AuroraClientCommandTest): # Test the client-side restart when a shard argument is too large, and it's # using strict mode. (mock_api, mock_scheduler_proxy) = self.create_mock_api() - mock_health_check = self.setup_health_checks(mock_api) + mock_health_check = self.setup_health_checks() self.setup_mock_scheduler_for_simple_restart(mock_api) with contextlib.nested( patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy), @@ -201,7 +167,7 @@ class TestRestartCommand(AuroraClientCommandTest): def test_restart_failed_status(self): (mock_api, mock_scheduler_proxy) = self.create_mock_api() - mock_health_check = self.setup_health_checks(mock_api) + mock_health_check = self.setup_health_checks() self.setup_mock_scheduler_for_simple_restart(mock_api) mock_scheduler_proxy.getTasksWithoutConfigs.return_value = self.create_error_response() with contextlib.nested( @@ -222,7 +188,7 @@ class TestRestartCommand(AuroraClientCommandTest): def test_restart_no_such_job_with_instances(self): (mock_api, mock_scheduler_proxy) = self.create_mock_api() - mock_health_check = self.setup_health_checks(mock_api) + mock_health_check = self.setup_health_checks() mock_io = IOMock() self.setup_mock_scheduler_for_simple_restart(mock_api) # Make getTasksWithoutConfigs return an error, which is what happens when a job is not found. 
@@ -254,7 +220,7 @@ class TestRestartCommand(AuroraClientCommandTest): def test_restart_failed_restart(self): (mock_api, mock_scheduler_proxy) = self.create_mock_api() - mock_health_check = self.setup_health_checks(mock_api) + mock_health_check = self.setup_health_checks() self.setup_mock_scheduler_for_simple_restart(mock_api) mock_scheduler_proxy.restartShards.return_value = self.create_error_response() with contextlib.nested( @@ -297,7 +263,7 @@ class TestRestartCommand(AuroraClientCommandTest): self.reset_mock_io() # Test the client-side restart logic in its simplest case: everything succeeds (mock_api, mock_scheduler_proxy) = self.create_mock_api() - mock_health_check = self.setup_health_checks(mock_api) + mock_health_check = self.setup_health_checks() self.setup_mock_scheduler_for_simple_restart(mock_api) with contextlib.nested( patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy), @@ -322,7 +288,7 @@ class TestRestartCommand(AuroraClientCommandTest): def test_restart_failed_restart_output(self): self.reset_mock_io() (mock_api, mock_scheduler_proxy) = self.create_mock_api() - mock_health_check = self.setup_health_checks(mock_api) + mock_health_check = self.setup_health_checks() self.setup_mock_scheduler_for_simple_restart(mock_api) mock_scheduler_proxy.restartShards.return_value = self.create_error_response() with contextlib.nested(
