Change job updates to rely on health checks rather than on `watch_secs`. Make RUNNING a first-class state that indicates the task is running and healthy. This is achieved by introducing a new configuration parameter, `min_consecutive_successes`, which dictates when to move a task into the RUNNING state.
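For illustration, a job configuration that opts into the new behavior might look like the sketch below, written in the Aurora configuration DSL. This is a minimal, hypothetical example: the cluster, role, names, command line, and the specific values are placeholders and are not part of this change.

    hello = Process(
      name = 'hello',
      # Placeholder command; any server that answers health checks on the
      # assigned 'health' port would work here.
      cmdline = './hello_server --port={{thermos.ports[health]}}')

    task = Task(
      processes = [hello],
      resources = Resources(cpu = 0.1, ram = 64 * MB, disk = 64 * MB))

    jobs = [Service(
      cluster = 'devcluster',
      role = 'www-data',
      environment = 'prod',
      name = 'hello',
      task = task,
      # The task stays in STARTING and only transitions to RUNNING after two
      # consecutive successful health checks.
      health_check_config = HealthCheckConfig(
        initial_interval_secs = 15,
        interval_secs = 10,
        min_consecutive_successes = 2,
        max_consecutive_failures = 2),
      # With watch_secs set to 0 the updater relies purely on the health-check
      # driven transition to RUNNING, as described below.
      update_config = UpdateConfig(watch_secs = 0))]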
With this change, it is possible to set `watch_secs` to 0, so that updates are purely based on the task's health, rather than relying on watching the task remain in the RUNNING state for a pre-determined timeout. Testing Done: build-support/jenkins/build.sh sh ./src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh Bugs closed: AURORA-1225 Reviewed at https://reviews.apache.org/r/53590/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/2992c8b4 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/2992c8b4 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/2992c8b4 Branch: refs/heads/master Commit: 2992c8b4dec3294293b2130c49a8836e070bceae Parents: 05f082a Author: Santhosh Kumar Shanmugham <[email protected]> Authored: Thu Nov 17 13:59:35 2016 -0800 Committer: Zameer Manji <[email protected]> Committed: Thu Nov 17 13:59:35 2016 -0800 ---------------------------------------------------------------------- RELEASE-NOTES.md | 4 + docs/development/design-documents.md | 2 +- docs/features/job-updates.md | 13 +- docs/features/services.md | 17 + docs/reference/configuration.md | 3 +- docs/reference/task-lifecycle.md | 4 +- .../apache/aurora/client/api/updater_util.py | 4 +- src/main/python/apache/aurora/client/config.py | 23 +- .../python/apache/aurora/config/schema/base.py | 11 +- .../apache/aurora/executor/aurora_executor.py | 11 +- .../aurora/executor/common/health_checker.py | 147 +++- .../aurora/executor/common/status_checker.py | 53 +- .../apache/aurora/executor/status_manager.py | 31 +- .../apache/aurora/client/cli/test_inspect.py | 3 +- .../python/apache/aurora/client/test_config.py | 44 +- .../executor/common/test_health_checker.py | 681 ++++++++++++++++--- .../executor/common/test_status_checker.py | 116 +++- .../aurora/executor/test_status_manager.py | 39 +- .../aurora/executor/test_thermos_executor.py | 30 +- .../apache/aurora/e2e/http/http_example.aurora | 14 +- .../http/http_example_bad_healthcheck.aurora | 10 +- .../aurora/e2e/http/http_example_updated.aurora | 26 +- .../sh/org/apache/aurora/e2e/http_example.py | 27 +- .../sh/org/apache/aurora/e2e/test_end_to_end.sh | 6 +- 24 files changed, 1102 insertions(+), 217 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 3924962..96926f4 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -16,6 +16,10 @@ can be used in the Aurora job configuration to resolve a docker image specified by its `name:tag` to a concrete identifier specified by its `registry/name@digest`. It requires version 2 of the Docker Registry. +- Use `RUNNING` state to indicate that the task is healthy and behaving as expected. Job updates + can now rely purely on health checks rather than the `watch_secs` timeout when deciding an individual + instance's update state, by setting `watch_secs` to 0. A service will remain in `STARTING` state + until `min_consecutive_successes` consecutive health checks have passed.
### Deprecations and removals: http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/docs/development/design-documents.md ---------------------------------------------------------------------- diff --git a/docs/development/design-documents.md b/docs/development/design-documents.md index 6bfc679..c942643 100644 --- a/docs/development/design-documents.md +++ b/docs/development/design-documents.md @@ -11,7 +11,7 @@ Current and past documents: * [Command Hooks for the Aurora Client](design/command-hooks.md) * [Dynamic Reservations](https://docs.google.com/document/d/19gV8Po6DIHO14tOC7Qouk8RnboY8UCfRTninwn_5-7c/edit) * [GPU Resources in Aurora](https://docs.google.com/document/d/1J9SIswRMpVKQpnlvJAMAJtKfPP7ZARFknuyXl-2aZ-M/edit) -* [Health Checks for Updates](https://docs.google.com/document/d/1ZdgW8S4xMhvKW7iQUX99xZm10NXSxEWR0a-21FP5d94/edit) +* [Health Checks for Updates](https://docs.google.com/document/d/1KOO0LC046k75TqQqJ4c0FQcVGbxvrn71E10wAjMorVY/edit) * [JobUpdateDiff thrift API](https://docs.google.com/document/d/1Fc_YhhV7fc4D9Xv6gJzpfooxbK4YWZcvzw6Bd3qVTL8/edit) * [REST API RFC](https://docs.google.com/document/d/11_lAsYIRlD5ETRzF2eSd3oa8LXAHYFD8rSetspYXaf4/edit) * [Revocable Mesos offers in Aurora](https://docs.google.com/document/d/1r1WCHgmPJp5wbrqSZLsgtxPNj3sULfHrSFmxp2GyPTo/edit) http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/docs/features/job-updates.md ---------------------------------------------------------------------- diff --git a/docs/features/job-updates.md b/docs/features/job-updates.md index 792f2ae..60968ae 100644 --- a/docs/features/job-updates.md +++ b/docs/features/job-updates.md @@ -34,7 +34,7 @@ You may `abort` a job update regardless of the state it is in. This will instruct the scheduler to completely abandon the job update and leave the job in the current (possibly partially-updated) state. -For a configuration update, the Aurora Client calculates required changes +For a configuration update, the Aurora Scheduler calculates required changes by examining the current job config state and the new desired job config. It then starts a *rolling batched update process* by going through every batch and performing these operations: @@ -44,14 +44,13 @@ and performing these operations: - If an instance is not present in the scheduler but is present in the new config, then the instance is created. - If an instance is present in both the scheduler and the new config, then - the client diffs both task configs. If it detects any changes, it + the scheduler diffs both task configs. If it detects any changes, it performs an instance update by killing the old config instance and adds the new config instance. -The Aurora client continues through the instance list until all tasks are -updated, in `RUNNING,` and healthy for a configurable amount of time. -If the client determines the update is not going well (a percentage of health -checks have failed), it cancels the update. +The Aurora Scheduler continues through the instance list until all tasks are +updated and in `RUNNING`. If the scheduler determines the update is not going +well (based on the criteria specified in the UpdateConfig), it cancels the update. Update cancellation runs a procedure similar to the described above update sequence, but in reverse order. New instance configs are swapped @@ -59,7 +58,7 @@ with old instance configs and batch updates proceed backwards from the point where the update failed. E.g.; (0,1,2) (3,4,5) (6,7, 8-FAIL) results in a rollback in order (8,7,6) (5,4,3) (2,1,0). 
-For details how to control a job update, please see the +For details on how to control a job update, please see the [UpdateConfig](../reference/configuration.md#updateconfig-objects) configuration object. http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/docs/features/services.md ---------------------------------------------------------------------- diff --git a/docs/features/services.md b/docs/features/services.md index b6bfc9d..50189ee 100644 --- a/docs/features/services.md +++ b/docs/features/services.md @@ -90,6 +90,23 @@ Please see the [configuration reference](../reference/configuration.md#user-content-healthcheckconfig-objects) for configuration options for this feature. +Starting with the 0.17.0 release, job updates can rely solely on task health-checks, enabled by introducing +a `min_consecutive_successes` parameter on the HealthCheckConfig object. This parameter represents +the number of successful health checks needed before a task is moved into the `RUNNING` state. Tasks +that do not have enough successful health checks within the first `n` attempts are moved to the +`FAILED` state, where `n = ceil(initial_interval_secs/interval_secs) + min_consecutive_successes`. +In order to accommodate variability during task warm-up, `initial_interval_secs` will +act as a grace period. Any health-check failures during the first `m` attempts are ignored and +do not count towards `max_consecutive_failures`, where `m = ceil(initial_interval_secs/interval_secs)`. + +As [job updates](job-updates.md) are based only on health-checks, it is not necessary to set +`watch_secs` to the worst-case update time; it can instead be set to 0. The scheduler considers a +task that is in the `RUNNING` state to be healthy and proceeds to update the next batch of instances. +For details on how to control health checks, please see the +[HealthCheckConfig](../reference/configuration.md#healthcheckconfig-objects) configuration object. +Existing jobs that do not configure a health-check can fall back to using `watch_secs` to +monitor a task before considering it healthy. + You can pause health checking by touching a file inside of your sandbox, named `.healthchecksnooze`. As long as that file is present, health checks will be disabled, enabling users to gather core dumps or other performance measurements without worrying about Aurora's health check killing http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/docs/reference/configuration.md ---------------------------------------------------------------------- diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index f2a0b18..6c71142 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -379,9 +379,10 @@ Parameters for controlling a task's health checks via HTTP or a shell command. | param | type | description | ------- | :-------: | -------- | ```health_checker``` | HealthCheckerConfig | Configure what kind of health check to use. -| ```initial_interval_secs``` | Integer | Initial delay for performing a health check. (Default: 15) +| ```initial_interval_secs``` | Integer | Initial grace period (during which health-check failures are ignored) while performing health checks. (Default: 15) | ```interval_secs``` | Integer | Interval on which to check the task's health.
(Default: 10) | ```max_consecutive_failures``` | Integer | Maximum number of consecutive failures that will be tolerated before considering a task unhealthy (Default: 0) +| ```min_consecutive_successes``` | Integer | Minimum number of consecutive successful health checks required before considering a task healthy (Default: 1) | ```timeout_secs``` | Integer | Health check timeout. (Default: 1) ### HealthCheckerConfig Objects http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/docs/reference/task-lifecycle.md ---------------------------------------------------------------------- diff --git a/docs/reference/task-lifecycle.md b/docs/reference/task-lifecycle.md index 4dcb481..cf1b679 100644 --- a/docs/reference/task-lifecycle.md +++ b/docs/reference/task-lifecycle.md @@ -35,7 +35,9 @@ the `Task` goes into `STARTING` state. `STARTING` state initializes a `Task` sandbox. When the sandbox is fully initialized, Thermos begins to invoke `Process`es. Also, the agent machine sends an update to the scheduler that the `Task` is -in `RUNNING` state. +in `RUNNING` state only after the task satisfies the liveness requirements. +See [Health Checking](../features/services.md#health-checking) for more details +on how to configure health checks. http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/client/api/updater_util.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/updater_util.py b/src/main/python/apache/aurora/client/api/updater_util.py index c649316..4e39862 100644 --- a/src/main/python/apache/aurora/client/api/updater_util.py +++ b/src/main/python/apache/aurora/client/api/updater_util.py @@ -35,8 +35,8 @@ class UpdaterConfig(object): if batch_size <= 0: raise ValueError('Batch size should be greater than 0') - if watch_secs <= 0: - raise ValueError('Watch seconds should be greater than 0') + if watch_secs < 0: + raise ValueError('Watch seconds should be greater than or equal to 0') if pulse_interval_secs is not None and pulse_interval_secs < self.MIN_PULSE_INTERVAL_SECONDS: raise ValueError('Pulse interval seconds must be at least %s seconds.' % self.MIN_PULSE_INTERVAL_SECONDS) http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/client/config.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/config.py b/src/main/python/apache/aurora/client/config.py index 0186af5..70c2c98 100644 --- a/src/main/python/apache/aurora/client/config.py +++ b/src/main/python/apache/aurora/client/config.py @@ -84,11 +84,8 @@ Based on your job size (%s) you should use max_total_failures >= %s. ''' -WATCH_SECS_INSUFFICIENT_ERROR_FORMAT = ''' -You have specified an insufficiently short watch period (%d seconds) in your update configuration. -Your update will always succeed. In order for the updater to detect health check failures, -UpdateConfig.watch_secs must be greater than %d seconds to account for an initial -health check interval (%d seconds) plus %d consecutive failures at a check interval of %d seconds. +INVALID_VALUE_ERROR_FORMAT = ''' +Invalid value (%s) specified for %s. Value cannot be less than 0.
''' @@ -101,6 +98,7 @@ def _validate_update_config(config): watch_secs = update_config.watch_secs().get() initial_interval_secs = health_check_config.initial_interval_secs().get() max_consecutive_failures = health_check_config.max_consecutive_failures().get() + min_consecutive_successes = health_check_config.min_consecutive_successes().get() interval_secs = health_check_config.interval_secs().get() if max_failures >= job_size: @@ -111,10 +109,17 @@ def _validate_update_config(config): if max_failures < min_failure_threshold: die(UPDATE_CONFIG_DEDICATED_THRESHOLD_ERROR % (job_size, min_failure_threshold)) - target_watch = initial_interval_secs + (max_consecutive_failures * interval_secs) - if watch_secs <= target_watch: - die(WATCH_SECS_INSUFFICIENT_ERROR_FORMAT % - (watch_secs, target_watch, initial_interval_secs, max_consecutive_failures, interval_secs)) + params = [ + (watch_secs, 'watch_secs'), + (max_consecutive_failures, 'max_consecutive_failures'), + (min_consecutive_successes, 'min_consecutive_successes'), + (initial_interval_secs, 'initial_interval_secs'), + (interval_secs, 'interval_secs') + ] + + for value, name in params: + if value < 0: + die(INVALID_VALUE_ERROR_FORMAT % (value, name)) PRODUCTION_DEPRECATED_WARNING = ( http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/config/schema/base.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/config/schema/base.py b/src/main/python/apache/aurora/config/schema/base.py index 8451630..b15b939 100644 --- a/src/main/python/apache/aurora/config/schema/base.py +++ b/src/main/python/apache/aurora/config/schema/base.py @@ -56,11 +56,12 @@ DefaultHealthChecker = HealthCheckerConfig(http=HttpHealthChecker()) class HealthCheckConfig(Struct): - health_checker = Default(HealthCheckerConfig, DefaultHealthChecker) - initial_interval_secs = Default(Float, 15.0) - interval_secs = Default(Float, 10.0) - max_consecutive_failures = Default(Integer, 0) - timeout_secs = Default(Float, 1.0) + health_checker = Default(HealthCheckerConfig, DefaultHealthChecker) + initial_interval_secs = Default(Float, 15.0) + interval_secs = Default(Float, 10.0) + max_consecutive_failures = Default(Integer, 0) + min_consecutive_successes = Default(Integer, 1) + timeout_secs = Default(Float, 1.0) class HttpLifecycleConfig(Struct): http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/executor/aurora_executor.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/aurora_executor.py b/src/main/python/apache/aurora/executor/aurora_executor.py index aee5e56..d01fcb9 100644 --- a/src/main/python/apache/aurora/executor/aurora_executor.py +++ b/src/main/python/apache/aurora/executor/aurora_executor.py @@ -115,8 +115,6 @@ class AuroraExecutor(ExecutorBase, Observable): if not self._start_runner(driver, assigned_task): return - self.send_update(driver, self._task_id, mesos_pb2.TASK_RUNNING) - try: self._start_status_manager(driver, assigned_task) except Exception: @@ -179,10 +177,17 @@ class AuroraExecutor(ExecutorBase, Observable): # chain the runner to the other checkers, but do not chain .start()/.stop() complete_checker = ChainedStatusChecker([self._runner, self._chained_checker]) self._status_manager = self._status_manager_class( - complete_checker, self._shutdown, clock=self._clock) + complete_checker, + self._signal_running, + self._shutdown, + clock=self._clock) 
self._status_manager.start() self.status_manager_started.set() + def _signal_running(self, reason): + log.info('Send TASK_RUNNING status update. reason: %s' % reason) + self.send_update(self._driver, self._task_id, mesos_pb2.TASK_RUNNING, reason) + def _signal_kill_manager(self, driver, task_id, reason): if self._task_id is None: log.error('Was asked to kill task but no task running!') http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/executor/common/health_checker.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/common/health_checker.py b/src/main/python/apache/aurora/executor/common/health_checker.py index 3c7c09d..12af9d8 100644 --- a/src/main/python/apache/aurora/executor/common/health_checker.py +++ b/src/main/python/apache/aurora/executor/common/health_checker.py @@ -12,6 +12,7 @@ # limitations under the License. # +import math import os import pwd import threading @@ -43,14 +44,22 @@ class ThreadedHealthChecker(ExceptionalThread): health_checker should be a callable returning a tuple of (boolean, reason), indicating respectively the health of the service and the reason for its failure (or None if the service is still healthy). + + Health-check failures are ignored during the first `math.ceil(grace_period_secs/interval_secs)` + attempts. Status becomes `TASK_RUNNING` if `min_consecutive_successes` consecutive health + check successes are seen, within `math.ceil(grace_period_secs/interval_secs) + + min_consecutive_successes` attempts. (Converting time to attempts ensures that slight + discrepancies in sleep intervals do not cost an attempt or prematurely end + health checking and mark the task unhealthy.) """ def __init__(self, health_checker, sandbox, interval_secs, - initial_interval_secs, + grace_period_secs, max_consecutive_failures, + min_consecutive_successes, clock): """ :param health_checker: health checker to confirm service health @@ -59,10 +68,12 @@ class ThreadedHealthChecker(ExceptionalThread): :type sandbox: DirectorySandbox :param interval_secs: delay between checks :type interval_secs: int - :param initial_interval_secs: seconds to wait before starting checks - :type initial_interval_secs: int + :param grace_period_secs: initial period during which failed health-checks are ignored + :type grace_period_secs: int :param max_consecutive_failures: number of failures to allow before marking dead :type max_consecutive_failures: int + :param min_consecutive_successes: number of successes needed before marking healthy + :type min_consecutive_successes: int :param clock: time module available to be mocked for testing :type clock: time module """ @@ -70,24 +81,33 @@ class ThreadedHealthChecker(ExceptionalThread): self.sandbox = sandbox self.clock = clock self.current_consecutive_failures = 0 + self.current_consecutive_successes = 0 self.dead = threading.Event() self.interval = interval_secs self.max_consecutive_failures = max_consecutive_failures + self.min_consecutive_successes = min_consecutive_successes self.snooze_file = None self.snoozed = False if self.sandbox and self.sandbox.exists(): self.snooze_file = os.path.join(self.sandbox.root, '.healthchecksnooze') - if initial_interval_secs is not None: - self.initial_interval = initial_interval_secs + if grace_period_secs is not None: + self.grace_period_secs = grace_period_secs else: - self.initial_interval = interval_secs * 2 + self.grace_period_secs = interval_secs * 2 + + self.attempts = 0 + # 
Compute the number of attempts that can fit into the grace_period_secs, + # to guarantee the number of health checks during the grace period. + # Relying on time might cause non-deterministic behavior since the + # health checks can be spaced apart by interval_secs + epsilon. + self.forgiving_attempts = math.ceil(self.grace_period_secs / self.interval) + + self.max_attempts_to_running = self.forgiving_attempts + self.min_consecutive_successes + self.running = False + self.healthy, self.reason = True, None - if self.initial_interval > 0: - self.healthy, self.reason = True, None - else: - self.healthy, self.reason = self._perform_check_if_not_disabled() super(ThreadedHealthChecker, self).__init__() self.daemon = True @@ -107,27 +127,80 @@ class ThreadedHealthChecker(ExceptionalThread): log.error(traceback.format_exc()) return False, 'Internal health check error: %s' % e - def _maybe_update_failure_count(self, is_healthy, reason): + def _maybe_update_health_check_count(self, is_healthy, reason): if not is_healthy: log.warning('Health check failure: %s' % reason) + + if self.current_consecutive_successes > 0: + log.debug('Reset consecutive successes counter.') + self.current_consecutive_successes = 0 + + if self._should_ignore_failure(): + return + + if self._should_fail_fast(): + log.warning('Not enough attempts left to prove health, failing fast.') + self.healthy = False + self.reason = reason + self.current_consecutive_failures += 1 if self.current_consecutive_failures > self.max_consecutive_failures: log.warning('Reached consecutive failure limit.') self.healthy = False self.reason = reason else: + self.current_consecutive_successes += 1 + + if not self.running: + if self.current_consecutive_successes >= self.min_consecutive_successes: + log.info('Reached consecutive success limit.') + self.running = True + if self.current_consecutive_failures > 0: log.debug('Reset consecutive failures counter.') - self.current_consecutive_failures = 0 + self.current_consecutive_failures = 0 + + def _should_fail_fast(self): + if not self.running: + attempts_remaining = self.max_attempts_to_running - self.attempts + successes_needed = self.min_consecutive_successes - self.current_consecutive_successes + if successes_needed > attempts_remaining: + return True + return False + + def _should_ignore_failure(self): + if not self.running: + if self.attempts <= self.forgiving_attempts: + log.warning('Ignoring failure of attempt: %s' % self.attempts) + return True + return False + + def _should_enforce_deadline(self): + if not self.running: + if self.attempts > self.max_attempts_to_running: + return True + return False + + def _do_health_check(self): + if self._should_enforce_deadline(): + # This is needed; otherwise it is possible to flap between + # successful health-checks and failed health-checks, never + # really satisfying the criteria for either healthy or unhealthy. + log.warning('Exhausted attempts before satisfying liveness criteria.') + self.healthy = False + self.reason = 'Not enough successful health checks in time.'
+ return self.healthy, self.reason + + is_healthy, reason = self._perform_check_if_not_disabled() + if not self.running: + self.attempts += 1 + self._maybe_update_health_check_count(is_healthy, reason) + return is_healthy, reason def run(self): log.debug('Health checker thread started.') - if self.initial_interval > 0: - self.clock.sleep(self.initial_interval) - log.debug('Initial interval expired.') while not self.dead.is_set(): - is_healthy, reason = self._perform_check_if_not_disabled() - self._maybe_update_failure_count(is_healthy, reason) + is_healthy, reason = self._do_health_check() self.clock.sleep(self.interval) def start(self): @@ -158,8 +231,9 @@ class HealthChecker(StatusChecker): health_checker, sandbox=None, interval_secs=10, - initial_interval_secs=None, + grace_period_secs=None, max_consecutive_failures=0, + min_consecutive_successes=1, clock=time): self._health_checks = 0 self._total_latency = 0 @@ -169,8 +243,9 @@ class HealthChecker(StatusChecker): self._timing_wrapper(health_checker), sandbox, interval_secs, - initial_interval_secs, + grace_period_secs, max_consecutive_failures, + min_consecutive_successes, clock) self.metrics.register(LambdaGauge('consecutive_failures', lambda: self.threaded_health_checker.current_consecutive_failures)) @@ -192,9 +267,13 @@ class HealthChecker(StatusChecker): @property def status(self): - if not self.threaded_health_checker.healthy: - return StatusResult('Failed health check! %s' % self.threaded_health_checker.reason, - TaskState.Value('TASK_FAILED')) + if self.threaded_health_checker.healthy: + if self.threaded_health_checker.running: + return StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING')) + else: + return StatusResult(None, TaskState.Value('TASK_STARTING')) + return StatusResult('Failed health check! %s' % self.threaded_health_checker.reason, + TaskState.Value('TASK_FAILED')) def name(self): return 'health_checker' @@ -207,6 +286,22 @@ class HealthChecker(StatusChecker): self.threaded_health_checker.stop() +class NoopHealthChecker(StatusChecker): + """ + A health checker that will always report healthy status. This will be the + stand-in health checker when no health checker is configured. Since there is + no liveness requirement specified, the status is always `TASK_RUNNING`. 
+ """ + + def __init__(self): + self._status = StatusResult('No health-check defined, task is assumed healthy.', + TaskState.Value('TASK_RUNNING')) + + @property + def status(self): + return self._status + + class HealthCheckerProvider(StatusCheckerProvider): def __init__(self, nosetuid_health_checks=False, mesos_containerizer_path=None): @@ -282,7 +377,8 @@ class HealthCheckerProvider(StatusCheckerProvider): else: portmap = resolve_ports(mesos_task, assigned_task.assignedPorts) if 'health' not in portmap: - return None + log.warning('No health-checks defined, will use a no-op health-checker.') + return NoopHealthChecker() http_config = health_checker.get(HTTP_HEALTH_CHECK, {}) http_endpoint = http_config.get('endpoint') http_expected_response = http_config.get('expected_response') @@ -301,7 +397,8 @@ class HealthCheckerProvider(StatusCheckerProvider): a_health_checker, sandbox, interval_secs=health_check_config.get('interval_secs'), - initial_interval_secs=health_check_config.get('initial_interval_secs'), - max_consecutive_failures=health_check_config.get('max_consecutive_failures')) + grace_period_secs=health_check_config.get('initial_interval_secs'), + max_consecutive_failures=health_check_config.get('max_consecutive_failures'), + min_consecutive_successes=health_check_config.get('min_consecutive_successes')) return health_checker http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/executor/common/status_checker.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/common/status_checker.py b/src/main/python/apache/aurora/executor/common/status_checker.py index 795dae2..f278825 100644 --- a/src/main/python/apache/aurora/executor/common/status_checker.py +++ b/src/main/python/apache/aurora/executor/common/status_checker.py @@ -49,6 +49,11 @@ class StatusResult(object): self._reason, TaskState.Name(self._status)) + def __eq__(self, other): + if isinstance(other, StatusResult): + return self._status == other._status and self._reason == other._reason + return False + class StatusChecker(Observable, Interface): """Interface to pluggable status checkers for the Aurora Executor.""" @@ -73,6 +78,13 @@ class StatusChecker(Observable, Interface): class StatusCheckerProvider(Interface): @abstractmethod def from_assigned_task(self, assigned_task, sandbox): + """ + :param assigned_task: + :type assigned_task: AssignedTask + :param sandbox: Sandbox of the task corresponding to this status check. + :type sandbox: DirectorySandbox + :return: Instance of a HealthChecker. + """ pass @@ -92,18 +104,43 @@ class ChainedStatusChecker(StatusChecker): @property def status(self): - if self._status is None: + """ + Return status that is computed from the statuses of the StatusCheckers. The computed status + is based on the priority given below (in increasing order of priority). 
+ + None -> healthy (lowest-priority) + TASK_RUNNING -> healthy and running + TASK_STARTING -> healthy but still in starting + Otherwise -> unhealthy (highest-priority) + """ + if not self._in_terminal_state(): + cur_status = None for status_checker in self._status_checkers: - status_checker_status = status_checker.status - if status_checker_status is not None: - log.info('%s reported %s' % (status_checker.__class__.__name__, status_checker_status)) - if not isinstance(status_checker_status, StatusResult): + status_result = status_checker.status + if status_result is not None: + log.info('%s reported %s' % (status_checker.__class__.__name__, status_result)) + if not isinstance(status_result, StatusResult): raise TypeError('StatusChecker returned something other than a StatusResult: got %s' % - type(status_checker_status)) - self._status = status_checker_status - break + type(status_result)) + if status_result.status == TaskState.Value('TASK_STARTING'): + # TASK_STARTING overrides other statuses + cur_status = status_result + elif status_result.status == TaskState.Value('TASK_RUNNING'): + if cur_status is None or cur_status == TaskState.Value('TASK_RUNNING'): + # TASK_RUNNING needs consensus (None is also included) + cur_status = status_result + else: + # Any other status leads to a terminal state + self._status = status_result + return self._status + self._status = cur_status return self._status + def _in_terminal_state(self): + return (self._status is not None and + self._status.status != TaskState.Value('TASK_RUNNING') and + self._status.status != TaskState.Value('TASK_STARTING')) + def start(self): for status_checker in self._status_checkers: status_checker.start() http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/executor/status_manager.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/status_manager.py b/src/main/python/apache/aurora/executor/status_manager.py index 228a99a..8b536a9 100644 --- a/src/main/python/apache/aurora/executor/status_manager.py +++ b/src/main/python/apache/aurora/executor/status_manager.py @@ -14,6 +14,7 @@ import time +from mesos.interface.mesos_pb2 import TaskState from twitter.common import log from twitter.common.exceptions import ExceptionalThread from twitter.common.quantity import Amount, Time @@ -26,18 +27,24 @@ class StatusManager(ExceptionalThread): An agent that periodically checks the health of a task via StatusCheckers that provide HTTP health checking, resource consumption, etc. - If any of the status interfaces return a status, the Status Manager - invokes the user-supplied callback with the status. + Invokes the user-supplied `running_callback` with the status, if the StatusChecker + returns `TASK_RUNNING` as the status. `running_callback` is invoked only once during + the first time `TASK_RUNNING` is reported. For any other non-None statuses other than + `TASK_STARTING`, invokes the `unhealthy_callback` and terminates. 
""" POLL_WAIT = Amount(500, Time.MILLISECONDS) - def __init__(self, status_checker, callback, clock=time): + def __init__(self, status_checker, running_callback, unhealthy_callback, clock=time): if not isinstance(status_checker, StatusChecker): raise TypeError('status_checker must be a StatusChecker, got %s' % type(status_checker)) - if not callable(callback): - raise TypeError('callback needs to be callable!') + if not callable(running_callback): + raise TypeError('running_callback needs to be callable!') + if not callable(unhealthy_callback): + raise TypeError('unhealthy_callback needs to be callable!') self._status_checker = status_checker - self._callback = callback + self._running_callback = running_callback + self._running_callback_dispatched = False + self._unhealthy_callback = unhealthy_callback self._clock = clock super(StatusManager, self).__init__() self.daemon = True @@ -47,7 +54,11 @@ class StatusManager(ExceptionalThread): status_result = self._status_checker.status if status_result is not None: log.info('Status manager got %s' % status_result) - self._callback(status_result) - break - else: - self._clock.sleep(self.POLL_WAIT.as_(Time.SECONDS)) + if status_result.status == TaskState.Value('TASK_RUNNING'): + if not self._running_callback_dispatched: + self._running_callback(status_result) + self._running_callback_dispatched = True + elif status_result.status != TaskState.Value('TASK_STARTING'): + self._unhealthy_callback(status_result) + break + self._clock.sleep(self.POLL_WAIT.as_(Time.SECONDS)) http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/test/python/apache/aurora/client/cli/test_inspect.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/cli/test_inspect.py b/src/test/python/apache/aurora/client/cli/test_inspect.py index 7ef682d..4a23c59 100644 --- a/src/test/python/apache/aurora/client/cli/test_inspect.py +++ b/src/test/python/apache/aurora/client/cli/test_inspect.py @@ -99,7 +99,8 @@ Process 'process': "expected_response": "ok"}}, "interval_secs": 10.0, "timeout_secs": 1.0, - "max_consecutive_failures": 0}, + "max_consecutive_failures": 0, + "min_consecutive_successes": 1}, "cluster": "west", "cron_schedule": "* * * * *", "service": False, http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/test/python/apache/aurora/client/test_config.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/test_config.py b/src/test/python/apache/aurora/client/test_config.py index 5cf68a5..042372e 100644 --- a/src/test/python/apache/aurora/client/test_config.py +++ b/src/test/python/apache/aurora/client/test_config.py @@ -192,21 +192,42 @@ def test_update_config_passes_with_default_values(): config._validate_update_config(AuroraConfig(base_job)) -def test_update_config_passes_with_min_requirement_values(): +def test_update_config_passes_with_max_consecutive_failures_zero(): + base_job = Job( + name='hello_world', role='john_doe', cluster='test-cluster', + health_check_config=HealthCheckConfig(max_consecutive_failures=0), + task=Task(name='main', processes=[], + resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB))) + + config._validate_update_config(AuroraConfig(base_job)) + + +def test_update_config_fails_with_max_consecutive_failures_negative(): base_job = Job( name='hello_world', role='john_doe', cluster='test-cluster', update_config=UpdateConfig(watch_secs=26), - 
health_check_config=HealthCheckConfig(max_consecutive_failures=1), + health_check_config=HealthCheckConfig(max_consecutive_failures=-1), + task=Task(name='main', processes=[], + resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB))) + + with pytest.raises(SystemExit): + config._validate_update_config(AuroraConfig(base_job)) + + +def test_update_config_passes_with_min_consecutive_successes_zero(): + base_job = Job( + name='hello_world', role='john_doe', cluster='test-cluster', + health_check_config=HealthCheckConfig(min_consecutive_successes=0), task=Task(name='main', processes=[], resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB))) config._validate_update_config(AuroraConfig(base_job)) -def test_update_config_fails_insufficient_watch_secs_less_than_target(): +def test_update_config_fails_with_min_consecutive_successes_negative(): base_job = Job( name='hello_world', role='john_doe', cluster='test-cluster', - update_config=UpdateConfig(watch_secs=10), + health_check_config=HealthCheckConfig(min_consecutive_successes=-1), task=Task(name='main', processes=[], resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB))) @@ -214,11 +235,20 @@ def test_update_config_fails_insufficient_watch_secs_less_than_target(): config._validate_update_config(AuroraConfig(base_job)) -def test_update_config_fails_insufficient_watch_secs_equal_to_target(): +def test_update_config_passes_with_watch_secs_zero(): + base_job = Job( + name='hello_world', role='john_doe', cluster='test-cluster', + update_config=UpdateConfig(watch_secs=0), + task=Task(name='main', processes=[], + resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB))) + + config._validate_update_config(AuroraConfig(base_job)) + + +def test_update_config_fails_watch_secs_negative(): base_job = Job( name='hello_world', role='john_doe', cluster='test-cluster', - update_config=UpdateConfig(watch_secs=25), - health_check_config=HealthCheckConfig(max_consecutive_failures=1), + update_config=UpdateConfig(watch_secs=-1), task=Task(name='main', processes=[], resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB))) http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/test/python/apache/aurora/executor/common/test_health_checker.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/common/test_health_checker.py b/src/test/python/apache/aurora/executor/common/test_health_checker.py index da0c56c..e2a7f16 100644 --- a/src/test/python/apache/aurora/executor/common/test_health_checker.py +++ b/src/test/python/apache/aurora/executor/common/test_health_checker.py @@ -34,9 +34,10 @@ from apache.aurora.config.schema.base import ( from apache.aurora.executor.common.health_checker import ( HealthChecker, HealthCheckerProvider, - ThreadedHealthChecker + NoopHealthChecker ) from apache.aurora.executor.common.sandbox import SandboxInterface +from apache.aurora.executor.common.status_checker import StatusResult from .fixtures import HELLO_WORLD, MESOS_JOB @@ -47,7 +48,8 @@ class TestHealthChecker(unittest.TestCase): def setUp(self): self._clock = ThreadedClock(0) self._checker = mock.Mock(spec=HttpSignaler) - + self.initial_interval_secs = 15 + self.interval_secs = 10 self.fake_health_checks = [] def mock_health_check(): return self.fake_health_checks.pop(0) @@ -58,101 +60,193 @@ class TestHealthChecker(unittest.TestCase): for i in range(num_calls): self.fake_health_checks.append((status, 'reason')) - def test_initial_interval_2x(self): + def test_grace_period_2x_success(self): + 
'''Grace period is 2 x interval and health checks succeed.''' + + self.append_health_checks(True, num_calls=2) + hct = HealthChecker( + self._checker.health, + interval_secs=self.interval_secs, + clock=self._clock) + hct.start() + assert self._clock.converge(threads=[hct.threaded_health_checker]) + self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs) + assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING')) + assert hct.threaded_health_checker.running is True + hct.stop() + assert self._checker.health.call_count == 1 + + def test_grace_period_2x_failure_then_success(self): + '''Grace period is 2 x interval and health checks fail then succeed.''' + self.append_health_checks(False) - hct = HealthChecker(self._checker.health, interval_secs=5, clock=self._clock) + self.append_health_checks(True) + hct = HealthChecker( + self._checker.health, + interval_secs=self.interval_secs, + clock=self._clock) + hct.start() + assert self._clock.converge(threads=[hct.threaded_health_checker]) + self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs) + assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING')) + assert hct.threaded_health_checker.running is False + self._clock.tick(self.interval_secs) + assert self._clock.converge(threads=[hct.threaded_health_checker]) + self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs) + assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING')) + assert hct.threaded_health_checker.running is True + hct.stop() + assert self._checker.health.call_count == 2 + + def test_grace_period_2x_failure(self): + ''' + Grace period is 2 x interval and all health checks fail. + Failures are ignored when in grace period. + ''' + + self.append_health_checks(False, num_calls=3) + hct = HealthChecker( + self._checker.health, + interval_secs=self.interval_secs, + clock=self._clock) hct.start() assert self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, 10) - assert hct.status is None - self._clock.tick(6) + self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs) + assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING')) + assert hct.threaded_health_checker.running is False + self._clock.tick(self.interval_secs) assert self._clock.converge(threads=[hct.threaded_health_checker]) - assert hct.status is None - self._clock.tick(3) + self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs) + assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING')) + assert hct.threaded_health_checker.running is False + self._clock.tick(self.interval_secs) assert self._clock.converge(threads=[hct.threaded_health_checker]) - assert hct.status is None - self._clock.tick(5) + self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs) + assert hct.status == StatusResult('Failed health check! 
reason', TaskState.Value('TASK_FAILED')) + assert hct.threaded_health_checker.running is False + hct.stop() + assert self._checker.health.call_count == 3 + + def test_success_outside_grace_period(self): + ''' + Health checks fail inside grace period, but pass outside and leads to success + ''' + + self.append_health_checks(False, num_calls=2) + self.append_health_checks(True) + hct = HealthChecker( + self._checker.health, + interval_secs=self.interval_secs, + clock=self._clock) + hct.start() assert self._clock.converge(threads=[hct.threaded_health_checker]) - assert hct.status.status == TaskState.Value('TASK_FAILED') + self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs) + assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING')) + assert hct.threaded_health_checker.running is False + self._clock.tick(self.interval_secs) + assert self._clock.converge(threads=[hct.threaded_health_checker]) + self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs) + assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING')) + assert hct.threaded_health_checker.running is False + self._clock.tick(self.interval_secs) + assert self._clock.converge(threads=[hct.threaded_health_checker]) + self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs) + assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING')) + assert hct.threaded_health_checker.running is True hct.stop() - assert self._checker.health.call_count == 1 + assert self._checker.health.call_count == 3 def test_initial_interval_whatev(self): self.append_health_checks(False, 2) hct = HealthChecker( self._checker.health, - interval_secs=5, - initial_interval_secs=0, + interval_secs=self.interval_secs, + grace_period_secs=0, clock=self._clock) hct.start() self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, amount=5) - assert hct.status.status == TaskState.Value('TASK_FAILED') + self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs) + assert hct.status == StatusResult('Failed health check! reason', TaskState.Value('TASK_FAILED')) hct.stop() - # this is an implementation detail -- we healthcheck in the initializer and - # healthcheck in the run loop. if we ever change the implementation, expect - # this to break. - assert self._checker.health.call_count == 2 + assert self._checker.health.call_count == 1 - def test_consecutive_failures(self): - '''Verify that a task is unhealthy only after max_consecutive_failures is exceeded''' - initial_interval_secs = 2 - interval_secs = 1 - self.append_health_checks(False, num_calls=2) - self.append_health_checks(True) + def test_consecutive_failures_max_failures(self): + '''Verify that a task is unhealthy after max_consecutive_failures is exceeded''' + grace_period_secs = self.initial_interval_secs + interval_secs = self.interval_secs + self.append_health_checks(True, num_calls=2) self.append_health_checks(False, num_calls=3) hct = HealthChecker( self._checker.health, interval_secs=interval_secs, - initial_interval_secs=initial_interval_secs, + grace_period_secs=grace_period_secs, max_consecutive_failures=2, + min_consecutive_successes=2, clock=self._clock) hct.start() - self._clock.converge(threads=[hct.threaded_health_checker]) - # 2 consecutive health check failures followed by a successful health check. 
- epsilon = 0.001 - self._clock.tick(initial_interval_secs + epsilon) self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status is None - assert hct.metrics.sample()['consecutive_failures'] == 1 - self._clock.tick(interval_secs + epsilon) - self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status is None - assert hct.metrics.sample()['consecutive_failures'] == 2 - self._clock.tick(interval_secs + epsilon) + self._clock.assert_waiting(hct.threaded_health_checker, interval_secs) + assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING')) + assert hct.metrics.sample()['consecutive_failures'] == 0 + self._clock.tick(interval_secs) self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status is None + self._clock.assert_waiting(hct.threaded_health_checker, interval_secs) + assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING')) assert hct.metrics.sample()['consecutive_failures'] == 0 - - # 3 consecutive health check failures. - self._clock.tick(interval_secs + epsilon) + assert hct.threaded_health_checker.running is True + self._clock.tick(interval_secs) self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status is None + self._clock.assert_waiting(hct.threaded_health_checker, interval_secs) + assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING')) assert hct.metrics.sample()['consecutive_failures'] == 1 - self._clock.tick(interval_secs + epsilon) + self._clock.tick(interval_secs) self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status is None + self._clock.assert_waiting(hct.threaded_health_checker, interval_secs) + assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING')) assert hct.metrics.sample()['consecutive_failures'] == 2 - self._clock.tick(interval_secs + epsilon) + self._clock.tick(interval_secs) self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status.status == TaskState.Value('TASK_FAILED') + self._clock.assert_waiting(hct.threaded_health_checker, interval_secs) + assert hct.status == StatusResult('Failed health check! 
reason', TaskState.Value('TASK_FAILED')) assert hct.metrics.sample()['consecutive_failures'] == 3 hct.stop() - assert self._checker.health.call_count == 6 + assert self._checker.health.call_count == 5 + + def test_consecutive_failures_failfast(self): + '''Verify that health check is failed fast''' + grace_period_secs = self.initial_interval_secs + interval_secs = self.interval_secs + self.append_health_checks(False, num_calls=3) + hct = HealthChecker( + self._checker.health, + interval_secs=interval_secs, + grace_period_secs=grace_period_secs, + max_consecutive_failures=2, + min_consecutive_successes=2, + clock=self._clock) + hct.start() + + # 3 consecutive health check failures causes fail-fast + self._clock.converge(threads=[hct.threaded_health_checker]) + self._clock.assert_waiting(hct.threaded_health_checker, interval_secs) + assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING')) + # failure is ignored inside grace_period_secs + assert hct.metrics.sample()['consecutive_failures'] == 0 + self._clock.tick(interval_secs) + self._clock.converge(threads=[hct.threaded_health_checker]) + self._clock.assert_waiting(hct.threaded_health_checker, interval_secs) + assert hct.status == StatusResult('Failed health check! reason', TaskState.Value('TASK_FAILED')) + assert hct.metrics.sample()['consecutive_failures'] == 1 + hct.stop() + assert self._checker.health.call_count == 2 @pytest.mark.skipif('True', reason='Flaky test (AURORA-1182)') def test_health_checker_metrics(self): def slow_check(): self._clock.sleep(0.5) return (True, None) - hct = HealthChecker(slow_check, interval_secs=1, initial_interval_secs=1, clock=self._clock) + hct = HealthChecker(slow_check, interval_secs=1, grace_period_secs=1, clock=self._clock) hct.start() self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=1) @@ -193,6 +287,7 @@ class TestHealthCheckerProvider(unittest.TestCase): interval_secs = 17 initial_interval_secs = 3 max_consecutive_failures = 2 + min_consecutive_successes = 2 task_config = TaskConfig( executorConfig=ExecutorConfig( name='thermos', @@ -202,6 +297,7 @@ class TestHealthCheckerProvider(unittest.TestCase): interval_secs=interval_secs, initial_interval_secs=initial_interval_secs, max_consecutive_failures=max_consecutive_failures, + min_consecutive_successes=min_consecutive_successes, timeout_secs=7 ) ).json_dumps() @@ -209,15 +305,17 @@ class TestHealthCheckerProvider(unittest.TestCase): ) assigned_task = AssignedTask(task=task_config, instanceId=1, assignedPorts={'health': 9001}) health_checker = HealthCheckerProvider().from_assigned_task(assigned_task, None) - assert health_checker.threaded_health_checker.interval == interval_secs - assert health_checker.threaded_health_checker.initial_interval == initial_interval_secs - hct_max_fail = health_checker.threaded_health_checker.max_consecutive_failures - assert hct_max_fail == max_consecutive_failures + hc = health_checker.threaded_health_checker + assert hc.interval == interval_secs + assert hc.grace_period_secs == initial_interval_secs + assert hc.max_consecutive_failures == max_consecutive_failures + assert hc.min_consecutive_successes == min_consecutive_successes def test_from_assigned_task_http_endpoint_style_config(self): interval_secs = 17 initial_interval_secs = 3 max_consecutive_failures = 2 + min_consecutive_successes = 2 http_config = HttpHealthChecker( endpoint='/foo', expected_response='bar', @@ -233,6 +331,7 @@ class 
TestHealthCheckerProvider(unittest.TestCase): interval_secs=interval_secs, initial_interval_secs=initial_interval_secs, max_consecutive_failures=max_consecutive_failures, + min_consecutive_successes=min_consecutive_successes, timeout_secs=7 ) ).json_dumps() @@ -245,14 +344,18 @@ class TestHealthCheckerProvider(unittest.TestCase): assert http_exec_config['expected_response'] == 'bar' assert http_exec_config['expected_response_code'] == 201 health_checker = HealthCheckerProvider().from_assigned_task(assigned_task, None) - assert health_checker.threaded_health_checker.interval == interval_secs - assert health_checker.threaded_health_checker.initial_interval == initial_interval_secs + hc = health_checker.threaded_health_checker + assert hc.interval == interval_secs + assert hc.grace_period_secs == initial_interval_secs + assert hc.max_consecutive_failures == max_consecutive_failures + assert hc.min_consecutive_successes == min_consecutive_successes @mock.patch('pwd.getpwnam') def test_from_assigned_task_shell(self, mock_getpwnam): interval_secs = 17 initial_interval_secs = 3 max_consecutive_failures = 2 + min_consecutive_successes = 2 timeout_secs = 5 shell_config = ShellHealthChecker(shell_command='failed command') task_config = TaskConfig( @@ -266,7 +369,8 @@ class TestHealthCheckerProvider(unittest.TestCase): interval_secs=interval_secs, initial_interval_secs=initial_interval_secs, max_consecutive_failures=max_consecutive_failures, - timeout_secs=timeout_secs, + min_consecutive_successes=min_consecutive_successes, + timeout_secs=timeout_secs ) ).json_dumps() ) @@ -281,10 +385,11 @@ class TestHealthCheckerProvider(unittest.TestCase): type(mock_sandbox).is_filesystem_image = mock.PropertyMock(return_value=False) health_checker = HealthCheckerProvider().from_assigned_task(assigned_task, mock_sandbox) - assert health_checker.threaded_health_checker.interval == interval_secs - assert health_checker.threaded_health_checker.initial_interval == initial_interval_secs - hct_max_fail = health_checker.threaded_health_checker.max_consecutive_failures - assert hct_max_fail == max_consecutive_failures + hc = health_checker.threaded_health_checker + assert hc.interval == interval_secs + assert hc.grace_period_secs == initial_interval_secs + assert hc.max_consecutive_failures == max_consecutive_failures + assert hc.min_consecutive_successes == min_consecutive_successes mock_getpwnam.assert_called_once_with(task_config.job.role) @mock.patch('pwd.getpwnam') @@ -292,6 +397,7 @@ class TestHealthCheckerProvider(unittest.TestCase): interval_secs = 17 initial_interval_secs = 3 max_consecutive_failures = 2 + min_consecutive_successes = 2 timeout_secs = 5 shell_config = ShellHealthChecker(shell_command='failed command') task_config = TaskConfig( @@ -305,7 +411,8 @@ class TestHealthCheckerProvider(unittest.TestCase): interval_secs=interval_secs, initial_interval_secs=initial_interval_secs, max_consecutive_failures=max_consecutive_failures, - timeout_secs=timeout_secs, + min_consecutive_successes=min_consecutive_successes, + timeout_secs=timeout_secs ) ).json_dumps() ) @@ -322,10 +429,11 @@ class TestHealthCheckerProvider(unittest.TestCase): health_checker = HealthCheckerProvider(nosetuid_health_checks=True).from_assigned_task( assigned_task, mock_sandbox) - assert health_checker.threaded_health_checker.interval == interval_secs - assert health_checker.threaded_health_checker.initial_interval == initial_interval_secs - hct_max_fail = health_checker.threaded_health_checker.max_consecutive_failures - assert hct_max_fail 
== max_consecutive_failures + hc = health_checker.threaded_health_checker + assert hc.interval == interval_secs + assert hc.grace_period_secs == initial_interval_secs + assert hc.max_consecutive_failures == max_consecutive_failures + assert hc.min_consecutive_successes == min_consecutive_successes # Should not be trying to access role's user info. assert not mock_getpwnam.called @@ -334,6 +442,7 @@ class TestHealthCheckerProvider(unittest.TestCase): interval_secs = 17 initial_interval_secs = 3 max_consecutive_failures = 2 + min_consecutive_successes = 2 timeout_secs = 5 shell_config = ShellHealthChecker(shell_command='failed command') task_config = TaskConfig( @@ -347,7 +456,8 @@ class TestHealthCheckerProvider(unittest.TestCase): interval_secs=interval_secs, initial_interval_secs=initial_interval_secs, max_consecutive_failures=max_consecutive_failures, - timeout_secs=timeout_secs, + min_consecutive_successes=min_consecutive_successes, + timeout_secs=timeout_secs ) ).json_dumps() ) @@ -380,6 +490,7 @@ class TestHealthCheckerProvider(unittest.TestCase): interval_secs = 17 initial_interval_secs = 3 max_consecutive_failures = 2 + min_consecutive_successes = 2 timeout_secs = 5 shell_cmd = 'FOO_PORT={{thermos.ports[foo]}} failed command' shell_config = ShellHealthChecker(shell_command=shell_cmd) @@ -393,7 +504,8 @@ class TestHealthCheckerProvider(unittest.TestCase): interval_secs=interval_secs, initial_interval_secs=initial_interval_secs, max_consecutive_failures=max_consecutive_failures, - timeout_secs=timeout_secs, + min_consecutive_successes=min_consecutive_successes, + timeout_secs=timeout_secs ) ).json_dumps() ) @@ -409,6 +521,7 @@ class TestHealthCheckerProvider(unittest.TestCase): interval_secs = 17 initial_interval_secs = 3 max_consecutive_failures = 2 + min_consecutive_successes = 2 timeout_secs = 5 task_config = TaskConfig( executorConfig=ExecutorConfig( @@ -419,7 +532,8 @@ class TestHealthCheckerProvider(unittest.TestCase): interval_secs=interval_secs, initial_interval_secs=initial_interval_secs, max_consecutive_failures=max_consecutive_failures, - timeout_secs=timeout_secs, + min_consecutive_successes=min_consecutive_successes, + timeout_secs=timeout_secs ) ).json_dumps() ) @@ -427,7 +541,7 @@ class TestHealthCheckerProvider(unittest.TestCase): # No health port and we don't have a shell_command. 
assigned_task = AssignedTask(task=task_config, instanceId=1, assignedPorts={'http': 9001}) health_checker = HealthCheckerProvider().from_assigned_task(assigned_task, None) - self.assertIsNone(health_checker) + assert isinstance(health_checker, NoopHealthChecker) class TestThreadedHealthChecker(unittest.TestCase): @@ -439,17 +553,20 @@ class TestThreadedHealthChecker(unittest.TestCase): self.sandbox.exists.return_value = True self.sandbox.root = '/root' - self.initial_interval_secs = 1 - self.interval_secs = 5 - self.max_consecutive_failures = 2 + self.initial_interval_secs = 15 + self.interval_secs = 10 + self.max_consecutive_failures = 1 + self.min_consecutive_successes = 2 self.clock = mock.Mock(spec=time) - self.clock.time.return_value = 1.0 + self.clock.time.return_value = 0 + self.health_checker = HealthChecker( self.health, None, self.interval_secs, self.initial_interval_secs, self.max_consecutive_failures, + self.min_consecutive_successes, self.clock) self.health_checker_sandbox_exists = HealthChecker( self.health, @@ -457,10 +574,11 @@ class TestThreadedHealthChecker(unittest.TestCase): self.interval_secs, self.initial_interval_secs, self.max_consecutive_failures, + self.min_consecutive_successes, self.clock) def test_perform_check_if_not_disabled_snooze_file_is_none(self): - self.health_checker.threaded_health_checker.snooze_file = None + self.health_checker_sandbox_exists.threaded_health_checker.snooze_file = None assert self.health.call_count == 0 assert self.health_checker_sandbox_exists.metrics.sample()['snoozed'] == 0 self.health_checker.threaded_health_checker._perform_check_if_not_disabled() @@ -487,38 +605,411 @@ class TestThreadedHealthChecker(unittest.TestCase): assert self.health_checker_sandbox_exists.metrics.sample()['snoozed'] == 1 assert result == (True, None) - def test_maybe_update_failure_count(self): + def test_maybe_update_health_check_count_reset_count(self): + hc = self.health_checker.threaded_health_checker + hc.running = True + + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + + hc._maybe_update_health_check_count(True, 'reason-1') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 1 + + hc._maybe_update_health_check_count(False, 'reason-2') + assert hc.current_consecutive_failures == 1 + assert hc.current_consecutive_successes == 0 + + hc._maybe_update_health_check_count(True, 'reason-3') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 1 + + def test_maybe_update_health_check_count_ignore_failures_before_callback(self): hc = self.health_checker.threaded_health_checker + hc.running = False + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + + hc._maybe_update_health_check_count(False, 'reason-1') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + + hc._maybe_update_health_check_count(False, 'reason-2') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + + def test_maybe_update_health_check_count_dont_ignore_failures_after_callback(self): + hc = self.health_checker.threaded_health_checker + hc.running = True assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + + hc._maybe_update_health_check_count(False, 'reason-1') + assert hc.current_consecutive_failures == 1 + assert hc.current_consecutive_successes == 0 + + hc._maybe_update_health_check_count(False, 'reason-2') 
+ assert hc.current_consecutive_failures == 2 + assert hc.current_consecutive_successes == 0 + + def test_maybe_update_health_check_count_fail_fast(self): + hc = self.health_checker.threaded_health_checker + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 assert hc.healthy is True + assert hc.running is False - hc._maybe_update_failure_count(True, 'reason') + hc.attempts += 1 + hc._maybe_update_health_check_count(False, 'reason-1') assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + assert hc.running is False - hc._maybe_update_failure_count(False, 'reason') + hc.attempts += 1 + hc._maybe_update_health_check_count(False, 'reason-2') assert hc.current_consecutive_failures == 1 + assert hc.current_consecutive_successes == 0 + assert hc.running is False + assert hc.healthy is False + assert hc.reason == 'reason-2' + + def test_maybe_update_health_check_count_max_failures(self): + hc = self.health_checker.threaded_health_checker + hc.running = True + + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 assert hc.healthy is True - hc._maybe_update_failure_count(False, 'reason') - assert hc.current_consecutive_failures == 2 + hc._maybe_update_health_check_count(False, 'reason-1') + assert hc.current_consecutive_failures == 1 + assert hc.current_consecutive_successes == 0 assert hc.healthy is True - hc._maybe_update_failure_count(False, 'reason') + hc._maybe_update_health_check_count(False, 'reason-2') + assert hc.current_consecutive_failures == 2 + assert hc.current_consecutive_successes == 0 assert hc.healthy is False - assert hc.reason == 'reason' + assert hc.reason == 'reason-2' - @mock.patch('apache.aurora.executor.common.health_checker.ThreadedHealthChecker' - '._maybe_update_failure_count', - spec=ThreadedHealthChecker._maybe_update_failure_count) - def test_run(self, mock_maybe_update_failure_count): + def test_maybe_update_health_check_count_success(self): + hc = self.health_checker.threaded_health_checker + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + assert hc.running is False + assert hc.healthy is True + + hc._maybe_update_health_check_count(True, 'reason') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 1 + assert hc.running is False + assert hc.healthy is True + + hc._maybe_update_health_check_count(True, 'reason') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 2 + assert hc.running is True + assert hc.healthy is True + + hc._maybe_update_health_check_count(True, 'reason') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 3 + assert hc.running is True + assert hc.healthy is True + + def test_run_success(self): + self.health.return_value = (True, 'success') + mock_is_set = mock.Mock(spec=threading._Event.is_set) + liveness = [False, False, False, True] + mock_is_set.side_effect = lambda: liveness.pop(0) + self.health_checker.threaded_health_checker.dead.is_set = mock_is_set + self.health_checker.threaded_health_checker.run() + assert self.clock.sleep.call_count == 3 + assert self.health_checker.threaded_health_checker.current_consecutive_failures == 0 + assert self.health_checker.threaded_health_checker.current_consecutive_successes == 3 + assert self.health_checker.threaded_health_checker.running is True + assert self.health_checker.threaded_health_checker.healthy is True + assert 
self.health_checker.threaded_health_checker.reason is None + + def test_run_failure(self): + self.health.return_value = (False, 'failure') mock_is_set = mock.Mock(spec=threading._Event.is_set) + liveness = [False, False, False, True] + mock_is_set.side_effect = lambda: liveness.pop(0) self.health_checker.threaded_health_checker.dead.is_set = mock_is_set - liveness = [False, False, True] - self.health_checker.threaded_health_checker.dead.is_set.side_effect = lambda: liveness.pop(0) self.health_checker.threaded_health_checker.run() assert self.clock.sleep.call_count == 3 - assert mock_maybe_update_failure_count.call_count == 2 + assert self.health_checker.threaded_health_checker.current_consecutive_failures == 2 + assert self.health_checker.threaded_health_checker.current_consecutive_successes == 0 + assert self.health_checker.threaded_health_checker.running is False + assert self.health_checker.threaded_health_checker.healthy is False + assert self.health_checker.threaded_health_checker.reason == 'failure' + + def test_run_failure_unhealthy_when_failfast(self): + health_status = [(False, 'failure-1'), (True, None), (False, 'failure-3')] + self.health.side_effect = lambda: health_status.pop(0) + mock_is_set = mock.Mock(spec=threading._Event.is_set) + liveness = [False, False, False, True] + mock_is_set.side_effect = lambda: liveness.pop(0) + self.health_checker.threaded_health_checker.dead.is_set = mock_is_set + self.health_checker.threaded_health_checker.run() + assert self.clock.sleep.call_count == 3 + assert self.health_checker.threaded_health_checker.current_consecutive_failures == 1 + assert self.health_checker.threaded_health_checker.current_consecutive_successes == 0 + assert self.health_checker.threaded_health_checker.running is False + assert self.health_checker.threaded_health_checker.healthy is False + assert self.health_checker.threaded_health_checker.reason == 'failure-3' + + def test_run_unhealthy_after_callback(self): + health_status = [(True, None), (True, None), (False, 'failure-4'), (False, 'failure-5')] + self.health.side_effect = lambda: health_status.pop(0) + mock_is_set = mock.Mock(spec=threading._Event.is_set) + liveness = [False, False, False, False, True] + mock_is_set.side_effect = lambda: liveness.pop(0) + self.health_checker.threaded_health_checker.dead.is_set = mock_is_set + self.health_checker.threaded_health_checker.run() + assert self.clock.sleep.call_count == 4 + assert self.health_checker.threaded_health_checker.current_consecutive_failures == 2 + assert self.health_checker.threaded_health_checker.current_consecutive_successes == 0 + assert self.health_checker.threaded_health_checker.running is True + assert self.health_checker.threaded_health_checker.healthy is False + assert self.health_checker.threaded_health_checker.reason == 'failure-5' + + @mock.patch('apache.aurora.executor.common.health_checker.ExceptionalThread.start', + spec=ExceptionalThread.start) + def test_start(self, mock_start): + assert mock_start.call_count == 0 + self.health_checker.threaded_health_checker.start() + mock_start.assert_called_once_with(self.health_checker.threaded_health_checker) + + def test_stop(self): + assert not self.health_checker.threaded_health_checker.dead.is_set() + self.health_checker.threaded_health_checker.stop() + assert self.health_checker.threaded_health_checker.dead.is_set() + + +class TestThreadedHealthCheckerWithDefaults(unittest.TestCase): + ''' + Similar tests as above but with the default health check configuration. 
This + will ensure that the defaults are always valid. + ''' + + def setUp(self): + self.health = mock.Mock() + self.health.return_value = (True, 'Fake') + + self.sandbox = mock.Mock(spec_set=SandboxInterface) + self.sandbox.exists.return_value = True + self.sandbox.root = '/root' + + self.health_checker = HealthCheckerProvider().from_assigned_task( + AssignedTask( + task=TaskConfig( + executorConfig=ExecutorConfig( + name='thermos', + data=MESOS_JOB(task=HELLO_WORLD).json_dumps())), + instanceId=1, + assignedPorts={'health': 9001}), + self.sandbox) + + self.health_checker.threaded_health_checker.checker = self.health + + def test_perform_check_if_not_disabled_snooze_file_is_none(self): + self.health_checker.threaded_health_checker.snooze_file = None + assert self.health.call_count == 0 + assert self.health_checker.metrics.sample()['snoozed'] == 0 + self.health_checker.threaded_health_checker._perform_check_if_not_disabled() + assert self.health.call_count == 1 + assert self.health_checker.metrics.sample()['snoozed'] == 0 + + @mock.patch('os.path', spec_set=os.path) + def test_perform_check_if_not_disabled_no_snooze_file(self, mock_os_path): + mock_os_path.isfile.return_value = False + assert self.health.call_count == 0 + assert self.health_checker.metrics.sample()['snoozed'] == 0 + self.health_checker.threaded_health_checker._perform_check_if_not_disabled() + assert self.health.call_count == 1 + assert self.health_checker.metrics.sample()['snoozed'] == 0 + + @mock.patch('os.path', spec_set=os.path) + def test_perform_check_if_not_disabled_snooze_file_exists(self, mock_os_path): + mock_os_path.isfile.return_value = True + assert self.health.call_count == 0 + assert self.health_checker.metrics.sample()['snoozed'] == 0 + result = ( + self.health_checker.threaded_health_checker._perform_check_if_not_disabled()) + assert self.health.call_count == 0 + assert self.health_checker.metrics.sample()['snoozed'] == 1 + assert result == (True, None) + + def test_maybe_update_health_check_count_reset_count(self): + hc = self.health_checker.threaded_health_checker + hc.running = True + + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + + hc._maybe_update_health_check_count(True, 'reason-1') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 1 + + hc._maybe_update_health_check_count(False, 'reason-2') + assert hc.current_consecutive_failures == 1 + assert hc.current_consecutive_successes == 0 + + hc._maybe_update_health_check_count(True, 'reason-3') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 1 + + def test_maybe_update_health_check_count_ignore_failures_before_callback(self): + hc = self.health_checker.threaded_health_checker + hc.running = False + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + + hc._maybe_update_health_check_count(False, 'reason-1') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + + hc._maybe_update_health_check_count(False, 'reason-2') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + + def test_maybe_update_health_check_count_dont_ignore_failures_after_callback(self): + hc = self.health_checker.threaded_health_checker + hc.running = True + + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + + hc._maybe_update_health_check_count(False, 'reason-1') + assert 
hc.current_consecutive_failures == 1 + assert hc.current_consecutive_successes == 0 + + hc._maybe_update_health_check_count(False, 'reason-2') + assert hc.current_consecutive_failures == 2 + assert hc.current_consecutive_successes == 0 + + def test_maybe_update_health_check_count_fail_fast(self): + hc = self.health_checker.threaded_health_checker + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + assert hc.healthy is True + assert hc.running is False + + hc.attempts += 1 + hc._maybe_update_health_check_count(False, 'reason-1') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + assert hc.running is False + + hc.attempts += 1 + hc._maybe_update_health_check_count(False, 'reason-2') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + assert hc.running is False + + hc.attempts += 1 + hc._maybe_update_health_check_count(False, 'reason-3') + assert hc.current_consecutive_failures == 1 + assert hc.current_consecutive_successes == 0 + assert hc.running is False + assert hc.healthy is False + assert hc.reason == 'reason-3' + + def test_maybe_update_health_check_count_max_failures(self): + hc = self.health_checker.threaded_health_checker + hc.running = True + + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + assert hc.healthy is True + + hc._maybe_update_health_check_count(False, 'reason-1') + assert hc.current_consecutive_failures == 1 + assert hc.current_consecutive_successes == 0 + assert hc.healthy is False + assert hc.reason == 'reason-1' + + def test_maybe_update_health_check_count_success(self): + hc = self.health_checker.threaded_health_checker + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + assert hc.healthy is True + assert hc.running is False + + hc._maybe_update_health_check_count(True, 'reason') + assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 1 + assert hc.running is True + assert hc.healthy is True + + @mock.patch('apache.aurora.executor.common.health_checker.time.sleep', spec=time.sleep) + def test_run_success(self, mock_sleep): + mock_sleep.return_value = None + self.health.return_value = (True, 'success') + mock_is_set = mock.Mock(spec=threading._Event.is_set) + liveness = [False, False, False, True] + mock_is_set.side_effect = lambda: liveness.pop(0) + self.health_checker.threaded_health_checker.dead.is_set = mock_is_set + self.health_checker.threaded_health_checker.run() + assert mock_sleep.call_count == 3 + assert self.health_checker.threaded_health_checker.current_consecutive_failures == 0 + assert self.health_checker.threaded_health_checker.current_consecutive_successes == 3 + assert self.health_checker.threaded_health_checker.running is True + assert self.health_checker.threaded_health_checker.healthy is True + assert self.health_checker.threaded_health_checker.reason is None + + @mock.patch('apache.aurora.executor.common.health_checker.time.sleep', spec=time.sleep) + def test_run_failure(self, mock_sleep): + mock_sleep.return_value = None + self.health.return_value = (False, 'failure') + mock_is_set = mock.Mock(spec=threading._Event.is_set) + liveness = [False, False, False, True] + mock_is_set.side_effect = lambda: liveness.pop(0) + self.health_checker.threaded_health_checker.dead.is_set = mock_is_set + self.health_checker.threaded_health_checker.run() + assert mock_sleep.call_count == 3 + assert 
self.health_checker.threaded_health_checker.current_consecutive_failures == 1 + assert self.health_checker.threaded_health_checker.current_consecutive_successes == 0 + assert self.health_checker.threaded_health_checker.running is False + assert self.health_checker.threaded_health_checker.healthy is False + assert self.health_checker.threaded_health_checker.reason == 'failure' + + @mock.patch('apache.aurora.executor.common.health_checker.time.sleep', spec=time.sleep) + def test_run_failure_unhealthy_when_failfast(self, mock_sleep): + mock_sleep.return_value = None + health_status = [(False, 'failure-1'), (False, 'failure-2'), (False, 'failure-3')] + self.health.side_effect = lambda: health_status.pop(0) + mock_is_set = mock.Mock(spec=threading._Event.is_set) + liveness = [False, False, False, True] + mock_is_set.side_effect = lambda: liveness.pop(0) + self.health_checker.threaded_health_checker.dead.is_set = mock_is_set + self.health_checker.threaded_health_checker.run() + assert mock_sleep.call_count == 3 + assert self.health_checker.threaded_health_checker.current_consecutive_failures == 1 + assert self.health_checker.threaded_health_checker.current_consecutive_successes == 0 + assert self.health_checker.threaded_health_checker.running is False + assert self.health_checker.threaded_health_checker.healthy is False + assert self.health_checker.threaded_health_checker.reason == 'failure-3' + + @mock.patch('apache.aurora.executor.common.health_checker.time.sleep', spec=time.sleep) + def test_run_unhealthy_after_callback(self, mock_sleep): + mock_sleep.return_value = None + health_status = [(True, None), (True, None), (False, 'failure-4'), (False, 'failure-5')] + self.health.side_effect = lambda: health_status.pop(0) + mock_is_set = mock.Mock(spec=threading._Event.is_set) + liveness = [False, False, False, False, True] + mock_is_set.side_effect = lambda: liveness.pop(0) + self.health_checker.threaded_health_checker.dead.is_set = mock_is_set + self.health_checker.threaded_health_checker.run() + assert mock_sleep.call_count == 4 + assert self.health_checker.threaded_health_checker.current_consecutive_failures == 2 + assert self.health_checker.threaded_health_checker.current_consecutive_successes == 0 + assert self.health_checker.threaded_health_checker.running is True + assert self.health_checker.threaded_health_checker.healthy is False + assert self.health_checker.threaded_health_checker.reason == 'failure-5' @mock.patch('apache.aurora.executor.common.health_checker.ExceptionalThread.start', spec=ExceptionalThread.start) http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/test/python/apache/aurora/executor/common/test_status_checker.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/common/test_status_checker.py b/src/test/python/apache/aurora/executor/common/test_status_checker.py index 5be1981..3d5fe12 100644 --- a/src/test/python/apache/aurora/executor/common/test_status_checker.py +++ b/src/test/python/apache/aurora/executor/common/test_status_checker.py @@ -14,6 +14,7 @@ import threading +import pytest from mesos.interface.mesos_pb2 import TaskState from apache.aurora.executor.common.status_checker import ( @@ -23,12 +24,16 @@ from apache.aurora.executor.common.status_checker import ( StatusResult ) +TASK_STARTING = StatusResult(None, TaskState.Value('TASK_STARTING')) +TASK_RUNNING = StatusResult(None, TaskState.Value('TASK_RUNNING')) +TASK_FAILED = StatusResult(None, TaskState.Value('TASK_FAILED')) + 
class EventHealth(StatusChecker): - def __init__(self): + def __init__(self, status=None): self.started = threading.Event() self.stopped = threading.Event() - self._status = None + self._status = status @property def status(self): @@ -73,3 +78,110 @@ def test_chained_health_interface(): chained_si.stop() for si in (si1, si2): assert si.stopped.is_set() + + +def test_chained_empty_checkers(): + hi = ChainedStatusChecker([]) + assert hi.status is None + + +def test_chained_healthy_status_none(): + hi = ChainedStatusChecker([EventHealth()]) + assert hi.status is None + + hi = ChainedStatusChecker([EventHealth(), EventHealth(), EventHealth()]) + assert hi.status is None + + +def test_chained_healthy_status_starting(): + hi = ChainedStatusChecker([EventHealth(TASK_STARTING)]) + assert hi.status is TASK_STARTING + + hi = ChainedStatusChecker([EventHealth(TASK_STARTING), + EventHealth(TASK_STARTING), + EventHealth(TASK_STARTING)]) + assert hi.status is TASK_STARTING + + +def test_chained_healthy_status_running(): + hi = ChainedStatusChecker([EventHealth(TASK_RUNNING)]) + assert hi.status is TASK_RUNNING + + hi = ChainedStatusChecker([EventHealth(TASK_RUNNING), + EventHealth(TASK_RUNNING), + EventHealth(TASK_RUNNING)]) + assert hi.status is TASK_RUNNING + + +def test_chained_healthy_status_failed(): + hi = ChainedStatusChecker([EventHealth(TASK_FAILED)]) + assert hi.status is TASK_FAILED + + hi = ChainedStatusChecker([EventHealth(TASK_FAILED), + EventHealth(TASK_FAILED), + EventHealth(TASK_FAILED)]) + assert hi.status is TASK_FAILED + + +def test_chained_status_failed_trumps_all(): + hi = ChainedStatusChecker([EventHealth(), + EventHealth(TASK_RUNNING), + EventHealth(TASK_STARTING), + EventHealth(TASK_FAILED)]) + assert hi.status is TASK_FAILED + + hi = ChainedStatusChecker([EventHealth(TASK_FAILED), + EventHealth(TASK_STARTING), + EventHealth(TASK_RUNNING), + EventHealth()]) + assert hi.status is TASK_FAILED + + +def test_chained_status_starting_trumps_running_and_none(): + hi = ChainedStatusChecker([EventHealth(), EventHealth(TASK_RUNNING), EventHealth(TASK_STARTING)]) + assert hi.status is TASK_STARTING + + hi = ChainedStatusChecker([EventHealth(TASK_STARTING), EventHealth(TASK_RUNNING), EventHealth()]) + assert hi.status is TASK_STARTING + + +def test_chained_status_running_trumps_none(): + hi = ChainedStatusChecker([EventHealth(TASK_RUNNING), EventHealth()]) + assert hi.status is TASK_RUNNING + + hi = ChainedStatusChecker([EventHealth(), EventHealth(TASK_RUNNING)]) + assert hi.status is TASK_RUNNING + + +def test_chained_status_starting_to_running_consensus(): + eh1 = EventHealth(TASK_STARTING) + eh2 = EventHealth(TASK_STARTING) + hi = ChainedStatusChecker([eh1, eh2]) + assert hi.status is TASK_STARTING + + eh1.set_status(TASK_RUNNING) + assert hi.status is TASK_STARTING + + eh2.set_status(TASK_RUNNING) + assert hi.status is TASK_RUNNING + + +def test_chained_status_failed_is_terminal(): + eh = EventHealth(TASK_FAILED) + hi = ChainedStatusChecker([eh]) + assert hi.status is TASK_FAILED + + eh.set_status(TASK_RUNNING) + assert hi.status is TASK_FAILED + + eh.set_status(TASK_STARTING) + assert hi.status is TASK_FAILED + + eh.set_status(None) + assert hi.status is TASK_FAILED + + +def test_chained_status_raises_unknown_status_result(): + hi = ChainedStatusChecker([EventHealth(1)]) + with pytest.raises(TypeError): + hi.status
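
The new ChainedStatusChecker tests above pin down a precedence rule for combining per-checker statuses: TASK_FAILED is terminal and trumps everything, TASK_STARTING trumps TASK_RUNNING and None, and the chain only reports TASK_RUNNING once every checker agrees. As a reading aid only (this is not code from the patch; the helper name and the string stand-ins are invented here), the rule boils down to roughly:

  # Illustrative sketch of the precedence the tests above assert; strings stand
  # in for the mesos TaskState values carried by the real StatusResult objects.
  TASK_STARTING, TASK_RUNNING, TASK_FAILED = 'STARTING', 'RUNNING', 'FAILED'

  # Higher value wins when the chained checker combines per-checker statuses.
  _PRECEDENCE = {None: 0, TASK_RUNNING: 1, TASK_STARTING: 2, TASK_FAILED: 3}

  def chained_status(statuses, previous=None):
      # FAILED is terminal: once observed, the chain keeps reporting it.
      if previous == TASK_FAILED:
          return TASK_FAILED
      combined = None
      for status in statuses:
          if _PRECEDENCE[status] > _PRECEDENCE[combined]:
              combined = status
      return combined

  assert chained_status([]) is None
  assert chained_status([TASK_RUNNING, None]) == TASK_RUNNING
  assert chained_status([TASK_STARTING, TASK_RUNNING]) == TASK_STARTING  # RUNNING needs consensus
  assert chained_status([TASK_RUNNING, TASK_FAILED]) == TASK_FAILED
  assert chained_status([TASK_RUNNING], previous=TASK_FAILED) == TASK_FAILED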

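
End-to-end, these changes let an operator drive an update purely off health checks. A hypothetical .aurora fragment (the values and the hello_world_task reference are illustrative, not taken from this commit) could opt in like this:

  # Hypothetical job config: with watch_secs=0 the updater waits for the
  # instance to reach RUNNING, which now requires min_consecutive_successes
  # consecutive passing health checks after the initial_interval_secs grace period.
  hello = Service(
    cluster = 'devcluster',
    role = 'www-data',
    environment = 'prod',
    name = 'hello_world',
    task = hello_world_task,    # assumed to be defined elsewhere in the config
    update_config = UpdateConfig(
      batch_size = 1,
      watch_secs = 0            # rely on health checks instead of a fixed watch timeout
    ),
    health_check_config = HealthCheckConfig(
      initial_interval_secs = 15,
      interval_secs = 10,
      timeout_secs = 5,
      max_consecutive_failures = 1,
      min_consecutive_successes = 2
    )
  )

If min_consecutive_successes is never reached, the instance stays in STARTING and the updater falls back to the failure criteria in UpdateConfig rather than a watch_secs timer.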