Repository: mesos Updated Branches: refs/heads/master 70ce35bb7 -> 4366d5510
Refactored the executor launch path. Refactored the executor launch path so as to make the conditional logic clearer. After this there is just one place that adds and launches a new executor and we will subsequently be able to add error handling there. Review: https://reviews.apache.org/r/66704/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b848b09e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b848b09e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b848b09e Branch: refs/heads/master Commit: b848b09e1fc769e9524c11ab6135f585649441bc Parents: 70ce35b Author: James Peach <[email protected]> Authored: Fri Apr 20 08:56:36 2018 -0700 Committer: James Peach <[email protected]> Committed: Fri Apr 20 08:56:36 2018 -0700 ---------------------------------------------------------------------- src/slave/slave.cpp | 220 +++++++++++++++++++++++------------------------ 1 file changed, 109 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b848b09e/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index e5d6c3f..6885124 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -2891,136 +2891,134 @@ void Slave::__run( Executor* executor = framework->getExecutor(executorId); - if (launchExecutor.isNone()) { - // This is the legacy case where the master did not set the - // `launch_executor` flag. Executor will be launched if there is none. - if (executor == nullptr) { - executor = doLaunchExecutor(); - } - } else { + // If launchExecutor is NONE, this is the legacy case where the master + // did not set the `launch_executor` flag. Executor will be launched if + // there is none. 
+ + if (launchExecutor.isSome()) { if (taskGroup.isNone() && task->has_command()) { // We are dealing with command task; a new command executor will be // launched. CHECK(executor == nullptr); - executor = doLaunchExecutor(); } else { // Master set the `launch_executor` flag and this is not a command task. - if (launchExecutor.get()) { - // Master requests launching a new executor. - if (executor == nullptr) { - executor = doLaunchExecutor(); - } else { - // Master requests launching executor but an executor still exits - // on the agent. In this case we will drop tasks. This could happen if - // the executor is already terminated on the agent (and agent has sent - // out the `ExitedExecutorMessage` and it was received by the master) - // but the agent is still waiting for all the status updates to be - // acked before removing the executor struct. - - // We report TASK_DROPPED to the framework because the task was - // never launched. For non-partition-aware frameworks, we report - // TASK_LOST for backward compatibility. - mesos::TaskState taskState = TASK_DROPPED; - if (!protobuf::frameworkHasCapability( - frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { - taskState = TASK_LOST; - } - - foreach (const TaskInfo& _task, tasks) { - const StatusUpdate update = protobuf::createStatusUpdate( - frameworkId, - info.id(), - _task.task_id(), - taskState, - TaskStatus::SOURCE_SLAVE, - id::UUID::random(), - "Master wants to launch executor, but there already exits one", - TaskStatus::REASON_EXECUTOR_TERMINATED, - executorId); - - statusUpdate(update, UPID()); - } + if (launchExecutor.get() && executor != nullptr) { + // Master requests launching executor but an executor still exits + // on the agent. In this case we will drop tasks. 
This could happen if + // the executor is already terminated on the agent (and agent has sent + // out the `ExitedExecutorMessage` and it was received by the master) + // but the agent is still waiting for all the status updates to be + // acked before removing the executor struct. + + // We report TASK_DROPPED to the framework because the task was + // never launched. For non-partition-aware frameworks, we report + // TASK_LOST for backward compatibility. + mesos::TaskState taskState = TASK_DROPPED; + if (!protobuf::frameworkHasCapability( + frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { + taskState = TASK_LOST; + } - // Master expects a new executor to be launched for this task(s). - // To keep the master executor entries updated, the agent needs to - // send `ExitedExecutorMessage` even though no executor launched. - if (executor->state == Executor::TERMINATED) { - sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); - } else { - // This could happen if the following sequence of events happen: - // - // (1) Master sends `runTaskMessage` to agent with - // `launch_executor = true`; - // - // (2) Before the agent got the `runTaskMessage`, it reconnects and - // reconciles with the master. Master then removes the executor - // entry it asked the agent to launch in step (1); - // - // (3) Agent got the `runTaskMessage` sent in step (1), launches - // the task and the executor (that the master does not know - // about). - // - // (4) Master now sends another `runTaskMessage` for the same - // executor id with `launch_executor = true`. - // - // The agent ends up with a lingering executor that the master does - // not know about. We will shutdown the executor. - // - // TODO(mzhu): This could be avoided if the agent can - // tell whether the master's message was sent before or after the - // reconnection and discard the message in the former case. 
- // - // TODO(mzhu): Master needs to do proper executor reconciliation - // with the agent to avoid this from happening. - _shutdownExecutor(framework, executor); - } + foreach (const TaskInfo& _task, tasks) { + const StatusUpdate update = protobuf::createStatusUpdate( + frameworkId, + info.id(), + _task.task_id(), + taskState, + TaskStatus::SOURCE_SLAVE, + id::UUID::random(), + "Master wants to launch executor, but there already exits one", + TaskStatus::REASON_EXECUTOR_TERMINATED, + executorId); - return; + statusUpdate(update, UPID()); } - } else { - // Master does not want to launch executor. - if (executor == nullptr) { - // Master wants no new executor launched and there is none running on - // the agent. This could happen if the task expects some previous - // tasks to launch the executor. However, the earlier task got killed - // or dropped hence did not launch the executor but the master doesn't - // know about it yet because the `ExitedExecutorMessage` is still in - // flight. In this case, we will drop the task. + + // Master expects a new executor to be launched for this task(s). + // To keep the master executor entries updated, the agent needs to + // send `ExitedExecutorMessage` even though no executor launched. + if (executor->state == Executor::TERMINATED) { + sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + } else { + // This could happen if the following sequence of events happen: // - // We report TASK_DROPPED to the framework because the task was - // never launched. For non-partition-aware frameworks, we report - // TASK_LOST for backward compatibility. - mesos::TaskState taskState = TASK_DROPPED; - if (!protobuf::frameworkHasCapability( - frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { - taskState = TASK_LOST; - } + // (1) Master sends `runTaskMessage` to agent with + // `launch_executor = true`; + // + // (2) Before the agent got the `runTaskMessage`, it reconnects and + // reconciles with the master. 
Master then removes the executor + // entry it asked the agent to launch in step (1); + // + // (3) Agent got the `runTaskMessage` sent in step (1), launches + // the task and the executor (that the master does not know + // about). + // + // (4) Master now sends another `runTaskMessage` for the same + // executor id with `launch_executor = true`. + // + // The agent ends up with a lingering executor that the master does + // not know about. We will shutdown the executor. + // + // TODO(mzhu): This could be avoided if the agent can + // tell whether the master's message was sent before or after the + // reconnection and discard the message in the former case. + // + // TODO(mzhu): Master needs to do proper executor reconciliation + // with the agent to avoid this from happening. + _shutdownExecutor(framework, executor); + } - foreach (const TaskInfo& _task, tasks) { - const StatusUpdate update = protobuf::createStatusUpdate( - frameworkId, - info.id(), - _task.task_id(), - taskState, - TaskStatus::SOURCE_SLAVE, - id::UUID::random(), - "No executor is expected to launch and there is none running", - TaskStatus::REASON_EXECUTOR_TERMINATED, - executorId); + return; + } - statusUpdate(update, UPID()); - } + if (!launchExecutor.get() && executor == nullptr) { + // Master wants no new executor launched and there is none running on + // the agent. This could happen if the task expects some previous + // tasks to launch the executor. However, the earlier task got killed + // or dropped hence did not launch the executor but the master doesn't + // know about it yet because the `ExitedExecutorMessage` is still in + // flight. In this case, we will drop the task. + // + // We report TASK_DROPPED to the framework because the task was + // never launched. For non-partition-aware frameworks, we report + // TASK_LOST for backward compatibility. 
+ mesos::TaskState taskState = TASK_DROPPED; + if (!protobuf::frameworkHasCapability( + frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { + taskState = TASK_LOST; + } - // We do not send `ExitedExecutorMessage` here because the expectation - // is that there is already one on the fly to master. If the message - // gets dropped, we will hopefully reconcile with the master later. + foreach (const TaskInfo& _task, tasks) { + const StatusUpdate update = protobuf::createStatusUpdate( + frameworkId, + info.id(), + _task.task_id(), + taskState, + TaskStatus::SOURCE_SLAVE, + id::UUID::random(), + "No executor is expected to launch and there is none running", + TaskStatus::REASON_EXECUTOR_TERMINATED, + executorId); - return; + statusUpdate(update, UPID()); } + + // We do not send `ExitedExecutorMessage` here because the expectation + // is that there is already one on the fly to master. If the message + // gets dropped, we will hopefully reconcile with the master later. + + return; } } } + // Either the master explicitly requests launching a new executor + // or we are in the legacy case of launching one if there wasn't + // one already. Either way, let's launch executor now. + if (executor == nullptr) { + executor = doLaunchExecutor(); + } + CHECK_NOTNULL(executor); switch (executor->state) {
