Refactored sending a TASK_DROPPED status update.

Refactored how the agent sends a TASK_DROPPED status update to
remove code duplication. This also makes the agent consistently
send the executor ID in all these status updates.

Review: https://reviews.apache.org/r/66708/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4366d551
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4366d551
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4366d551

Branch: refs/heads/master
Commit: 4366d55104b467cb2767d60128e1ee713708433e
Parents: 7132365
Author: James Peach <[email protected]>
Authored: Fri Apr 20 08:57:23 2018 -0700
Committer: James Peach <[email protected]>
Committed: Fri Apr 20 08:57:23 2018 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 184 +++++++++++++----------------------------------
 1 file changed, 48 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4366d551/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 455e3cc..9d2d192 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2536,6 +2536,34 @@ void Slave::__run(
 
   const ExecutorID& executorId = executorInfo.executor_id();
 
+  // We report TASK_DROPPED to the framework because the task was
+  // never launched. For non-partition-aware frameworks, we report
+  // TASK_LOST for backward compatibility.
+  auto sendTaskDroppedUpdate =
+    [&](TaskStatus::Reason reason, const string& message) {
+      mesos::TaskState taskState = TASK_DROPPED;
+
+      if (!protobuf::frameworkHasCapability(
+              frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+        taskState = TASK_LOST;
+      }
+
+      foreach (const TaskInfo& _task, tasks) {
+        const StatusUpdate update = protobuf::createStatusUpdate(
+            frameworkId,
+            info.id(),
+            _task.task_id(),
+            taskState,
+            TaskStatus::SOURCE_SLAVE,
+            id::UUID::random(),
+            message,
+            reason,
+            executorId);
+
+        statusUpdate(update, UPID());
+      }
+    };
+
   // We don't send a status update here because a terminating
   // framework cannot send acknowledgements.
   if (framework->state == Framework::TERMINATING) {
@@ -2654,29 +2682,9 @@ void Slave::__run(
   }
 
   if (kill) {
-    // We report TASK_DROPPED to the framework because the task was
-    // never launched. For non-partition-aware frameworks, we report
-    // TASK_LOST for backward compatibility.
-    mesos::TaskState taskState = TASK_DROPPED;
-    if (!protobuf::frameworkHasCapability(
-            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-      taskState = TASK_LOST;
-    }
-
-    foreach (const TaskInfo& _task, tasks) {
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          taskState,
-          TaskStatus::SOURCE_SLAVE,
-          id::UUID::random(),
-          "Tasks assumes outdated resource state",
-          TaskStatus::REASON_INVALID_OFFERS,
-          executorId);
-
-      statusUpdate(update, UPID());
-    }
+    sendTaskDroppedUpdate(
+        TaskStatus::REASON_INVALID_OFFERS,
+        "Task assumes outdated resource state");
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
@@ -2732,29 +2740,10 @@ void Slave::__run(
   }
 
   if (kill) {
-    // We report TASK_DROPPED to the framework because the task was
-    // never launched. For non-partition-aware frameworks, we report
-    // TASK_LOST for backward compatibility.
-    mesos::TaskState taskState = TASK_DROPPED;
-    if (!protobuf::frameworkHasCapability(
-            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-      taskState = TASK_LOST;
-    }
-
-    foreach (const TaskInfo& _task, tasks) {
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          taskState,
-          TaskStatus::SOURCE_SLAVE,
-          id::UUID::random(),
-          "The checkpointed resources being used by the task or task group are 
"
-          "unknown to the agent",
-          TaskStatus::REASON_RESOURCES_UNKNOWN);
-
-      statusUpdate(update, UPID());
-    }
+    sendTaskDroppedUpdate(
+        TaskStatus::REASON_RESOURCES_UNKNOWN,
+        "The checkpointed resources being used by the task or task group are "
+        "unknown to the agent");
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
@@ -2795,30 +2784,10 @@ void Slave::__run(
   }
 
   if (kill) {
-    // We report TASK_DROPPED to the framework because the task was
-    // never launched. For non-partition-aware frameworks, we report
-    // TASK_LOST for backward compatibility.
-    mesos::TaskState taskState = TASK_DROPPED;
-    if (!protobuf::frameworkHasCapability(
-            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-      taskState = TASK_LOST;
-    }
-
-    foreach (const TaskInfo& _task, tasks) {
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          taskState,
-          TaskStatus::SOURCE_SLAVE,
-          id::UUID::random(),
-          "The checkpointed resources being used by the executor are unknown "
-          "to the agent",
-          TaskStatus::REASON_RESOURCES_UNKNOWN,
-          executorId);
-
-      statusUpdate(update, UPID());
-    }
+    sendTaskDroppedUpdate(
+        TaskStatus::REASON_RESOURCES_UNKNOWN,
+        "The checkpointed resources being used by the executor are unknown "
+        "to the agent");
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
@@ -2887,29 +2856,9 @@ void Slave::__run(
         // but the agent is still waiting for all the status updates to be
         // acked before removing the executor struct.
 
-        // We report TASK_DROPPED to the framework because the task was
-        // never launched. For non-partition-aware frameworks, we report
-        // TASK_LOST for backward compatibility.
-        mesos::TaskState taskState = TASK_DROPPED;
-        if (!protobuf::frameworkHasCapability(
-            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-          taskState = TASK_LOST;
-        }
-
-        foreach (const TaskInfo& _task, tasks) {
-          const StatusUpdate update = protobuf::createStatusUpdate(
-              frameworkId,
-              info.id(),
-              _task.task_id(),
-              taskState,
-              TaskStatus::SOURCE_SLAVE,
-              id::UUID::random(),
-              "Master wants to launch executor, but there already exits one",
-              TaskStatus::REASON_EXECUTOR_TERMINATED,
-              executorId);
-
-          statusUpdate(update, UPID());
-        }
+        sendTaskDroppedUpdate(
+            TaskStatus::REASON_EXECUTOR_TERMINATED,
+            "Master wants to launch executor, but one already exists");
 
         // Master expects a new executor to be launched for this task(s).
         // To keep the master executor entries updated, the agent needs to
@@ -2955,30 +2904,10 @@ void Slave::__run(
         // or dropped hence did not launch the executor but the master doesn't
         // know about it yet because the `ExitedExecutorMessage` is still in
         // flight. In this case, we will drop the task.
-        //
-        // We report TASK_DROPPED to the framework because the task was
-        // never launched. For non-partition-aware frameworks, we report
-        // TASK_LOST for backward compatibility.
-        mesos::TaskState taskState = TASK_DROPPED;
-        if (!protobuf::frameworkHasCapability(
-            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-          taskState = TASK_LOST;
-        }
 
-        foreach (const TaskInfo& _task, tasks) {
-          const StatusUpdate update = protobuf::createStatusUpdate(
-              frameworkId,
-              info.id(),
-              _task.task_id(),
-              taskState,
-              TaskStatus::SOURCE_SLAVE,
-              id::UUID::random(),
-              "No executor is expected to launch and there is none running",
-              TaskStatus::REASON_EXECUTOR_TERMINATED,
-              executorId);
-
-          statusUpdate(update, UPID());
-        }
+        sendTaskDroppedUpdate(
+            TaskStatus::REASON_EXECUTOR_TERMINATED,
+            "No executor is expected to launch and there is none running");
 
         // We do not send `ExitedExecutorMessage` here because the expectation
         // is that there is already one on the fly to master. If the message
@@ -2998,26 +2927,9 @@ void Slave::__run(
     if (added.isError()) {
       CHECK(framework->getExecutor(executorId) == nullptr);
 
-      mesos::TaskState taskState = TASK_DROPPED;
-      if (!protobuf::frameworkHasCapability(
-          frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-        taskState = TASK_LOST;
-      }
-
-      foreach (const TaskInfo& _task, tasks) {
-        const StatusUpdate update = protobuf::createStatusUpdate(
-            frameworkId,
-            info.id(),
-            _task.task_id(),
-            taskState,
-            TaskStatus::SOURCE_SLAVE,
-            id::UUID::random(),
-            added.error(),
-            TaskStatus::REASON_EXECUTOR_TERMINATED,
-            executorId);
-
-        statusUpdate(update, UPID());
-      }
+      sendTaskDroppedUpdate(
+          TaskStatus::REASON_EXECUTOR_TERMINATED,
+          added.error());
 
       // Refer to the comment after 'framework->removePendingTask' above
       // for why we need this.

Reply via email to