Enabled pause/resume for health checks. Review: https://reviews.apache.org/r/57645/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/85edc8f0 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/85edc8f0 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/85edc8f0 Branch: refs/heads/master Commit: 85edc8f078adb513c20bc913b9e17fb5c5bbe78c Parents: 4bbfaeb Author: Gastón Kleiman <[email protected]> Authored: Fri Mar 24 00:49:14 2017 +0100 Committer: Alexander Rukletsov <[email protected]> Committed: Fri Mar 24 01:28:15 2017 +0100 ---------------------------------------------------------------------- src/checks/health_checker.cpp | 49 +++++++++++++++++++++++++++++++--- src/checks/health_checker.hpp | 11 +++++--- src/docker/executor.cpp | 4 +-- src/launcher/default_executor.cpp | 31 ++++++++++++++++----- src/launcher/executor.cpp | 4 +-- 5 files changed, 80 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/85edc8f0/src/checks/health_checker.cpp ---------------------------------------------------------------------- diff --git a/src/checks/health_checker.cpp b/src/checks/health_checker.cpp index 3a7de78..3290eb6 100644 --- a/src/checks/health_checker.cpp +++ b/src/checks/health_checker.cpp @@ -201,11 +201,15 @@ HealthChecker::~HealthChecker() } -void HealthChecker::stop() +void HealthChecker::pause() { - LOG(INFO) << "Health checking stopped"; + dispatch(process.get(), &HealthCheckerProcess::pause); +} + - terminate(process.get(), true); +void HealthChecker::resume() +{ + dispatch(process.get(), &HealthCheckerProcess::resume); } @@ -230,7 +234,8 @@ HealthCheckerProcess::HealthCheckerProcess( agentURL(_agentURL), commandCheckViaAgent(_commandCheckViaAgent), consecutiveFailures(0), - initializing(true) + initializing(true), + paused(false) { Try<Duration> create = Duration::create(check.delay_seconds()); CHECK_SOME(create); @@ -324,6 +329,10 @@ void HealthCheckerProcess::success() void HealthCheckerProcess::performSingleCheck() { + if (paused) { + return; + } + Future<Nothing> checkResult; Stopwatch stopwatch; @@ -361,6 +370,13 @@ void HealthCheckerProcess::processCheckResult( const Stopwatch& stopwatch, const Future<Nothing>& future) { + // `HealthChecker` might have been paused while performing the check. + if (paused) { + LOG(INFO) << "Ignoring health check result of task " + stringify(taskId) + + " (health checking is paused)"; + return; + } + VLOG(1) << "Performed " << HealthCheck::Type_Name(check.type()) << " health check in " << stopwatch.elapsed(); @@ -904,12 +920,37 @@ Future<Nothing> HealthCheckerProcess::_tcpHealthCheck( void HealthCheckerProcess::scheduleNext(const Duration& duration) { + CHECK(!paused); + VLOG(1) << "Scheduling health check in " << duration; delay(duration, self(), &Self::performSingleCheck); } +void HealthCheckerProcess::pause() +{ + if (!paused) { + VLOG(1) << "Health checking paused"; + + paused = true; + } +} + + +void HealthCheckerProcess::resume() +{ + if (paused) { + VLOG(1) << "Health checking resumed"; + + paused = false; + + // Schedule a health check immediately. + scheduleNext(Duration::zero()); + } +} + + namespace validation { Option<Error> healthCheck(const HealthCheck& check) http://git-wip-us.apache.org/repos/asf/mesos/blob/85edc8f0/src/checks/health_checker.hpp ---------------------------------------------------------------------- diff --git a/src/checks/health_checker.hpp b/src/checks/health_checker.hpp index a7307ac..29df49b 100644 --- a/src/checks/health_checker.hpp +++ b/src/checks/health_checker.hpp @@ -110,10 +110,9 @@ public: ~HealthChecker(); - /** - * Immediately stops health checking. Any in-flight health checks are dropped. - */ - void stop(); + // Idempotent helpers for pausing and resuming health checking. + void pause(); + void resume(); private: explicit HealthChecker(process::Owned<HealthCheckerProcess> process); @@ -138,6 +137,9 @@ public: virtual ~HealthCheckerProcess() {} + void pause(); + void resume(); + protected: virtual void initialize() override; @@ -225,6 +227,7 @@ private: uint32_t consecutiveFailures; process::Time startTime; bool initializing; + bool paused; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/85edc8f0/src/docker/executor.cpp ---------------------------------------------------------------------- diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp index 528bcdb..82ae9bd 100644 --- a/src/docker/executor.cpp +++ b/src/docker/executor.cpp @@ -399,7 +399,7 @@ private: // Stop health checking the task. if (checker.get() != nullptr) { - checker->stop(); + checker->pause(); } // TODO(bmahler): Replace this with 'docker kill' so @@ -415,7 +415,7 @@ private: // Stop health checking the task. if (checker.get() != nullptr) { - checker->stop(); + checker->pause(); } // In case the stop is stuck, discard it. http://git-wip-us.apache.org/repos/asf/mesos/blob/85edc8f0/src/launcher/default_executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp index 58efb4c..ee24531 100644 --- a/src/launcher/default_executor.cpp +++ b/src/launcher/default_executor.cpp @@ -143,8 +143,6 @@ public: connectionId = UUID::random(); doReliableRegistration(); - - // TODO(gkleiman): Resume (health) checks. } void disconnected() @@ -163,7 +161,12 @@ public: } } - // TODO(gkleiman): Stop (health) checks. + // Pause all health checks. + foreachvalue (Owned<Container> container, containers) { + if (container->healthChecker.isSome()) { + container->healthChecker->get()->pause(); + } + } } void received(const Event& event) @@ -188,6 +191,13 @@ public: wait(containers.keys()); } + // Resume all health checks. + foreachvalue (Owned<Container> container, containers) { + if (container->healthChecker.isSome()) { + container->healthChecker->get()->resume(); + } + } + break; } @@ -734,11 +744,11 @@ protected: container->checker = None(); } - // If the task is health checked, stop the associated health checker + // If the task is health checked, pause the associated health checker // to avoid sending health updates after a terminal status update. if (container->healthChecker.isSome()) { CHECK_NOTNULL(container->healthChecker->get()); - container->healthChecker->get()->stop(); + container->healthChecker->get()->pause(); container->healthChecker = None(); } @@ -929,13 +939,13 @@ protected: container->checker = None(); } - // If the task is health checked, stop the associated health checker. + // If the task is health checked, pause the associated health checker. // // TODO(alexr): Once we support `TASK_KILLING` in this executor, // consider health checking the task after sending `TASK_KILLING`. if (container->healthChecker.isSome()) { CHECK_NOTNULL(container->healthChecker->get()); - container->healthChecker->get()->stop(); + container->healthChecker->get()->pause(); container->healthChecker = None(); } @@ -1032,6 +1042,13 @@ protected: void taskHealthUpdated(const TaskHealthStatus& healthStatus) { + if (state == DISCONNECTED) { + VLOG(1) << "Ignoring task health update for task" + << " '" << healthStatus.task_id() << "'," + << " because the executor is not connected to the agent"; + return; + } + // If the health checked container has already been waited on, // ignore the health update. This prevents us from sending // `TASK_RUNNING` after a terminal status update. http://git-wip-us.apache.org/repos/asf/mesos/blob/85edc8f0/src/launcher/executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp index db703f0..4e5841c 100644 --- a/src/launcher/executor.cpp +++ b/src/launcher/executor.cpp @@ -704,7 +704,7 @@ private: // Stop health checking the task. if (healthChecker.get() != nullptr) { - healthChecker->stop(); + healthChecker->pause(); } // Now perform signal escalation to begin killing the task. @@ -749,7 +749,7 @@ private: // Stop health checking the task. if (healthChecker.get() != nullptr) { - healthChecker->stop(); + healthChecker->pause(); } TaskState taskState;
