This is an automated email from the ASF dual-hosted git repository. grag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit ef19f297be6c192f4d2cea0f9ed413a1dfaaf882 Author: Greg Mann <[email protected]> AuthorDate: Mon Jul 15 10:25:42 2019 -0700 Refactored the agent's task-killing code. This patch factors the code responsible for killing tasks out into two helper functions. This will facilitate the calling of this common code by the agent-draining handler. Review: https://reviews.apache.org/r/70899/ --- src/slave/slave.cpp | 133 +++++++++++++++++++++++++++++++--------------------- src/slave/slave.hpp | 19 +++++++- 2 files changed, 97 insertions(+), 55 deletions(-) diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index fc688dc..741c1f6 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -3670,10 +3670,6 @@ void Slave::killTask( return; } - CHECK(framework->state == Framework::RUNNING || - framework->state == Framework::TERMINATING) - << framework->state; - // We don't send a status update here because a terminating // framework cannot send acknowledgements. if (framework->state == Framework::TERMINATING) { @@ -3683,54 +3679,10 @@ void Slave::killTask( return; } - // If the task is pending, we send a TASK_KILLED immediately. - // This will trigger a synchronous removal of the pending task, - // which prevents it from being launched. - if (framework->isPending(taskId)) { - LOG(WARNING) << "Killing task " << taskId - << " of framework " << frameworkId - << " before it was launched"; - - Option<TaskGroupInfo> taskGroup = - framework->getTaskGroupForPendingTask(taskId); - - vector<StatusUpdate> updates; - if (taskGroup.isSome()) { - foreach (const TaskInfo& task, taskGroup->tasks()) { - updates.push_back(protobuf::createStatusUpdate( - frameworkId, - info.id(), - task.task_id(), - TASK_KILLED, - TaskStatus::SOURCE_SLAVE, - id::UUID::random(), - "A task within the task group was killed before" - " delivery to the executor", - TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, - CHECK_NOTNONE( - framework->getExecutorIdForPendingTask(task.task_id())))); - } - } else { - updates.push_back(protobuf::createStatusUpdate( - frameworkId, - info.id(), - taskId, - TASK_KILLED, - TaskStatus::SOURCE_SLAVE, - id::UUID::random(), - "Killed before delivery to the executor", - TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, - CHECK_NOTNONE( - framework->getExecutorIdForPendingTask(taskId)))); - } + CHECK(framework->state == Framework::RUNNING) << framework->state; - foreach (const StatusUpdate& update, updates) { - // NOTE: Sending a terminal update (TASK_KILLED) synchronously - // removes the task/task group from 'framework->pendingTasks' - // and 'framework->pendingTaskGroups', so that it will not be - // launched. - statusUpdate(update, UPID()); - } + if (framework->isPending(taskId)) { + killPendingTask(frameworkId, framework, taskId); return; } @@ -3763,6 +3715,80 @@ void Slave::killTask( return; } + kill(frameworkId, + framework, + executor, + taskId, + (killTaskMessage.has_kill_policy() + ? killTaskMessage.kill_policy() + : Option<KillPolicy>::none())); +} + + +void Slave::killPendingTask( + const FrameworkID& frameworkId, + Framework* framework, + const TaskID& taskId) +{ + LOG(WARNING) << "Killing task " << taskId + << " of framework " << frameworkId + << " before it was launched"; + + Option<TaskGroupInfo> taskGroup = + framework->getTaskGroupForPendingTask(taskId); + + vector<StatusUpdate> updates; + if (taskGroup.isSome()) { + foreach (const TaskInfo& task, taskGroup->tasks()) { + updates.push_back(protobuf::createStatusUpdate( + frameworkId, + info.id(), + task.task_id(), + TASK_KILLED, + TaskStatus::SOURCE_SLAVE, + id::UUID::random(), + "A task within the task group was killed before" + " delivery to the executor", + TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, + CHECK_NOTNONE( + framework->getExecutorIdForPendingTask(task.task_id())))); + } + } else { + updates.push_back(protobuf::createStatusUpdate( + frameworkId, + info.id(), + taskId, + TASK_KILLED, + TaskStatus::SOURCE_SLAVE, + id::UUID::random(), + "Killed before delivery to the executor", + TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, + CHECK_NOTNONE( + framework->getExecutorIdForPendingTask(taskId)))); + } + + foreach (const StatusUpdate& update, updates) { + // NOTE: Sending a terminal update (TASK_KILLED) synchronously + // removes the task/task group from 'framework->pendingTasks' + // and 'framework->pendingTaskGroups', so that it will not be + // launched. + statusUpdate(update, UPID()); + } +} + + +void Slave::kill( + const FrameworkID& frameworkId, + Framework* framework, + Executor* executor, + const TaskID& taskId, + const Option<KillPolicy>& killPolicy) +{ + // This function should only be called on tasks which are queued or launched, + // so both the framework and executor should always exist. + CHECK_NOTNULL(framework); + CHECK_NOTNULL(executor); + switch (executor->state) { case Executor::REGISTERING: { LOG(WARNING) << "Transitioning the state of task " << taskId @@ -3888,9 +3914,8 @@ void Slave::killTask( KillTaskMessage message; message.mutable_framework_id()->MergeFrom(frameworkId); message.mutable_task_id()->MergeFrom(taskId); - if (killTaskMessage.has_kill_policy()) { - message.mutable_kill_policy()->MergeFrom( - killTaskMessage.kill_policy()); + if (killPolicy.isSome()) { + message.mutable_kill_policy()->MergeFrom(killPolicy.get()); } executor->send(message); diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index dbcceed..58bdd2a 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -226,11 +226,28 @@ public: const std::vector<ResourceVersionUUID>& resourceVersionUuids, const Option<bool>& launchExecutor); - // Made 'virtual' for Slave mocking. + // Handler for the `KillTaskMessage`. Made 'virtual' for Slave mocking. virtual void killTask( const process::UPID& from, const KillTaskMessage& killTaskMessage); + // Helper to kill a pending task, which may or may not be associated with a + // valid `Executor` struct. + void killPendingTask( + const FrameworkID& frameworkId, + Framework* framework, + const TaskID& taskId); + + // Helper to kill a task belonging to a valid framework and executor. This + // function should be used to kill tasks which are queued or launched, but + // not tasks which are pending. + void kill( + const FrameworkID& frameworkId, + Framework* framework, + Executor* executor, + const TaskID& taskId, + const Option<KillPolicy>& killPolicy); + // Made 'virtual' for Slave mocking. virtual void shutdownExecutor( const process::UPID& from,
