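Refactored sending a TASK_DROPPED status update. Refactored how the agent sends a TASK_DROPPED status update to remove code duplication: the repeated task-state selection and per-task status update loops in `Slave::__run` are replaced by a single local `sendTaskDroppedUpdate` helper. This also makes the agent consistently send the executor ID in all of these status updates; previously, the update sent when the checkpointed resources used by a task or task group were unknown to the agent omitted it.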
Review: https://reviews.apache.org/r/66708/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4366d551 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4366d551 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4366d551 Branch: refs/heads/master Commit: 4366d55104b467cb2767d60128e1ee713708433e Parents: 7132365 Author: James Peach <[email protected]> Authored: Fri Apr 20 08:57:23 2018 -0700 Committer: James Peach <[email protected]> Committed: Fri Apr 20 08:57:23 2018 -0700 ---------------------------------------------------------------------- src/slave/slave.cpp | 184 +++++++++++++---------------------------------- 1 file changed, 48 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4366d551/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 455e3cc..9d2d192 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -2536,6 +2536,34 @@ void Slave::__run( const ExecutorID& executorId = executorInfo.executor_id(); + // We report TASK_DROPPED to the framework because the task was + // never launched. For non-partition-aware frameworks, we report + // TASK_LOST for backward compatibility. 
+ auto sendTaskDroppedUpdate = + [&](TaskStatus::Reason reason, const string& message) { + mesos::TaskState taskState = TASK_DROPPED; + + if (!protobuf::frameworkHasCapability( + frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { + taskState = TASK_LOST; + } + + foreach (const TaskInfo& _task, tasks) { + const StatusUpdate update = protobuf::createStatusUpdate( + frameworkId, + info.id(), + _task.task_id(), + taskState, + TaskStatus::SOURCE_SLAVE, + id::UUID::random(), + message, + reason, + executorId); + + statusUpdate(update, UPID()); + } + }; + // We don't send a status update here because a terminating // framework cannot send acknowledgements. if (framework->state == Framework::TERMINATING) { @@ -2654,29 +2682,9 @@ void Slave::__run( } if (kill) { - // We report TASK_DROPPED to the framework because the task was - // never launched. For non-partition-aware frameworks, we report - // TASK_LOST for backward compatibility. - mesos::TaskState taskState = TASK_DROPPED; - if (!protobuf::frameworkHasCapability( - frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { - taskState = TASK_LOST; - } - - foreach (const TaskInfo& _task, tasks) { - const StatusUpdate update = protobuf::createStatusUpdate( - frameworkId, - info.id(), - _task.task_id(), - taskState, - TaskStatus::SOURCE_SLAVE, - id::UUID::random(), - "Tasks assumes outdated resource state", - TaskStatus::REASON_INVALID_OFFERS, - executorId); - - statusUpdate(update, UPID()); - } + sendTaskDroppedUpdate( + TaskStatus::REASON_INVALID_OFFERS, + "Task assumes outdated resource state"); // Refer to the comment after 'framework->removePendingTask' above // for why we need this. @@ -2732,29 +2740,10 @@ void Slave::__run( } if (kill) { - // We report TASK_DROPPED to the framework because the task was - // never launched. For non-partition-aware frameworks, we report - // TASK_LOST for backward compatibility. 
- mesos::TaskState taskState = TASK_DROPPED; - if (!protobuf::frameworkHasCapability( - frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { - taskState = TASK_LOST; - } - - foreach (const TaskInfo& _task, tasks) { - const StatusUpdate update = protobuf::createStatusUpdate( - frameworkId, - info.id(), - _task.task_id(), - taskState, - TaskStatus::SOURCE_SLAVE, - id::UUID::random(), - "The checkpointed resources being used by the task or task group are " - "unknown to the agent", - TaskStatus::REASON_RESOURCES_UNKNOWN); - - statusUpdate(update, UPID()); - } + sendTaskDroppedUpdate( + TaskStatus::REASON_RESOURCES_UNKNOWN, + "The checkpointed resources being used by the task or task group are " + "unknown to the agent"); // Refer to the comment after 'framework->removePendingTask' above // for why we need this. @@ -2795,30 +2784,10 @@ void Slave::__run( } if (kill) { - // We report TASK_DROPPED to the framework because the task was - // never launched. For non-partition-aware frameworks, we report - // TASK_LOST for backward compatibility. - mesos::TaskState taskState = TASK_DROPPED; - if (!protobuf::frameworkHasCapability( - frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { - taskState = TASK_LOST; - } - - foreach (const TaskInfo& _task, tasks) { - const StatusUpdate update = protobuf::createStatusUpdate( - frameworkId, - info.id(), - _task.task_id(), - taskState, - TaskStatus::SOURCE_SLAVE, - id::UUID::random(), - "The checkpointed resources being used by the executor are unknown " - "to the agent", - TaskStatus::REASON_RESOURCES_UNKNOWN, - executorId); - - statusUpdate(update, UPID()); - } + sendTaskDroppedUpdate( + TaskStatus::REASON_RESOURCES_UNKNOWN, + "The checkpointed resources being used by the executor are unknown " + "to the agent"); // Refer to the comment after 'framework->removePendingTask' above // for why we need this. 
@@ -2887,29 +2856,9 @@ void Slave::__run( // but the agent is still waiting for all the status updates to be // acked before removing the executor struct. - // We report TASK_DROPPED to the framework because the task was - // never launched. For non-partition-aware frameworks, we report - // TASK_LOST for backward compatibility. - mesos::TaskState taskState = TASK_DROPPED; - if (!protobuf::frameworkHasCapability( - frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { - taskState = TASK_LOST; - } - - foreach (const TaskInfo& _task, tasks) { - const StatusUpdate update = protobuf::createStatusUpdate( - frameworkId, - info.id(), - _task.task_id(), - taskState, - TaskStatus::SOURCE_SLAVE, - id::UUID::random(), - "Master wants to launch executor, but there already exits one", - TaskStatus::REASON_EXECUTOR_TERMINATED, - executorId); - - statusUpdate(update, UPID()); - } + sendTaskDroppedUpdate( + TaskStatus::REASON_EXECUTOR_TERMINATED, + "Master wants to launch executor, but one already exists"); // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to @@ -2955,30 +2904,10 @@ void Slave::__run( // or dropped hence did not launch the executor but the master doesn't // know about it yet because the `ExitedExecutorMessage` is still in // flight. In this case, we will drop the task. - // - // We report TASK_DROPPED to the framework because the task was - // never launched. For non-partition-aware frameworks, we report - // TASK_LOST for backward compatibility. 
- mesos::TaskState taskState = TASK_DROPPED; - if (!protobuf::frameworkHasCapability( - frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { - taskState = TASK_LOST; - } - foreach (const TaskInfo& _task, tasks) { - const StatusUpdate update = protobuf::createStatusUpdate( - frameworkId, - info.id(), - _task.task_id(), - taskState, - TaskStatus::SOURCE_SLAVE, - id::UUID::random(), - "No executor is expected to launch and there is none running", - TaskStatus::REASON_EXECUTOR_TERMINATED, - executorId); - - statusUpdate(update, UPID()); - } + sendTaskDroppedUpdate( + TaskStatus::REASON_EXECUTOR_TERMINATED, + "No executor is expected to launch and there is none running"); // We do not send `ExitedExecutorMessage` here because the expectation // is that there is already one on the fly to master. If the message @@ -2998,26 +2927,9 @@ void Slave::__run( if (added.isError()) { CHECK(framework->getExecutor(executorId) == nullptr); - mesos::TaskState taskState = TASK_DROPPED; - if (!protobuf::frameworkHasCapability( - frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { - taskState = TASK_LOST; - } - - foreach (const TaskInfo& _task, tasks) { - const StatusUpdate update = protobuf::createStatusUpdate( - frameworkId, - info.id(), - _task.task_id(), - taskState, - TaskStatus::SOURCE_SLAVE, - id::UUID::random(), - added.error(), - TaskStatus::REASON_EXECUTOR_TERMINATED, - executorId); - - statusUpdate(update, UPID()); - } + sendTaskDroppedUpdate( + TaskStatus::REASON_EXECUTOR_TERMINATED, + added.error()); // Refer to the comment after 'framework->removePendingTask' above // for why we need this.
