Repository: aurora Updated Branches: refs/heads/master 03eff2838 -> 1b3b7f53e
Revert "Add min_consecutive_health_checks in HealthCheckConfig" This reverts commit ed72b1bf662d1e29d2bb483b317c787630c26a9e. Revert "Add support for receiving min_consecutive_successes in health checker" This reverts commit e91130e49445c3933b6e27f5fde18c3a0e61b87a. Revert "Modify executor state transition logic to rely on health checks (if enabled)." This reverts commit ca683cb9e27bae76424a687bc6c3af5a73c501b9. Bugs closed: AURORA-1793 Reviewed at https://reviews.apache.org/r/52806/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/1b3b7f53 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/1b3b7f53 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/1b3b7f53 Branch: refs/heads/master Commit: 1b3b7f53e9542adf8d847d8d159af830b4d79437 Parents: 03eff28 Author: David McLaughlin <da...@dmclaughlin.com> Authored: Wed Oct 12 15:54:20 2016 -0700 Committer: David McLaughlin <dmclaugh...@twitter.com> Committed: Wed Oct 12 15:54:20 2016 -0700 ---------------------------------------------------------------------- RELEASE-NOTES.md | 5 -- docs/features/job-updates.md | 26 +------- docs/reference/configuration.md | 3 +- .../apache/aurora/client/api/updater_util.py | 4 +- src/main/python/apache/aurora/client/config.py | 31 ++------- .../python/apache/aurora/config/schema/base.py | 1 - .../apache/aurora/executor/aurora_executor.py | 46 ++++---------- .../aurora/executor/common/health_checker.py | 62 +++++------------- .../aurora/executor/common/status_checker.py | 29 ++++----- .../apache/aurora/executor/status_manager.py | 33 ++-------- .../python/apache/aurora/client/test_config.py | 31 +++++---- .../executor/common/test_health_checker.py | 67 +++++++------------- .../executor/common/test_status_checker.py | 37 +---------- .../aurora/executor/test_status_manager.py | 28 ++++---- .../aurora/executor/test_thermos_executor.py | 44 +++---------- 15 files changed, 126 insertions(+), 321 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index f3dd8bb..03349b0 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -2,11 +2,6 @@ ========================= ### New/updated: - -- Aurora scheduler job updater can now rely on health check status rather than `watch_secs` timeout - when deciding an individual instance update state. This will potentially speed up updates as the - `minWaitInInstanceRunningMs` will no longer have to be chosen based on the worst observed instance - startup/warmup delay but rather as a desired health check duration. - A task's tier is now mapped to a label on the Mesos `TaskInfo` proto. - The Aurora client is now using the Thrift binary protocol to communicate with the scheduler. http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/docs/features/job-updates.md ---------------------------------------------------------------------- diff --git a/docs/features/job-updates.md b/docs/features/job-updates.md index c4ec42e..792f2ae 100644 --- a/docs/features/job-updates.md +++ b/docs/features/job-updates.md @@ -49,29 +49,9 @@ and performing these operations: the new config instance. The Aurora client continues through the instance list until all tasks are -updated. 
If the client determines the update is not going well (a percentage -of health checks have failed), it cancels the update. - -Currently, the scheduler job updater uses two mechanisms to determine when -to stop monitoring instance update state: a time-based grace interval and health -check status. - -Job updates with health checks disabled (e.g. no 'health' port is defined -in .aurora portmap) will rely on a time-based grace interval called [watch_secs] -(../reference/configuration.md#updateconfig-objects). -An instance will start executing task content when reaching `STARTING` -state. Once the task sandbox is created, the instance is moved into `RUNNING` -state. Afterward, the job updater will start the watch_secs countdown to ensure -an instance is healthy, and then complete the update. - -Job updates with health checks enabled will rely on health check status. When an instance -reaches `STARTING` state, health checks are performed periodically by the executor -to ensure the instance is healthy. An instance is moved into `RUNNING` state only if -a minimum number of consecutive successful health checks is performed -during the initial warmup period (defined by [initial_interval_secs] -(../reference/configuration.md#healthcheckconfig-objects)). If watch_secs is -set to zero, the scheduler job updater will complete the update immediately. -Otherwise, it will complete the update after watch_secs expires. +updated, in `RUNNING`, and healthy for a configurable amount of time. +If the client determines the update is not going well (a percentage of health +checks have failed), it cancels the update. Update cancellation runs a procedure similar to the update sequence described above, but in reverse order. New instance configs are swapped http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/docs/reference/configuration.md ---------------------------------------------------------------------- diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index 71d2ce5..f2a0b18 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -379,10 +379,9 @@ Parameters for controlling a task's health checks via HTTP or a shell command. | param | type | description | ------- | :-------: | -------- | ```health_checker``` | HealthCheckerConfig | Configure what kind of health check to use. -| ```initial_interval_secs``` | Integer | Initial grace period for performing health checks. (Default: 15) +| ```initial_interval_secs``` | Integer | Initial delay for performing a health check. (Default: 15) | ```interval_secs``` | Integer | Interval on which to check the task's health. (Default: 10) | ```max_consecutive_failures``` | Integer | Maximum number of consecutive failures that will be tolerated before considering a task unhealthy (Default: 0) -| ```min_consecutive_successes``` | Integer | Minimum number of consecutive successful health checks required before considering a task healthy (Default: 1) | ```timeout_secs``` | Integer | Health check timeout.
(Default: 1) ### HealthCheckerConfig Objects http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/src/main/python/apache/aurora/client/api/updater_util.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/updater_util.py b/src/main/python/apache/aurora/client/api/updater_util.py index ebeddab..c649316 100644 --- a/src/main/python/apache/aurora/client/api/updater_util.py +++ b/src/main/python/apache/aurora/client/api/updater_util.py @@ -35,8 +35,8 @@ class UpdaterConfig(object): if batch_size <= 0: raise ValueError('Batch size should be greater than 0') - if watch_secs < 0: - raise ValueError('Watch seconds should not be negative') + if watch_secs <= 0: + raise ValueError('Watch seconds should be greater than 0') if pulse_interval_secs is not None and pulse_interval_secs < self.MIN_PULSE_INTERVAL_SECONDS: raise ValueError('Pulse interval seconds must be at least %s seconds.' % self.MIN_PULSE_INTERVAL_SECONDS) http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/src/main/python/apache/aurora/client/config.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/config.py b/src/main/python/apache/aurora/client/config.py index ce4bffe..0186af5 100644 --- a/src/main/python/apache/aurora/client/config.py +++ b/src/main/python/apache/aurora/client/config.py @@ -92,31 +92,15 @@ health check interval (%d seconds) plus %d consecutive failures at a check inter ''' -INITIAL_INTERVAL_SECS_INSUFFICIENT_ERROR_FORMAT = ''' -You have specified an insufficiently long initial interval period (%d seconds) -in your health check configuration. Your health check will always fail. In order for -the health check to pass, HealthCheckConfig.initial_interval_secs must be greater -than the duration of %d consecutive successful health checks at a check interval -of %d seconds. You can either increase initial_interval_secs, decrease interval_secs -or decrease min_consecutive_successes.
-''' - - def _validate_update_config(config): job_size = config.instances() update_config = config.update_config() health_check_config = config.health_check_config() max_failures = update_config.max_total_failures().get() + watch_secs = update_config.watch_secs().get() initial_interval_secs = health_check_config.initial_interval_secs().get() - min_consecutive_successes = health_check_config.min_consecutive_successes().get() + max_consecutive_failures = health_check_config.max_consecutive_failures().get() interval_secs = health_check_config.interval_secs().get() if max_failures >= job_size: @@ -127,13 +111,10 @@ def _validate_update_config(config): if max_failures < min_failure_threshold: die(UPDATE_CONFIG_DEDICATED_THRESHOLD_ERROR % (job_size, min_failure_threshold)) - if min_consecutive_successes <= 0: - die(INVALID_MIN_CONSECUTIVE_SUCCESSES_ERROR % min_consecutive_successes) - - target_initial_interval_secs = interval_secs * min_consecutive_successes - if initial_interval_secs <= target_initial_interval_secs: - die(INITIAL_INTERVAL_SECS_INSUFFICIENT_ERROR_FORMAT % - (initial_interval_secs, min_consecutive_successes, interval_secs)) + target_watch = initial_interval_secs + (max_consecutive_failures * interval_secs) + if watch_secs <= target_watch: + die(WATCH_SECS_INSUFFICIENT_ERROR_FORMAT % + (watch_secs, target_watch, initial_interval_secs, max_consecutive_failures, interval_secs)) PRODUCTION_DEPRECATED_WARNING = ( http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/src/main/python/apache/aurora/config/schema/base.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/config/schema/base.py b/src/main/python/apache/aurora/config/schema/base.py index baea660..8451630 100644 --- a/src/main/python/apache/aurora/config/schema/base.py +++ b/src/main/python/apache/aurora/config/schema/base.py @@ -60,7 +60,6 @@ class HealthCheckConfig(Struct): initial_interval_secs = Default(Float, 15.0) interval_secs = Default(Float, 10.0) max_consecutive_failures = Default(Integer, 0) - min_consecutive_successes = Default(Integer, 1) timeout_secs = Default(Float, 1.0) http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/src/main/python/apache/aurora/executor/aurora_executor.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/aurora_executor.py b/src/main/python/apache/aurora/executor/aurora_executor.py index d30f2d3..ce5ef68 100644 --- a/src/main/python/apache/aurora/executor/aurora_executor.py +++ b/src/main/python/apache/aurora/executor/aurora_executor.py @@ -22,8 +22,6 @@ from twitter.common.concurrent import Timeout, deadline, defer from twitter.common.metrics import Observable from twitter.common.quantity import Amount, Time -from apache.aurora.executor.common.health_checker import HealthChecker - from .common.kill_manager import KillManager from .common.sandbox import DefaultSandboxProvider from .common.status_checker import ChainedStatusChecker @@ -96,7 +94,7 @@ class AuroraExecutor(ExecutorBase, Observable): - Set up necessary HealthCheckers - Set up StatusManager, and attach HealthCheckers """ - self.send_update(driver, self._task_id, mesos_pb2.TASK_STARTING, 'Starting task execution.') + self.send_update(driver, self._task_id, mesos_pb2.TASK_STARTING, 'Initializing sandbox.') if not self._initialize_sandbox(driver, assigned_task, mounted_volume_paths): return @@ -117,27 +115,10 @@ class AuroraExecutor(ExecutorBase, Observable): if not 
self._start_runner(driver, assigned_task): return - is_health_check_enabled = False - status_checkers = [] - try: - for status_provider in self._status_providers: - status_checker = status_provider.from_assigned_task(assigned_task, self._sandbox) - if status_checker is None: - continue - else: - if isinstance(status_checker, HealthChecker): - is_health_check_enabled = True - status_checkers.append(status_checker) - except Exception as e: - log.error(traceback.format_exc()) - self._die(driver, mesos_pb2.TASK_FAILED, "Failed to set up status checker: %s" % e) - return - - if not is_health_check_enabled: - self.send_update(driver, self._task_id, mesos_pb2.TASK_RUNNING) + self.send_update(driver, self._task_id, mesos_pb2.TASK_RUNNING) try: - self._start_status_manager(status_checkers) + self._start_status_manager(driver, assigned_task) except Exception: log.error(traceback.format_exc()) self._die(driver, mesos_pb2.TASK_FAILED, "Internal error") @@ -181,9 +162,15 @@ class AuroraExecutor(ExecutorBase, Observable): return True - def _start_status_manager(self, status_checkers): - status_checkers = [self._kill_manager] + status_checkers - for status_checker in status_checkers: + def _start_status_manager(self, driver, assigned_task): + status_checkers = [self._kill_manager] + self.metrics.register_observable(self._kill_manager.name(), self._kill_manager) + + for status_provider in self._status_providers: + status_checker = status_provider.from_assigned_task(assigned_task, self._sandbox) + if status_checker is None: + continue + status_checkers.append(status_checker) self.metrics.register_observable(status_checker.name(), status_checker) self._chained_checker = ChainedStatusChecker(status_checkers) @@ -191,12 +178,8 @@ class AuroraExecutor(ExecutorBase, Observable): # chain the runner to the other checkers, but do not chain .start()/.stop() complete_checker = ChainedStatusChecker([self._runner, self._chained_checker]) - self._status_manager = self._status_manager_class( - complete_checker, - running=self._running, - shutdown=self._shutdown, - clock=self._clock) + complete_checker, self._shutdown, clock=self._clock) self._status_manager.start() self.status_manager_started.set() @@ -214,9 +197,6 @@ class AuroraExecutor(ExecutorBase, Observable): self.log('Activating kill manager.') self._kill_manager.kill(reason) - def _running(self, status_result): - self.send_update(self._driver, self._task_id, status_result.status, status_result.reason) - def _shutdown(self, status_result): runner_status = self._runner.status http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/src/main/python/apache/aurora/executor/common/health_checker.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/common/health_checker.py b/src/main/python/apache/aurora/executor/common/health_checker.py index 1e0be10..3c7c09d 100644 --- a/src/main/python/apache/aurora/executor/common/health_checker.py +++ b/src/main/python/apache/aurora/executor/common/health_checker.py @@ -18,7 +18,7 @@ import threading import time import traceback -from mesos.interface import mesos_pb2 +from mesos.interface.mesos_pb2 import TaskState from pystachio import Environment, String from twitter.common import log from twitter.common.exceptions import ExceptionalThread @@ -51,7 +51,6 @@ class ThreadedHealthChecker(ExceptionalThread): interval_secs, initial_interval_secs, max_consecutive_failures, - min_consecutive_successes, clock): """ :param health_checker: health checker to 
confirm service health @@ -60,12 +59,10 @@ class ThreadedHealthChecker(ExceptionalThread): :type sandbox: DirectorySandbox :param interval_secs: delay between checks :type interval_secs: int - :param initial_interval_secs: seconds to wait before marking health check passed + :param initial_interval_secs: seconds to wait before starting checks :type initial_interval_secs: int :param max_consecutive_failures: number of failures to allow before marking dead :type max_consecutive_failures: int - :param min_consecutive_successes: number of successes before marking health check passed - :type min_consecutive_successes: int :param clock: time module available to be mocked for testing :type clock: time module """ @@ -73,16 +70,11 @@ class ThreadedHealthChecker(ExceptionalThread): self.sandbox = sandbox self.clock = clock self.current_consecutive_failures = 0 - self.current_consecutive_successes = 0 self.dead = threading.Event() self.interval = interval_secs self.max_consecutive_failures = max_consecutive_failures - self.min_consecutive_successes = min_consecutive_successes self.snooze_file = None self.snoozed = False - self._expired = False - self.start_time = 0 - self.health_check_passed = False if self.sandbox and self.sandbox.exists(): self.snooze_file = os.path.join(self.sandbox.root, '.healthchecksnooze') @@ -92,8 +84,10 @@ class ThreadedHealthChecker(ExceptionalThread): else: self.initial_interval = interval_secs * 2 - self.healthy, self.reason = True, None - + if self.initial_interval > 0: + self.healthy, self.reason = True, None + else: + self.healthy, self.reason = self._perform_check_if_not_disabled() super(ThreadedHealthChecker, self).__init__() self.daemon = True @@ -116,35 +110,21 @@ class ThreadedHealthChecker(ExceptionalThread): def _maybe_update_failure_count(self, is_healthy, reason): if not is_healthy: log.warning('Health check failure: %s' % reason) - self.reason = reason - if self.current_consecutive_successes > 0: - log.debug('Reset consecutive successes counter.') - self.current_consecutive_successes = 0 self.current_consecutive_failures += 1 + if self.current_consecutive_failures > self.max_consecutive_failures: + log.warning('Reached consecutive failure limit.') + self.healthy = False + self.reason = reason else: if self.current_consecutive_failures > 0: log.debug('Reset consecutive failures counter.') self.current_consecutive_failures = 0 - self.current_consecutive_successes += 1 - - if not self._expired: - if self.clock.time() - self.start_time > self.initial_interval: - log.debug('Initial interval expired.') - self._expired = True - if not self.health_check_passed: - log.warning('Failed to reach minimum consecutive successes.') - self.healthy = False - else: - if self.current_consecutive_successes >= self.min_consecutive_successes: - log.info('Reached minimum consecutive successes.') - self.health_check_passed = True - - if self._expired and self.healthy: - self.healthy = self.current_consecutive_failures <= self.max_consecutive_failures def run(self): log.debug('Health checker thread started.') - self.start_time = self.clock.time() + if self.initial_interval > 0: + self.clock.sleep(self.initial_interval) + log.debug('Initial interval expired.') while not self.dead.is_set(): is_healthy, reason = self._perform_check_if_not_disabled() self._maybe_update_failure_count(is_healthy, reason) @@ -168,8 +148,6 @@ class HealthChecker(StatusChecker): Exported metrics: health_checker.consecutive_failures: Number of consecutive failures observed. 
Resets to zero on successful health check. - health_checker.consecutive_successes: Number of consecutive successes observed. Resets - to zero on failed health check. health_checker.snoozed: Returns 1 if the health checker is snoozed, 0 if not. health_checker.total_latency_secs: Total time waiting for the health checker to respond in seconds. To get average latency, use health_checker.total_latency / health_checker.checks. @@ -182,7 +160,6 @@ class HealthChecker(StatusChecker): interval_secs=10, initial_interval_secs=None, max_consecutive_failures=0, - min_consecutive_successes=1, clock=time): self._health_checks = 0 self._total_latency = 0 @@ -194,12 +171,9 @@ class HealthChecker(StatusChecker): interval_secs, initial_interval_secs, max_consecutive_failures, - min_consecutive_successes, clock) self.metrics.register(LambdaGauge('consecutive_failures', lambda: self.threaded_health_checker.current_consecutive_failures)) - self.metrics.register(LambdaGauge('consecutive_successes', - lambda: self.threaded_health_checker.current_consecutive_successes)) self.metrics.register(LambdaGauge('snoozed', lambda: int(self.threaded_health_checker.snoozed))) self.metrics.register(LambdaGauge('total_latency_secs', lambda: self._total_latency)) self.metrics.register(LambdaGauge('checks', lambda: self._health_checks)) @@ -218,12 +192,9 @@ class HealthChecker(StatusChecker): @property def status(self): - if self.threaded_health_checker.healthy: - if self.threaded_health_checker.health_check_passed: - return StatusResult('Task is healthy.', mesos_pb2.TASK_RUNNING) - else: + if not self.threaded_health_checker.healthy: return StatusResult('Failed health check! %s' % self.threaded_health_checker.reason, - mesos_pb2.TASK_FAILED) + TaskState.Value('TASK_FAILED')) def name(self): return 'health_checker' @@ -331,7 +302,6 @@ class HealthCheckerProvider(StatusCheckerProvider): sandbox, interval_secs=health_check_config.get('interval_secs'), initial_interval_secs=health_check_config.get('initial_interval_secs'), - max_consecutive_failures=health_check_config.get('max_consecutive_failures'), - min_consecutive_successes=health_check_config.get('min_consecutive_successes')) + max_consecutive_failures=health_check_config.get('max_consecutive_failures')) return health_checker http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/src/main/python/apache/aurora/executor/common/status_checker.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/common/status_checker.py b/src/main/python/apache/aurora/executor/common/status_checker.py index fccb69a..795dae2 100644 --- a/src/main/python/apache/aurora/executor/common/status_checker.py +++ b/src/main/python/apache/aurora/executor/common/status_checker.py @@ -14,7 +14,7 @@ from abc import abstractmethod, abstractproperty -from mesos.interface import mesos_pb2 +from mesos.interface.mesos_pb2 import TaskState from twitter.common import log from twitter.common.lang import Interface from twitter.common.metrics import Observable @@ -31,7 +31,7 @@ class StatusResult(object): def __init__(self, reason, status): self._reason = reason - if status not in mesos_pb2.TaskState.values(): + if status not in TaskState.values(): raise ValueError('Unknown task state: %r' % status) self._status = status @@ -47,7 +47,7 @@ class StatusResult(object): return '%s(%r, status=%r)' % ( self.__class__.__name__, self._reason, - mesos_pb2.TaskState.Name(self._status)) + TaskState.Name(self._status)) class StatusChecker(Observable, 
Interface): @@ -85,25 +85,24 @@ class Healthy(StatusChecker): class ChainedStatusChecker(StatusChecker): def __init__(self, status_checkers): self._status_checkers = status_checkers + self._status = None if not all(isinstance(h_i, StatusChecker) for h_i in status_checkers): raise TypeError('ChainedStatusChecker must take an iterable of StatusCheckers.') super(ChainedStatusChecker, self).__init__() @property def status(self): - # Do not memoize the status so that failures occurring in RUNNING state won't be masked. - status = None - for status_checker in self._status_checkers: - status_checker_status = status_checker.status - if status_checker_status is not None: - log.info('%s reported %s' % (status_checker.__class__.__name__, status_checker_status)) - if not isinstance(status_checker_status, StatusResult): - raise TypeError('StatusChecker returned something other than a StatusResult: got %s' % - type(status_checker_status)) - status = status_checker_status - if status_checker_status.status is not mesos_pb2.TASK_RUNNING: + if self._status is None: + for status_checker in self._status_checkers: + status_checker_status = status_checker.status + if status_checker_status is not None: + log.info('%s reported %s' % (status_checker.__class__.__name__, status_checker_status)) + if not isinstance(status_checker_status, StatusResult): + raise TypeError('StatusChecker returned something other than a StatusResult: got %s' % + type(status_checker_status)) + self._status = status_checker_status break - return status + return self._status def start(self): for status_checker in self._status_checkers: http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/src/main/python/apache/aurora/executor/status_manager.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/status_manager.py b/src/main/python/apache/aurora/executor/status_manager.py index 26aea23..228a99a 100644 --- a/src/main/python/apache/aurora/executor/status_manager.py +++ b/src/main/python/apache/aurora/executor/status_manager.py @@ -14,7 +14,6 @@ import time -from mesos.interface import mesos_pb2 from twitter.common import log from twitter.common.exceptions import ExceptionalThread from twitter.common.quantity import Amount, Time @@ -32,43 +31,23 @@ class StatusManager(ExceptionalThread): """ POLL_WAIT = Amount(500, Time.MILLISECONDS) - def __init__(self, status_checker, running, shutdown, clock=time): - """ - :param status_checker: status checker to check the task status - :type status_checker: ChainedStatusChecker - :param running: callback function that handles the RUNNING task status - :type running: callable - :param shutdown: callback function that handles the terminal task status - :type shutdown: callable - :param clock: time module available to be mocked for testing - :type clock: time module - """ + def __init__(self, status_checker, callback, clock=time): if not isinstance(status_checker, StatusChecker): raise TypeError('status_checker must be a StatusChecker, got %s' % type(status_checker)) - if not callable(running): - raise TypeError('running callback needs to be callable!') - if not callable(shutdown): - raise TypeError('shutdown callback needs to be callable!') + if not callable(callback): + raise TypeError('callback needs to be callable!') self._status_checker = status_checker - self._running = running - self._shutdown = shutdown + self._callback = callback self._clock = clock super(StatusManager, self).__init__() self.daemon = True - self.is_task_running 
= False def run(self): while True: status_result = self._status_checker.status if status_result is not None: log.info('Status manager got %s' % status_result) - if status_result.status is mesos_pb2.TASK_RUNNING: - if not self.is_task_running: - self._running(status_result) - self.is_task_running = True - self._clock.sleep(self.POLL_WAIT.as_(Time.SECONDS)) - else: - self._shutdown(status_result) - break + self._callback(status_result) + break else: self._clock.sleep(self.POLL_WAIT.as_(Time.SECONDS)) http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/src/test/python/apache/aurora/client/test_config.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/test_config.py b/src/test/python/apache/aurora/client/test_config.py index ff46558..5cf68a5 100644 --- a/src/test/python/apache/aurora/client/test_config.py +++ b/src/test/python/apache/aurora/client/test_config.py @@ -24,7 +24,15 @@ from apache.aurora.client.config import get_config as get_aurora_config from apache.aurora.client.config import PRODUCTION_DEPRECATED_WARNING from apache.aurora.config import AuroraConfig from apache.aurora.config.loader import AuroraConfigLoader -from apache.aurora.config.schema.base import MB, Announcer, HealthCheckConfig, Job, Resources, Task +from apache.aurora.config.schema.base import ( + MB, + Announcer, + HealthCheckConfig, + Job, + Resources, + Task, + UpdateConfig +) from apache.thermos.config.schema_base import Process MESOS_CONFIG_BASE = """ @@ -184,21 +192,21 @@ def test_update_config_passes_with_default_values(): config._validate_update_config(AuroraConfig(base_job)) -def test_health_check_config_fails_insufficient_initital_interval_secs_less_than_target(): +def test_update_config_passes_with_min_requirement_values(): base_job = Job( name='hello_world', role='john_doe', cluster='test-cluster', - health_check_config=HealthCheckConfig(initial_interval_secs=5), + update_config=UpdateConfig(watch_secs=26), + health_check_config=HealthCheckConfig(max_consecutive_failures=1), task=Task(name='main', processes=[], resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB))) - with pytest.raises(SystemExit): - config._validate_update_config(AuroraConfig(base_job)) + config._validate_update_config(AuroraConfig(base_job)) -def test_health_check_config_fails_insufficient_initital_interval_secs_equal_to_target(): +def test_update_config_fails_insufficient_watch_secs_less_than_target(): base_job = Job( name='hello_world', role='john_doe', cluster='test-cluster', - health_check_config=HealthCheckConfig(initial_interval_secs=10), + update_config=UpdateConfig(watch_secs=10), task=Task(name='main', processes=[], resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB))) @@ -206,15 +214,16 @@ def test_health_check_config_fails_insufficient_initital_interval_secs_equal_to_ config._validate_update_config(AuroraConfig(base_job)) -def test_health_check_config_passes_with_min_requirement_values(): +def test_update_config_fails_insufficient_watch_secs_equal_to_target(): base_job = Job( name='hello_world', role='john_doe', cluster='test-cluster', - health_check_config=HealthCheckConfig(initial_interval_secs=21, - min_consecutive_successes=2), + update_config=UpdateConfig(watch_secs=25), + health_check_config=HealthCheckConfig(max_consecutive_failures=1), task=Task(name='main', processes=[], resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB))) - config._validate_update_config(AuroraConfig(base_job)) + with pytest.raises(SystemExit): + 
config._validate_update_config(AuroraConfig(base_job)) def test_validate_deprecated_config_adds_warning_for_production(): http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/src/test/python/apache/aurora/executor/common/test_health_checker.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/common/test_health_checker.py b/src/test/python/apache/aurora/executor/common/test_health_checker.py index 28769dc..da0c56c 100644 --- a/src/test/python/apache/aurora/executor/common/test_health_checker.py +++ b/src/test/python/apache/aurora/executor/common/test_health_checker.py @@ -20,7 +20,7 @@ import unittest import mock import pytest -from mesos.interface import mesos_pb2 +from mesos.interface.mesos_pb2 import TaskState from twitter.common.exceptions import ExceptionalThread from twitter.common.testing.clock import ThreadedClock @@ -59,28 +59,23 @@ class TestHealthChecker(unittest.TestCase): self.fake_health_checks.append((status, 'reason')) def test_initial_interval_2x(self): - self.append_health_checks(False, 2) - self.append_health_checks(True, 1) - self.append_health_checks(False, 1) + self.append_health_checks(False) hct = HealthChecker(self._checker.health, interval_secs=5, clock=self._clock) hct.start() assert self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, amount=5) + self._clock.assert_waiting(hct.threaded_health_checker, 10) assert hct.status is None - self._clock.tick(5) + self._clock.tick(6) assert self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, amount=5) assert hct.status is None - self._clock.tick(5) + self._clock.tick(3) assert self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, amount=5) - assert hct.status.status == mesos_pb2.TASK_RUNNING + assert hct.status is None self._clock.tick(5) assert self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, amount=5) - assert hct.status.status == mesos_pb2.TASK_FAILED + assert hct.status.status == TaskState.Value('TASK_FAILED') hct.stop() - assert self._checker.health.call_count == 4 + assert self._checker.health.call_count == 1 def test_initial_interval_whatev(self): self.append_health_checks(False, 2) @@ -92,11 +87,7 @@ class TestHealthChecker(unittest.TestCase): hct.start() self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=5) - assert hct.status is None - self._clock.tick(5) - self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, amount=5) - assert hct.status.status == mesos_pb2.TASK_FAILED + assert hct.status.status == TaskState.Value('TASK_FAILED') hct.stop() # this is an implementation detail -- we healthcheck in the initializer and # healthcheck in the run loop. if we ever change the implementation, expect @@ -120,36 +111,38 @@ class TestHealthChecker(unittest.TestCase): self._clock.converge(threads=[hct.threaded_health_checker]) # 2 consecutive health check failures followed by a successful health check. 
+ epsilon = 0.001 + self._clock.tick(initial_interval_secs + epsilon) self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=1) assert hct.status is None assert hct.metrics.sample()['consecutive_failures'] == 1 - self._clock.tick(interval_secs) + self._clock.tick(interval_secs + epsilon) self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=1) assert hct.status is None assert hct.metrics.sample()['consecutive_failures'] == 2 - self._clock.tick(interval_secs) + self._clock.tick(interval_secs + epsilon) self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status.status == mesos_pb2.TASK_RUNNING + assert hct.status is None assert hct.metrics.sample()['consecutive_failures'] == 0 # 3 consecutive health check failures. - self._clock.tick(interval_secs) + self._clock.tick(interval_secs + epsilon) self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status.status == mesos_pb2.TASK_RUNNING + assert hct.status is None assert hct.metrics.sample()['consecutive_failures'] == 1 - self._clock.tick(interval_secs) + self._clock.tick(interval_secs + epsilon) self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status.status == mesos_pb2.TASK_RUNNING + assert hct.status is None assert hct.metrics.sample()['consecutive_failures'] == 2 - self._clock.tick(interval_secs) + self._clock.tick(interval_secs + epsilon) self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status.status == mesos_pb2.TASK_FAILED + assert hct.status.status == TaskState.Value('TASK_FAILED') assert hct.metrics.sample()['consecutive_failures'] == 3 hct.stop() assert self._checker.health.call_count == 6 @@ -446,10 +439,9 @@ class TestThreadedHealthChecker(unittest.TestCase): self.sandbox.exists.return_value = True self.sandbox.root = '/root' - self.initial_interval_secs = 2 - self.interval_secs = 1 + self.initial_interval_secs = 1 + self.interval_secs = 5 self.max_consecutive_failures = 2 - self.min_consecutive_successes = 1 self.clock = mock.Mock(spec=time) self.clock.time.return_value = 1.0 self.health_checker = HealthChecker( @@ -458,7 +450,6 @@ class TestThreadedHealthChecker(unittest.TestCase): self.interval_secs, self.initial_interval_secs, self.max_consecutive_failures, - self.min_consecutive_successes, self.clock) self.health_checker_sandbox_exists = HealthChecker( self.health, @@ -466,7 +457,6 @@ class TestThreadedHealthChecker(unittest.TestCase): self.interval_secs, self.initial_interval_secs, self.max_consecutive_failures, - self.min_consecutive_successes, self.clock) def test_perform_check_if_not_disabled_snooze_file_is_none(self): @@ -501,28 +491,17 @@ class TestThreadedHealthChecker(unittest.TestCase): hc = self.health_checker.threaded_health_checker assert hc.current_consecutive_failures == 0 - assert hc.current_consecutive_successes == 0 - assert hc.healthy is True - - hc._maybe_update_failure_count(False, 'reason') - assert hc.current_consecutive_failures == 1 - assert hc.current_consecutive_successes == 0 assert hc.healthy is True hc._maybe_update_failure_count(True, 'reason') assert hc.current_consecutive_failures == 0 - assert 
hc.current_consecutive_successes == 1 - assert hc.healthy is True - hc._expired = True hc._maybe_update_failure_count(False, 'reason') assert hc.current_consecutive_failures == 1 - assert hc.current_consecutive_successes == 0 assert hc.healthy is True hc._maybe_update_failure_count(False, 'reason') assert hc.current_consecutive_failures == 2 - assert hc.current_consecutive_successes == 0 assert hc.healthy is True hc._maybe_update_failure_count(False, 'reason') @@ -538,7 +517,7 @@ class TestThreadedHealthChecker(unittest.TestCase): liveness = [False, False, True] self.health_checker.threaded_health_checker.dead.is_set.side_effect = lambda: liveness.pop(0) self.health_checker.threaded_health_checker.run() - assert self.clock.sleep.call_count == 2 + assert self.clock.sleep.call_count == 3 assert mock_maybe_update_failure_count.call_count == 2 @mock.patch('apache.aurora.executor.common.health_checker.ExceptionalThread.start', http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/src/test/python/apache/aurora/executor/common/test_status_checker.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/common/test_status_checker.py b/src/test/python/apache/aurora/executor/common/test_status_checker.py index 8942639..5be1981 100644 --- a/src/test/python/apache/aurora/executor/common/test_status_checker.py +++ b/src/test/python/apache/aurora/executor/common/test_status_checker.py @@ -14,7 +14,7 @@ import threading -from mesos.interface import mesos_pb2 +from mesos.interface.mesos_pb2 import TaskState from apache.aurora.executor.common.status_checker import ( ChainedStatusChecker, @@ -62,42 +62,11 @@ def test_chained_health_interface(): assert si.started.is_set() assert chained_si.status is None - reason = StatusResult('derp', mesos_pb2.TASK_FAILED) + reason = StatusResult('derp', TaskState.Value('TASK_FAILED')) si2.set_status(reason) assert chained_si.status == reason assert chained_si.status.reason == 'derp' - assert mesos_pb2.TaskState.Name(chained_si.status.status) == 'TASK_FAILED' - - for si in (si1, si2): - assert not si.stopped.is_set() - chained_si.stop() - for si in (si1, si2): - assert si.stopped.is_set() - - # A task may fail after transition into RUNNING state. We need to test - # the status is not memoized in ChainedStatusChecker. - si1 = EventHealth() - si2 = EventHealth() - chained_si = ChainedStatusChecker([si1, si2]) - - for si in (si1, si2): - assert not si.started.is_set() - chained_si.start() - for si in (si1, si2): - assert si.started.is_set() - - assert chained_si.status is None - reason2 = StatusResult('Task is healthy.', mesos_pb2.TASK_RUNNING) - si2.set_status(reason2) - assert chained_si.status == reason2 - assert chained_si.status.reason == 'Task is healthy.' 
- assert mesos_pb2.TaskState.Name(chained_si.status.status) == 'TASK_RUNNING' - - reason1 = StatusResult('derp', mesos_pb2.TASK_FAILED) - si1.set_status(reason1) - assert chained_si.status == reason1 - assert chained_si.status.reason == 'derp' - assert mesos_pb2.TaskState.Name(chained_si.status.status) == 'TASK_FAILED' + assert TaskState.Name(chained_si.status.status) == 'TASK_FAILED' for si in (si1, si2): assert not si.stopped.is_set() http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/src/test/python/apache/aurora/executor/test_status_manager.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/test_status_manager.py b/src/test/python/apache/aurora/executor/test_status_manager.py index 7c0efe8..ce4679b 100644 --- a/src/test/python/apache/aurora/executor/test_status_manager.py +++ b/src/test/python/apache/aurora/executor/test_status_manager.py @@ -16,9 +16,9 @@ import time from unittest import TestCase import mock -from mesos.interface import mesos_pb2 +from mesos.interface.mesos_pb2 import TaskState -from apache.aurora.executor.common.status_checker import StatusChecker, StatusResult +from apache.aurora.executor.common.status_checker import StatusChecker from apache.aurora.executor.status_manager import StatusManager @@ -28,29 +28,23 @@ class FakeStatusChecker(StatusChecker): @property def status(self): - status_result = None - if self.call_count == 1: - status_result = StatusResult('Task is healthy.', mesos_pb2.TASK_RUNNING) if self.call_count == 2: - status_result = StatusResult('Task is healthy.', mesos_pb2.TASK_RUNNING) - if self.call_count == 3: - status_result = StatusResult('Reason why a task failed.', mesos_pb2.TASK_KILLED) + return TaskState.Value('TASK_KILLED') self.call_count += 1 - return status_result + return None class TestStatusManager(TestCase): def setUp(self): - self.callback_call_count = 0 + self.callback_called = False def test_run(self): checker = FakeStatusChecker() - def running(result): - self.callback_call_count += 1 - def shutdown(result): - self.callback_call_count += 1 + def callback(result): + assert result == TaskState.Value('TASK_KILLED') + self.callback_called = True mock_time = mock.create_autospec(spec=time, instance=True) - status_manager = StatusManager(checker, running, shutdown, mock_time) + status_manager = StatusManager(checker, callback, mock_time) status_manager.run() - assert mock_time.sleep.call_count == 3 - assert self.callback_call_count == 2 + assert mock_time.sleep.call_count == 2 + assert self.callback_called is True http://git-wip-us.apache.org/repos/asf/aurora/blob/1b3b7f53/src/test/python/apache/aurora/executor/test_thermos_executor.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/test_thermos_executor.py b/src/test/python/apache/aurora/executor/test_thermos_executor.py index ac914db..0bfe9e9 100644 --- a/src/test/python/apache/aurora/executor/test_thermos_executor.py +++ b/src/test/python/apache/aurora/executor/test_thermos_executor.py @@ -44,9 +44,9 @@ from apache.aurora.config.schema.base import ( ) from apache.aurora.executor.aurora_executor import AuroraExecutor from apache.aurora.executor.common.executor_timeout import ExecutorTimeout -from apache.aurora.executor.common.health_checker import HealthChecker, HealthCheckerProvider +from apache.aurora.executor.common.health_checker import HealthCheckerProvider from apache.aurora.executor.common.sandbox import DirectorySandbox, 
SandboxProvider -from apache.aurora.executor.common.status_checker import ChainedStatusChecker, StatusCheckerProvider +from apache.aurora.executor.common.status_checker import ChainedStatusChecker from apache.aurora.executor.common.task_runner import TaskError from apache.aurora.executor.status_manager import StatusManager from apache.aurora.executor.thermos_task_runner import ( @@ -100,14 +100,6 @@ class FailingSandboxProvider(SandboxProvider): return FailingSandbox(safe_mkdtemp(), exception_type=self._exception_type, **kwargs) -class FailingStatusCheckerProvider(StatusCheckerProvider): - def __init__(self, exception_type): - self._exception_type = exception_type - - def from_assigned_task(self, assigned_task, sandbox): - raise self._exception_type('Could not create status checker!') - - class SlowSandbox(DirectorySandbox): def __init__(self, *args, **kwargs): super(SlowSandbox, self).__init__(*args, **kwargs) @@ -225,25 +217,18 @@ def make_executor( ExecutorTimeout(te.launched, proxy_driver, timeout=Amount(100, Time.MILLISECONDS)).start() task_description = make_task(task, assigned_ports=ports, instanceId=0) te.launchTask(proxy_driver, task_description) - te.status_manager_started.wait() - is_health_check_enabled = False - for status_checker in te._chained_checker._status_checkers: - if isinstance(status_checker, HealthChecker): - is_health_check_enabled = True - break + te.status_manager_started.wait() - status_update_nums = 1 if is_health_check_enabled else 2 - while len(proxy_driver.method_calls['sendStatusUpdate']) < status_update_nums: + while len(proxy_driver.method_calls['sendStatusUpdate']) < 2: time.sleep(0.1) # make sure startup was kosher updates = proxy_driver.method_calls['sendStatusUpdate'] - assert len(updates) == status_update_nums + assert len(updates) == 2 status_updates = [arg_tuple[0][0] for arg_tuple in updates] assert status_updates[0].state == mesos_pb2.TASK_STARTING - if not is_health_check_enabled: - assert status_updates[1].state == mesos_pb2.TASK_RUNNING + assert status_updates[1].state == mesos_pb2.TASK_RUNNING # wait for the runner to bind to a task while True: @@ -251,6 +236,7 @@ def make_executor( if runner: break time.sleep(0.1) + assert te.launched.is_set() return runner, te @@ -452,7 +438,7 @@ class TestThermosExecutor(object): executor.terminated.wait() updates = proxy_driver.method_calls['sendStatusUpdate'] - assert len(updates) == 2 + assert len(updates) == 3 assert updates[-1][0][0].state == mesos_pb2.TASK_FAILED def test_task_health_ok(self): @@ -586,20 +572,6 @@ class TestThermosExecutor(object): assert updates[0][0][0].state == mesos_pb2.TASK_STARTING assert updates[1][0][0].state == mesos_pb2.TASK_FAILED - def test_failing_status_provider_initialize_unknown_exception(self): - proxy_driver = ProxyDriver() - - with temporary_dir() as td: - te = FastThermosExecutor( - runner_provider=make_provider(td), - sandbox_provider=DefaultTestSandboxProvider(), - status_providers=(FailingStatusCheckerProvider(exception_type=Exception),)) - te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI)) - proxy_driver.wait_stopped() - - updates = proxy_driver.method_calls['sendStatusUpdate'] - assert updates[-1][0][0].state == mesos_pb2.TASK_FAILED - def test_waiting_executor(): proxy_driver = ProxyDriver()
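----------------------------------------------------------------------

For reference, the client-side rule this revert restores is that watch_secs must outlast the worst-case unhealthy window. The sketch below is a standalone illustration of the restored check in src/main/python/apache/aurora/client/config.py; validate_watch_secs is a hypothetical name, and the defaults are copied from the HealthCheckConfig schema (initial_interval_secs=15, interval_secs=10, max_consecutive_failures=0) rather than imported from Aurora.

def validate_watch_secs(watch_secs,
                        initial_interval_secs=15,
                        interval_secs=10,
                        max_consecutive_failures=0):
    """Sketch of the restored watch_secs validation, not the actual module."""
    if watch_secs <= 0:
        raise ValueError('Watch seconds should be greater than 0')
    # Worst case before the executor reports failure: the initial grace
    # period plus max_consecutive_failures checks spaced interval_secs apart.
    target_watch = initial_interval_secs + (max_consecutive_failures * interval_secs)
    if watch_secs <= target_watch:
        raise ValueError('watch_secs (%d) must be greater than %d' % (watch_secs, target_watch))


# Mirrors the restored tests: with max_consecutive_failures=1 the target is
# 15 + 1 * 10 = 25, so watch_secs=26 passes while watch_secs=25 would die.
validate_watch_secs(26, max_consecutive_failures=1)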
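On the executor side, the revert returns StatusManager to a single terminal callback: poll the chained checker until it yields a result, fire the callback once, and exit; TASK_RUNNING is sent by the executor itself before the manager starts. A condensed sketch under those assumptions (FakeChecker and run_status_manager are illustrative stand-ins patterned on status_manager.py and its test, not Aurora code):

import time

class FakeChecker(object):
    # Stand-in checker: report nothing twice, then a terminal state string.
    def __init__(self):
        self.calls = 0

    @property
    def status(self):
        self.calls += 1
        return None if self.calls <= 2 else 'TASK_KILLED'

def run_status_manager(checker, callback, poll_wait_secs=0.5, clock=time):
    # Poll until the checker reports a status, invoke the single callback, exit.
    while True:
        result = checker.status
        if result is not None:
            callback(result)
            break
        clock.sleep(poll_wait_secs)

def on_terminal(result):
    print('terminal status: %s' % result)

run_status_manager(FakeChecker(), on_terminal, poll_wait_secs=0)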