Fixed a bug relating to lingering executors [2/2].

A task-less v1 executor could linger if the agent restarted
before any of the executor's initial tasks were delivered.
This was because we checked whether the executor had any
tasks to run before removing the tasks dropped during the
restart.

This patch fixes the issue by checking whether the
executor should be shut down *after* we've removed the
tasks that were dropped during the agent restart.
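
For illustration, the fix is purely an ordering change within
`Slave::subscribe()`: the shutdown check from the diff below
(`!executor->everSentTask() && executor->queuedTasks.empty()`) now
runs only after dropped tasks are pruned from the queue. A minimal
standalone sketch of that ordering, with hypothetical types and
helpers (not the actual Mesos code or APIs):

  // Sketch only: simplified stand-ins for the agent's executor state.
  #include <iostream>
  #include <map>
  #include <set>
  #include <string>

  struct Executor {
    bool everSentTask = false;
    std::map<std::string, std::string> queuedTasks;  // TaskID -> TaskInfo.
  };

  // Remove queued tasks that were dropped while the agent restarted,
  // i.e. tasks absent from the executor's unacknowledged set.
  void removeDroppedTasks(Executor& executor,
                          const std::set<std::string>& unackedTaskIds) {
    for (auto it = executor.queuedTasks.begin();
         it != executor.queuedTasks.end();) {
      if (unackedTaskIds.count(it->first) == 0) {
        it = executor.queuedTasks.erase(it);  // Task was dropped.
      } else {
        ++it;
      }
    }
  }

  bool shouldShutdown(const Executor& executor) {
    // Before the fix this check ran *before* removeDroppedTasks(), so
    // an executor whose only tasks had been dropped still appeared to
    // have work queued and was left lingering.
    return !executor.everSentTask && executor.queuedTasks.empty();
  }

  int main() {
    Executor executor;
    executor.queuedTasks["task-1"] = "info";

    // The agent restarted before "task-1" was delivered; it is not in
    // the unacknowledged set, so it must be treated as dropped.
    removeDroppedTasks(executor, /*unackedTaskIds=*/{});

    // With the fixed ordering the executor is correctly shut down.
    std::cout << std::boolalpha << shouldShutdown(executor) << std::endl;
  }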


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0ef1c35c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0ef1c35c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0ef1c35c

Branch: refs/heads/1.3.x
Commit: 0ef1c35c499c5d7f97b53c4b4c43b7a55e103043
Parents: bf28130
Author: Meng Zhu <m...@mesosphere.io>
Authored: Wed Feb 7 17:46:15 2018 -0800
Committer: Benjamin Mahler <bmah...@apache.org>
Committed: Mon Feb 12 15:21:58 2018 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp | 135 ++++++++++++++++++++++++-----------------------
 1 file changed, 68 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0ef1c35c/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 93e996e..53934bc 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -128,6 +128,7 @@ using std::find;
 using std::list;
 using std::map;
 using std::ostringstream;
+using std::shared_ptr;
 using std::set;
 using std::string;
 using std::vector;
@@ -3656,34 +3657,6 @@ void Slave::subscribe(
         CHECK_SOME(os::touch(path));
       }
 
-      // Here, we kill the executor if it no longer has any task or task group
-      // to run (e.g., framework sent a `killTask()`). This is a workaround for
-      // those executors (e.g., command executor, default executor) that do not
-      // have a proper self terminating logic when they haven't received the
-      // task or task group within a timeout.
-      if (!executor->everSentTask() && executor->queuedTasks.empty()) {
-        LOG(WARNING) << "Shutting down subscribing executor " << *executor
-                     << " because it was never sent a task and"
-                     << " has no tasks to run";
-
-        _shutdownExecutor(framework, executor);
-
-        return;
-      }
-
-      // Tell executor it's registered and give it any queued tasks
-      // or task groups.
-      executor::Event event;
-      event.set_type(executor::Event::SUBSCRIBED);
-
-      executor::Event::Subscribed* subscribed = event.mutable_subscribed();
-      subscribed->mutable_executor_info()->CopyFrom(executor->info);
-      subscribed->mutable_framework_info()->MergeFrom(framework->info);
-      subscribed->mutable_slave_info()->CopyFrom(info);
-      subscribed->mutable_container_id()->CopyFrom(executor->containerId);
-
-      executor->send(event);
-
       // Handle all the pending updates.
       // The status update manager might have already checkpointed some
       // of these pending updates (for example, if the slave died right
@@ -3699,45 +3672,6 @@ void Slave::subscribe(
             None());
       }
 
-      // Update the resource limits for the container. Note that the
-      // resource limits include the currently queued tasks because we
-      // want the container to have enough resources to hold the
-      // upcoming tasks.
-      Resources resources = executor->resources;
-
-      foreachvalue (const TaskInfo& task, executor->queuedTasks) {
-        resources += task.resources();
-      }
-
-      // We maintain a copy of the tasks in `queuedTaskGroups` also in
-      // `queuedTasks`. Hence, we need to ensure that we don't send the same
-      // tasks to the executor twice.
-      LinkedHashMap<TaskID, TaskInfo> queuedTasks;
-      foreachpair (const TaskID& taskId,
-                   const TaskInfo& taskInfo,
-                   executor->queuedTasks) {
-        queuedTasks[taskId] = taskInfo;
-      }
-
-      foreach (const TaskGroupInfo& taskGroup, executor->queuedTaskGroups) {
-        foreach (const TaskInfo& task, taskGroup.tasks()) {
-          const TaskID& taskId = task.task_id();
-          if (queuedTasks.contains(taskId)) {
-            queuedTasks.erase(taskId);
-          }
-        }
-      }
-
-      containerizer->update(executor->containerId, resources)
-        .onAny(defer(self(),
-                     &Self::___run,
-                     lambda::_1,
-                     framework->id(),
-                     executor->id,
-                     executor->containerId,
-                     queuedTasks.values(),
-                     executor->queuedTaskGroups));
-
       hashmap<TaskID, TaskInfo> unackedTasks;
       foreach (const TaskInfo& task, subscribe.unacknowledged_tasks()) {
         unackedTasks[task.task_id()] = task;
@@ -3784,6 +3718,73 @@ void Slave::subscribe(
         }
       }
 
+      // Shutdown the executor if all of its initial tasks are killed.
+      // See MESOS-8411. This is a workaround for those executors (e.g.,
+      // command executor, default executor) that do not have a proper
+      // self terminating logic when they haven't received the task or
+      // task group within a timeout.
+      if (!executor->everSentTask() && executor->queuedTasks.empty()) {
+        LOG(WARNING) << "Shutting down executor " << *executor
+                     << " because it has never been sent a task and all of"
+                     << " its queued tasks have been killed before delivery";
+
+        _shutdownExecutor(framework, executor);
+
+        return;
+      }
+
+      // Tell executor it's registered and give it any queued tasks
+      // or task groups.
+      executor::Event event;
+      event.set_type(executor::Event::SUBSCRIBED);
+
+      executor::Event::Subscribed* subscribed = event.mutable_subscribed();
+      subscribed->mutable_executor_info()->CopyFrom(executor->info);
+      subscribed->mutable_framework_info()->MergeFrom(framework->info);
+      subscribed->mutable_slave_info()->CopyFrom(info);
+      subscribed->mutable_container_id()->CopyFrom(executor->containerId);
+
+      executor->send(event);
+
+      // We maintain a copy of the tasks in `queuedTaskGroups` also in
+      // `queuedTasks`. Hence, we need to ensure that we don't send the same
+      // tasks to the executor twice.
+      LinkedHashMap<TaskID, TaskInfo> queuedTasks;
+      foreachpair (const TaskID& taskId,
+                   const TaskInfo& taskInfo,
+                   executor->queuedTasks) {
+        queuedTasks[taskId] = taskInfo;
+      }
+
+      foreach (const TaskGroupInfo& taskGroup, executor->queuedTaskGroups) {
+        foreach (const TaskInfo& task, taskGroup.tasks()) {
+          const TaskID& taskId = task.task_id();
+          if (queuedTasks.contains(taskId)) {
+            queuedTasks.erase(taskId);
+          }
+        }
+      }
+
+      // Update the resource limits for the container. Note that the
+      // resource limits include the currently queued tasks because we
+      // want the container to have enough resources to hold the
+      // upcoming tasks.
+      Resources resources = executor->resources;
+
+      foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+        resources += task.resources();
+      }
+
+      containerizer->update(executor->containerId, resources)
+        .onAny(defer(self(),
+                     &Self::___run,
+                     lambda::_1,
+                     framework->id(),
+                     executor->id,
+                     executor->containerId,
+                     queuedTasks.values(),
+                     executor->queuedTaskGroups));
+
       break;
     }
     default:

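A side note on the re-delivery logic moved in the last hunk: tasks
queued as part of a task group are mirrored in the flat `queuedTasks`
map, so they are filtered out before the flat list is handed back to
`___run()`, otherwise the executor would receive them twice. A
minimal standalone sketch of that dedup step, with hypothetical types
(not the actual Mesos code):

  #include <iostream>
  #include <map>
  #include <string>
  #include <vector>

  using TaskID = std::string;

  struct TaskGroup {
    std::vector<TaskID> tasks;
  };

  int main() {
    // The flat map mirrors every queued task, including group members.
    std::map<TaskID, std::string> queuedTasks = {
        {"solo", "info"}, {"grouped-1", "info"}, {"grouped-2", "info"}};

    std::vector<TaskGroup> queuedTaskGroups = {{{"grouped-1", "grouped-2"}}};

    // Erase task-group members from the flat map so each task is sent
    // exactly once: groups go via the task-group path, the rest alone.
    for (const TaskGroup& group : queuedTaskGroups) {
      for (const TaskID& taskId : group.tasks) {
        queuedTasks.erase(taskId);
      }
    }

    for (const auto& entry : queuedTasks) {
      std::cout << entry.first << std::endl;  // Prints only "solo".
    }
  }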