This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit ef19f297be6c192f4d2cea0f9ed413a1dfaaf882
Author: Greg Mann <[email protected]>
AuthorDate: Mon Jul 15 10:25:42 2019 -0700

    Refactored the agent's task-killing code.
    
    This patch factors the code responsible for killing tasks
    out into two helper functions, `killPendingTask()` and
    `kill()`, so that the agent-draining handler can call the
    same code.
    
    Review: https://reviews.apache.org/r/70899/
---
 src/slave/slave.cpp | 133 +++++++++++++++++++++++++++++++---------------------
 src/slave/slave.hpp |  19 +++++++-
 2 files changed, 97 insertions(+), 55 deletions(-)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index fc688dc..741c1f6 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3670,10 +3670,6 @@ void Slave::killTask(
     return;
   }
 
-  CHECK(framework->state == Framework::RUNNING ||
-        framework->state == Framework::TERMINATING)
-    << framework->state;
-
   // We don't send a status update here because a terminating
   // framework cannot send acknowledgements.
   if (framework->state == Framework::TERMINATING) {
@@ -3683,54 +3679,10 @@ void Slave::killTask(
     return;
   }
 
-  // If the task is pending, we send a TASK_KILLED immediately.
-  // This will trigger a synchronous removal of the pending task,
-  // which prevents it from being launched.
-  if (framework->isPending(taskId)) {
-    LOG(WARNING) << "Killing task " << taskId
-                 << " of framework " << frameworkId
-                 << " before it was launched";
-
-    Option<TaskGroupInfo> taskGroup =
-      framework->getTaskGroupForPendingTask(taskId);
-
-    vector<StatusUpdate> updates;
-    if (taskGroup.isSome()) {
-      foreach (const TaskInfo& task, taskGroup->tasks()) {
-        updates.push_back(protobuf::createStatusUpdate(
-            frameworkId,
-            info.id(),
-            task.task_id(),
-            TASK_KILLED,
-            TaskStatus::SOURCE_SLAVE,
-            id::UUID::random(),
-            "A task within the task group was killed before"
-            " delivery to the executor",
-            TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
-            CHECK_NOTNONE(
-                framework->getExecutorIdForPendingTask(task.task_id()))));
-      }
-    } else {
-      updates.push_back(protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          taskId,
-          TASK_KILLED,
-          TaskStatus::SOURCE_SLAVE,
-          id::UUID::random(),
-          "Killed before delivery to the executor",
-          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
-          CHECK_NOTNONE(
-              framework->getExecutorIdForPendingTask(taskId))));
-    }
+  CHECK(framework->state == Framework::RUNNING) << framework->state;
 
-    foreach (const StatusUpdate& update, updates) {
-      // NOTE: Sending a terminal update (TASK_KILLED) synchronously
-      // removes the task/task group from 'framework->pendingTasks'
-      // and 'framework->pendingTaskGroups', so that it will not be
-      // launched.
-      statusUpdate(update, UPID());
-    }
+  if (framework->isPending(taskId)) {
+    killPendingTask(frameworkId, framework, taskId);
 
     return;
   }
@@ -3763,6 +3715,80 @@ void Slave::killTask(
     return;
   }
 
+  kill(frameworkId,
+       framework,
+       executor,
+       taskId,
+       (killTaskMessage.has_kill_policy()
+          ? killTaskMessage.kill_policy()
+          : Option<KillPolicy>::none()));
+}
+
+
+void Slave::killPendingTask(
+    const FrameworkID& frameworkId,
+    Framework* framework,
+    const TaskID& taskId)
+{
+  LOG(WARNING) << "Killing task " << taskId
+               << " of framework " << frameworkId
+               << " before it was launched";
+
+  Option<TaskGroupInfo> taskGroup =
+    framework->getTaskGroupForPendingTask(taskId);
+
+  vector<StatusUpdate> updates;
+  if (taskGroup.isSome()) {
+    foreach (const TaskInfo& task, taskGroup->tasks()) {
+      updates.push_back(protobuf::createStatusUpdate(
+          frameworkId,
+          info.id(),
+          task.task_id(),
+          TASK_KILLED,
+          TaskStatus::SOURCE_SLAVE,
+          id::UUID::random(),
+          "A task within the task group was killed before"
+          " delivery to the executor",
+          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
+          CHECK_NOTNONE(
+              framework->getExecutorIdForPendingTask(task.task_id()))));
+    }
+  } else {
+    updates.push_back(protobuf::createStatusUpdate(
+        frameworkId,
+        info.id(),
+        taskId,
+        TASK_KILLED,
+        TaskStatus::SOURCE_SLAVE,
+        id::UUID::random(),
+        "Killed before delivery to the executor",
+        TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
+        CHECK_NOTNONE(
+            framework->getExecutorIdForPendingTask(taskId))));
+  }
+
+  foreach (const StatusUpdate& update, updates) {
+    // NOTE: Sending a terminal update (TASK_KILLED) synchronously
+    // removes the task/task group from 'framework->pendingTasks'
+    // and 'framework->pendingTaskGroups', so that it will not be
+    // launched.
+    statusUpdate(update, UPID());
+  }
+}
+
+
+void Slave::kill(
+    const FrameworkID& frameworkId,
+    Framework* framework,
+    Executor* executor,
+    const TaskID& taskId,
+    const Option<KillPolicy>& killPolicy)
+{
+  // This function should only be called on tasks which are queued or launched,
+  // so both the framework and executor should always exist.
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(executor);
+
   switch (executor->state) {
     case Executor::REGISTERING: {
       LOG(WARNING) << "Transitioning the state of task " << taskId
@@ -3888,9 +3914,8 @@ void Slave::killTask(
         KillTaskMessage message;
         message.mutable_framework_id()->MergeFrom(frameworkId);
         message.mutable_task_id()->MergeFrom(taskId);
-        if (killTaskMessage.has_kill_policy()) {
-          message.mutable_kill_policy()->MergeFrom(
-              killTaskMessage.kill_policy());
+        if (killPolicy.isSome()) {
+          message.mutable_kill_policy()->MergeFrom(killPolicy.get());
         }
 
         executor->send(message);
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index dbcceed..58bdd2a 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -226,11 +226,28 @@ public:
       const std::vector<ResourceVersionUUID>& resourceVersionUuids,
       const Option<bool>& launchExecutor);
 
-  // Made 'virtual' for Slave mocking.
+  // Handler for the `KillTaskMessage`. Made 'virtual' for Slave mocking.
   virtual void killTask(
       const process::UPID& from,
       const KillTaskMessage& killTaskMessage);
 
+  // Helper to kill a pending task, which may or may not be associated with a
+  // valid `Executor` struct.
+  void killPendingTask(
+      const FrameworkID& frameworkId,
+      Framework* framework,
+      const TaskID& taskId);
+
+  // Helper to kill a task belonging to a valid framework and executor. This
+  // function should be used to kill tasks which are queued or launched, but
+  // not tasks which are pending.
+  void kill(
+      const FrameworkID& frameworkId,
+      Framework* framework,
+      Executor* executor,
+      const TaskID& taskId,
+      const Option<KillPolicy>& killPolicy);
+
   // Made 'virtual' for Slave mocking.
   virtual void shutdownExecutor(
       const process::UPID& from,

Reply via email to