Repository: mesos
Updated Branches:
  refs/heads/1.3.x 56f1de415 -> 522896273


Fixed a bug relating to lingering executors [1/2].

An executor should be shut down if all of its tasks are
killed while the executor is launching.

This patch fixes an issue where the executor is left
running when its task(s) are killed between the executor
registration/subscription and `Slave::___run()`. See
MESOS-8411 for more details. An additional race in the
agent failover case is also addressed in this patch.

The fix addresses the race by checking an executor's task
queues during task kill and executor (re-)registration,
and shutting down executors that have never been sent any tasks.
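
In essence, the same shutdown predicate is applied at task kill
time and at executor (re-)registration/subscription time. A
minimal sketch of that predicate (the `Executor` members used
here follow the patch below; `shouldShutdown` itself is
illustrative, not a helper added by the patch):

    bool shouldShutdown(const Executor& executor)
    {
      // An executor that was never sent a task and has no queued
      // tasks left to deliver will never get any work, and some
      // executors (e.g., command executor, default executor) do
      // not self-terminate when no task arrives within a timeout,
      // so the agent must shut it down.
      return !executor.everSentTask() && executor.queuedTasks.empty();
    }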


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bf281305
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bf281305
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bf281305

Branch: refs/heads/1.3.x
Commit: bf281305cc628d6acdc15b800f869ae9c876271e
Parents: 56f1de4
Author: Meng Zhu <m...@mesosphere.io>
Authored: Wed Feb 7 17:01:58 2018 -0800
Committer: Benjamin Mahler <bmah...@apache.org>
Committed: Mon Feb 12 15:21:39 2018 -0800

----------------------------------------------------------------------
 src/slave/constants.hpp |   8 +++
 src/slave/slave.cpp     | 118 +++++++++++++++++++++++++++++++++++--------
 src/slave/slave.hpp     |  24 +++++++++
 3 files changed, 129 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bf281305/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index 398bf62..1b2037c 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -84,6 +84,14 @@ constexpr size_t MAX_COMPLETED_FRAMEWORKS = 50;
 constexpr size_t DEFAULT_MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK = 150;
 
 // Maximum number of completed tasks per executor to store in memory.
+//
+// NOTE: This should be greater than zero because the agent looks
+// for completed tasks to determine (with false positives) whether
+// an executor ever received tasks. See MESOS-8411.
+//
+// TODO(mzhu): Remove this note once we can determine whether an
+// executor ever received tasks without looking through the
+// completed tasks.
 constexpr size_t MAX_COMPLETED_TASKS_PER_EXECUTOR = 200;
 
 // Default cpus offered by the slave.
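
For background on the NOTE above: `completedTasks` is backed by a
`boost::circular_buffer` (see the constructor change in slave.cpp
below), so a zero capacity would silently drop every completed
task, and a small capacity can still evict the evidence that a
task reached the executor. A standalone illustration of the
buffer's behavior (not part of the patch):

    #include <boost/circular_buffer.hpp>

    #include <cassert>

    int main()
    {
      // With zero capacity, push_back() inserts nothing, so a
      // completed task would leave no trace at all.
      boost::circular_buffer<int> zero(0);
      zero.push_back(1);
      assert(zero.empty());

      // With a positive capacity, newer entries evict older ones;
      // this eviction is the source of the false positives that
      // the NOTE above and the TODO in slave.hpp refer to.
      boost::circular_buffer<int> one(1);
      one.push_back(1);
      one.push_back(2);  // Evicts 1.
      assert(one.size() == 1 && one.front() == 2);

      return 0;
    }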

http://git-wip-us.apache.org/repos/asf/mesos/blob/bf281305/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 54e2782..93e996e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2471,6 +2471,12 @@ void Slave::___run(
     return;
   }
 
+  // At this point, we must have either sent some tasks to the running
+  // executor or there are queued tasks that need to be delivered.
+  // Otherwise, the executor state would have been synchronously
+  // transitioned to TERMINATING when the queued tasks were killed.
+  CHECK(executor->everSentTask() || !executor->queuedTasks.empty());
+
   foreach (const TaskInfo& task, tasks) {
     // This is the case where the task is killed. No need to send
     // status update because it should be handled in 'killTask'.
@@ -2955,6 +2961,10 @@ void Slave::killTask(
         statusUpdate(update, UPID());
       }
 
+      // TODO(mzhu): Consider shutting down the executor here
+      // if all of its initial tasks are killed rather than
+      // waiting for it to register.
+
       break;
     }
     case Executor::TERMINATING:
@@ -3011,6 +3021,19 @@ void Slave::killTask(
           // a later point in time, it won't get this task.
           statusUpdate(update, UPID());
         }
+
+        // Shutdown the executor if all of its initial tasks are killed.
+        // See MESOS-8411. This is a workaround for those executors (e.g.,
+        // command executor, default executor) that do not have a proper
+        // self terminating logic when they haven't received the task or
+        // task group within a timeout.
+        if (!executor->everSentTask() && executor->queuedTasks.empty()) {
+          LOG(WARNING) << "Shutting down executor " << *executor
+                       << " because it has never been sent a task and all of"
+                       << " its queued tasks have been killed before delivery";
+
+          _shutdownExecutor(framework, executor);
+        }
       } else {
         // Send a message to the executor and wait for
         // it to send us a status update.
@@ -3638,15 +3661,10 @@ void Slave::subscribe(
       // those executors (e.g., command executor, default executor) that do not
       // have a proper self terminating logic when they haven't received the
       // task or task group within a timeout.
-      if (state != RECOVERING &&
-          executor->queuedTasks.empty() &&
-          executor->queuedTaskGroups.empty()) {
-        CHECK(executor->launchedTasks.empty())
-            << " Newly registered executor '" << executor->id
-            << "' has launched tasks";
-
-        LOG(WARNING) << "Shutting down the executor " << *executor
-                     << " because it has no tasks to run";
+      if (!executor->everSentTask() && executor->queuedTasks.empty()) {
+        LOG(WARNING) << "Shutting down subscribing executor " << *executor
+                     << " because it was never sent a task and"
+                     << " has no tasks to run";
 
         _shutdownExecutor(framework, executor);
 
@@ -3736,7 +3754,7 @@ void Slave::subscribe(
       // TODO(vinod): Consider checkpointing 'TaskInfo' instead of
       // 'Task' so that we can relaunch such tasks! Currently we don't
       // do it because 'TaskInfo.data' could be huge.
-      foreachvalue (Task* task, executor->launchedTasks) {
+      foreach (Task* task, executor->launchedTasks.values()) {
         if (task->state() == TASK_STAGING &&
             !unackedTasks.contains(task->task_id())) {
           mesos::TaskState newTaskState = TASK_DROPPED;
@@ -3880,12 +3898,8 @@ void Slave::registerExecutor(
       // this shutdown message, it is safe because the executor driver shuts
       // down the executor if it gets disconnected from the agent before
       // registration.
-      if (executor->queuedTasks.empty()) {
-        CHECK(executor->launchedTasks.empty())
-            << " Newly registered executor '" << executor->id
-            << "' has launched tasks";
-
-        LOG(WARNING) << "Shutting down the executor " << *executor
+      if (!executor->everSentTask() && executor->queuedTasks.empty()) {
+        LOG(WARNING) << "Shutting down registering executor " << *executor
                      << " because it has no tasks to run";
 
         _shutdownExecutor(framework, executor);
@@ -4094,7 +4108,7 @@ void Slave::reregisterExecutor(
       // TODO(vinod): Consider checkpointing 'TaskInfo' instead of
       // 'Task' so that we can relaunch such tasks! Currently we
       // don't do it because 'TaskInfo.data' could be huge.
-      foreachvalue (Task* task, executor->launchedTasks) {
+      foreach (Task* task, executor->launchedTasks.values()) {
         if (task->state() == TASK_STAGING &&
             !unackedTasks.contains(task->task_id())) {
           mesos::TaskState newTaskState = TASK_DROPPED;
@@ -4124,8 +4138,21 @@ void Slave::reregisterExecutor(
         }
       }
 
-      // TODO(vinod): Similar to what we do in `registerExecutor()` the executor
-      // should be shutdown if it hasn't received any tasks.
+      // Shutdown the executor if all of its initial tasks are killed.
+      // This is a workaround for those executors (e.g.,
+      // command executor, default executor) that do not have a proper
+      // self terminating logic when they haven't received the task or
+      // task group within a timeout.
+      if (!executor->everSentTask() && executor->queuedTasks.empty()) {
+        LOG(WARNING) << "Shutting down re-registering executor " << *executor
+                     << " because it has no tasks to run and"
+                     << " has never been sent a task";
+
+        _shutdownExecutor(framework, executor);
+
+        return;
+      }
+
       break;
     }
     default:
@@ -4437,11 +4464,21 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
   // do not need to send the container status and we must
   // synchronously transition the task to ensure that it is removed
   // from the queued tasks before the run task path continues.
+  //
+  // Also if the task is in `launchedTasks` but was dropped by the
+  // agent, we know that the task did not reach the executor. We
+  // will synchronously transition the task to ensure that the
+  // agent re-registration logic can call `everSentTask()` after
+  // dropping tasks.
   if (executor->queuedTasks.contains(status.task_id())) {
     CHECK(protobuf::isTerminalState(status.state()))
         << "Queued tasks can only be transitioned to terminal states";
 
     _statusUpdate(update, pid, executor->id, None());
+  } else if (executor->launchedTasks.contains(status.task_id()) &&
+            (status.state() == TASK_DROPPED || status.state() == TASK_LOST) &&
+            status.source() == TaskStatus::SOURCE_SLAVE) {
+    _statusUpdate(update, pid, executor->id, None());
   } else {
     // NOTE: If the executor sets the ContainerID inside the
     // ContainerStatus, that indicates that the Task this status update
@@ -7439,11 +7476,24 @@ Executor::Executor(
     checkpoint(_checkpoint),
     http(None()),
     pid(None()),
-    resources(_info.resources()),
-    completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
+    resources(_info.resources())
 {
   CHECK_NOTNULL(slave);
 
+  // NOTE: This should be greater than zero because the agent looks
+  // for completed tasks to determine (with false positives) whether
+  // an executor ever received tasks. See MESOS-8411.
+  //
+  // TODO(mzhu): Remove this check once we can determine whether an
+  // executor ever received tasks without looking through the
+  // completed tasks.
+  static_assert(
+      MAX_COMPLETED_TASKS_PER_EXECUTOR > 0,
+      "Max completed tasks per executor should be greater than zero");
+
+  completedTasks =
+    boost::circular_buffer<shared_ptr<Task>>(MAX_COMPLETED_TASKS_PER_EXECUTOR);
+
   Result<string> executorPath =
     os::realpath(path::join(slave->flags.launcher_dir, MESOS_EXECUTOR));
 
@@ -7713,6 +7763,32 @@ bool Executor::incompleteTasks()
 }
 
 
+bool Executor::everSentTask() const
+{
+  if (!launchedTasks.empty()) {
+    return true;
+  }
+
+  foreachvalue (Task* task, terminatedTasks) {
+    foreach (const TaskStatus& status, task->statuses()) {
+      if (status.source() == TaskStatus::SOURCE_EXECUTOR) {
+        return true;
+      }
+    }
+  }
+
+  foreach (const shared_ptr<Task>& task, completedTasks) {
+    foreach (const TaskStatus& status, task->statuses()) {
+      if (status.source() == TaskStatus::SOURCE_EXECUTOR) {
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
+
 bool Executor::isCommandExecutor() const
 {
   return commandExecutor;
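
A side note on the two loops above that switched from
`foreachvalue (Task* task, executor->launchedTasks)` to
`foreach (Task* task, executor->launchedTasks.values())`: the
synchronous `_statusUpdate()` path added in `Slave::statusUpdate()`
can now erase entries from `launchedTasks` while those loops run,
so they iterate over a copy of the values rather than the live
container. A generic sketch of the pattern, using the standard
library in place of stout's `hashmap`:

    #include <list>
    #include <map>
    #include <string>

    int main()
    {
      std::map<int, std::string> launched =
        {{1, "task1"}, {2, "task2"}};

      // Snapshot the values first (analogous to hashmap::values());
      // mutating `launched` inside the loop is then safe, whereas
      // erasing while iterating `launched` directly would
      // invalidate the loop's iterator.
      std::list<std::string> values;
      for (const auto& entry : launched) {
        values.push_back(entry.second);
      }

      for (const std::string& value : values) {
        if (value == "task1") {
          launched.erase(1);  // Safe: we iterate the copy.
        }
      }

      return 0;
    }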

http://git-wip-us.apache.org/repos/asf/mesos/blob/bf281305/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0f4a53c..28305b3 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -966,6 +966,30 @@ struct Executor
   // Returns true if there are any queued/launched/terminated tasks.
   bool incompleteTasks();
 
+  // Returns true if the agent ever sent any tasks to this executor.
+  // More precisely, this function returns whether either:
+  //
+  //  (1) There are terminated/completed tasks with a
+  //      SOURCE_EXECUTOR status.
+  //
+  //  (2) `launchedTasks` is not empty.
+  //
+  // If this function returns false and there are no queued tasks,
+  // we probably (see TODO below) have killed or dropped all of its
+  // initial tasks, in which case the agent will shut down the executor.
+  //
+  // TODO(mzhu): Note however, that since we look through the cache
+  // of completed tasks, we can have false positives when a task
+  // that was delivered to the executor has been evicted from the
+  // completed task cache by tasks that have been killed by the
+  // agent before delivery. This should be fixed.
+  //
+  // NOTE: This function checks whether any task has ever been sent;
+  // this does not necessarily mean the executor has ever received
+  // any tasks. Specifically, tasks in `launchedTasks` may be dropped
+  // before received by the executor if the agent restarts.
+  bool everSentTask() const;
+
   // Sends a message to the connected executor.
   template <typename Message>
   void send(const Message& message)
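
To make the TODO's false-positive caveat concrete, a small
self-contained simulation of the heuristic (illustrative only;
it models `completedTasks` as a buffer of flags, where `true`
stands for a task carrying a SOURCE_EXECUTOR status):

    #include <boost/circular_buffer.hpp>

    #include <algorithm>
    #include <cassert>

    int main()
    {
      // Capacity 2 plays the role of MAX_COMPLETED_TASKS_PER_EXECUTOR.
      boost::circular_buffer<bool> completed(2);

      // A task that actually reached the executor completes...
      completed.push_back(true);

      // ...then two tasks killed before delivery evict its entry.
      completed.push_back(false);
      completed.push_back(false);

      // The heuristic now reports that the executor was never sent
      // a task: a false positive, exactly as the TODO describes.
      bool everSent = std::any_of(
          completed.begin(),
          completed.end(),
          [](bool reached) { return reached; });

      assert(!everSent);

      return 0;
    }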
