Enabled pause/resume for health checks.

Review: https://reviews.apache.org/r/57645/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/85edc8f0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/85edc8f0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/85edc8f0

Branch: refs/heads/master
Commit: 85edc8f078adb513c20bc913b9e17fb5c5bbe78c
Parents: 4bbfaeb
Author: Gastón Kleiman <[email protected]>
Authored: Fri Mar 24 00:49:14 2017 +0100
Committer: Alexander Rukletsov <[email protected]>
Committed: Fri Mar 24 01:28:15 2017 +0100

----------------------------------------------------------------------
 src/checks/health_checker.cpp     | 49 +++++++++++++++++++++++++++++++---
 src/checks/health_checker.hpp     | 11 +++++---
 src/docker/executor.cpp           |  4 +--
 src/launcher/default_executor.cpp | 31 ++++++++++++++++-----
 src/launcher/executor.cpp         |  4 +--
 5 files changed, 80 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/85edc8f0/src/checks/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.cpp b/src/checks/health_checker.cpp
index 3a7de78..3290eb6 100644
--- a/src/checks/health_checker.cpp
+++ b/src/checks/health_checker.cpp
@@ -201,11 +201,15 @@ HealthChecker::~HealthChecker()
 }
 
 
-void HealthChecker::stop()
+void HealthChecker::pause()
 {
-  LOG(INFO) << "Health checking stopped";
+  dispatch(process.get(), &HealthCheckerProcess::pause);
+}
+
 
-  terminate(process.get(), true);
+void HealthChecker::resume()
+{
+  dispatch(process.get(), &HealthCheckerProcess::resume);
 }
 
 
@@ -230,7 +234,8 @@ HealthCheckerProcess::HealthCheckerProcess(
     agentURL(_agentURL),
     commandCheckViaAgent(_commandCheckViaAgent),
     consecutiveFailures(0),
-    initializing(true)
+    initializing(true),
+    paused(false)
 {
   Try<Duration> create = Duration::create(check.delay_seconds());
   CHECK_SOME(create);
@@ -324,6 +329,10 @@ void HealthCheckerProcess::success()
 
 void HealthCheckerProcess::performSingleCheck()
 {
+  if (paused) {
+    return;
+  }
+
   Future<Nothing> checkResult;
 
   Stopwatch stopwatch;
@@ -361,6 +370,13 @@ void HealthCheckerProcess::processCheckResult(
     const Stopwatch& stopwatch,
     const Future<Nothing>& future)
 {
+  // `HealthChecker` might have been paused while performing the check.
+  if (paused) {
+    LOG(INFO) << "Ignoring health check result of task " + stringify(taskId) +
+                 " (health checking is paused)";
+    return;
+  }
+
   VLOG(1) << "Performed " << HealthCheck::Type_Name(check.type())
           << " health check in " << stopwatch.elapsed();
 
@@ -904,12 +920,37 @@ Future<Nothing> HealthCheckerProcess::_tcpHealthCheck(
 
 void HealthCheckerProcess::scheduleNext(const Duration& duration)
 {
+  CHECK(!paused);
+
   VLOG(1) << "Scheduling health check in " << duration;
 
   delay(duration, self(), &Self::performSingleCheck);
 }
 
 
+void HealthCheckerProcess::pause()
+{
+  if (!paused) {
+    VLOG(1) << "Health checking paused";
+
+    paused = true;
+  }
+}
+
+
+void HealthCheckerProcess::resume()
+{
+  if (paused) {
+    VLOG(1) << "Health checking resumed";
+
+    paused = false;
+
+    // Schedule a health check immediately.
+    scheduleNext(Duration::zero());
+  }
+}
+
+
 namespace validation {
 
 Option<Error> healthCheck(const HealthCheck& check)

http://git-wip-us.apache.org/repos/asf/mesos/blob/85edc8f0/src/checks/health_checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.hpp b/src/checks/health_checker.hpp
index a7307ac..29df49b 100644
--- a/src/checks/health_checker.hpp
+++ b/src/checks/health_checker.hpp
@@ -110,10 +110,9 @@ public:
 
   ~HealthChecker();
 
-  /**
-   * Immediately stops health checking. Any in-flight health checks are dropped.
-   */
-  void stop();
+  // Idempotent helpers for pausing and resuming health checking.
+  void pause();
+  void resume();
 
 private:
   explicit HealthChecker(process::Owned<HealthCheckerProcess> process);
@@ -138,6 +137,9 @@ public:
 
   virtual ~HealthCheckerProcess() {}
 
+  void pause();
+  void resume();
+
 protected:
   virtual void initialize() override;
 
@@ -225,6 +227,7 @@ private:
   uint32_t consecutiveFailures;
   process::Time startTime;
   bool initializing;
+  bool paused;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/85edc8f0/src/docker/executor.cpp
----------------------------------------------------------------------
diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
index 528bcdb..82ae9bd 100644
--- a/src/docker/executor.cpp
+++ b/src/docker/executor.cpp
@@ -399,7 +399,7 @@ private:
 
       // Stop health checking the task.
       if (checker.get() != nullptr) {
-        checker->stop();
+        checker->pause();
       }
 
       // TODO(bmahler): Replace this with 'docker kill' so
@@ -415,7 +415,7 @@ private:
 
     // Stop health checking the task.
     if (checker.get() != nullptr) {
-      checker->stop();
+      checker->pause();
     }
 
     // In case the stop is stuck, discard it.

http://git-wip-us.apache.org/repos/asf/mesos/blob/85edc8f0/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 58efb4c..ee24531 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -143,8 +143,6 @@ public:
     connectionId = UUID::random();
 
     doReliableRegistration();
-
-    // TODO(gkleiman): Resume (health) checks.
   }
 
   void disconnected()
@@ -163,7 +161,12 @@ public:
       }
     }
 
-    // TODO(gkleiman): Stop (health) checks.
+    // Pause all health checks.
+    foreachvalue (Owned<Container> container, containers) {
+      if (container->healthChecker.isSome()) {
+        container->healthChecker->get()->pause();
+      }
+    }
   }
 
   void received(const Event& event)
@@ -188,6 +191,13 @@ public:
           wait(containers.keys());
         }
 
+        // Resume all health checks.
+        foreachvalue (Owned<Container> container, containers) {
+          if (container->healthChecker.isSome()) {
+            container->healthChecker->get()->resume();
+          }
+        }
+
         break;
       }
 
@@ -734,11 +744,11 @@ protected:
       container->checker = None();
     }
 
-    // If the task is health checked, stop the associated health checker
+    // If the task is health checked, pause the associated health checker
     // to avoid sending health updates after a terminal status update.
     if (container->healthChecker.isSome()) {
       CHECK_NOTNULL(container->healthChecker->get());
-      container->healthChecker->get()->stop();
+      container->healthChecker->get()->pause();
       container->healthChecker = None();
     }
 
@@ -929,13 +939,13 @@ protected:
       container->checker = None();
     }
 
-    // If the task is health checked, stop the associated health checker.
+    // If the task is health checked, pause the associated health checker.
     //
     // TODO(alexr): Once we support `TASK_KILLING` in this executor,
     // consider health checking the task after sending `TASK_KILLING`.
     if (container->healthChecker.isSome()) {
       CHECK_NOTNULL(container->healthChecker->get());
-      container->healthChecker->get()->stop();
+      container->healthChecker->get()->pause();
       container->healthChecker = None();
     }
 
@@ -1032,6 +1042,13 @@ protected:
 
   void taskHealthUpdated(const TaskHealthStatus& healthStatus)
   {
+    if (state == DISCONNECTED) {
+      VLOG(1) << "Ignoring task health update for task"
+              << " '" << healthStatus.task_id() << "',"
+              << " because the executor is not connected to the agent";
+      return;
+    }
+
     // If the health checked container has already been waited on,
     // ignore the health update. This prevents us from sending
     // `TASK_RUNNING` after a terminal status update.

http://git-wip-us.apache.org/repos/asf/mesos/blob/85edc8f0/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index db703f0..4e5841c 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -704,7 +704,7 @@ private:
 
       // Stop health checking the task.
       if (healthChecker.get() != nullptr) {
-        healthChecker->stop();
+        healthChecker->pause();
       }
 
       // Now perform signal escalation to begin killing the task.
@@ -749,7 +749,7 @@ private:
 
     // Stop health checking the task.
     if (healthChecker.get() != nullptr) {
-      healthChecker->stop();
+      healthChecker->pause();
     }
 
     TaskState taskState;

Reply via email to