Enforced task launch order on the agent. Until now, Mesos has not guaranteed that the agent launches tasks in the same order in which it receives them. The agent's task launch path contains two asynchronous steps: unscheduling garbage collection (GC) and task authorization. Because these steps complete asynchronously, a later task launch may finish both of them before its predecessors do, resulting in out-of-order task delivery.
This patch enforces the task delivery order by sequencing task launch after the two asynchronous steps, specifically right before entering `__run()`. Review: https://reviews.apache.org/r/66144/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/493173a1 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/493173a1 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/493173a1 Branch: refs/heads/master Commit: 493173a1378fd9d3693fed09bb3bfb3149e32a55 Parents: e90ae4d Author: Meng Zhu <[email protected]> Authored: Thu Apr 5 17:44:29 2018 -0700 Committer: Greg Mann <[email protected]> Committed: Thu Apr 5 17:57:27 2018 -0700 ---------------------------------------------------------------------- src/slave/slave.cpp | 156 +++++++++++++++++++++++++++++++++++------------ src/slave/slave.hpp | 14 +++++ 2 files changed, 132 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/493173a1/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index cff7101..e5d6c3f 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -2206,10 +2206,12 @@ void Slave::run( return unschedules; }; - // Handle any unschedule GC failure. If unschedule GC succeeds, trigger - // the next continuations. + // `taskLaunch` encapsulates each task's launch steps from this point + // to the end of `_run` (the completion of task authorization). Future<Nothing> taskLaunch = collect(unschedules) + // Handle the failure iff unschedule GC fails. .repair(defer(self(), onUnscheduleGCFailure)) + // If unschedule GC succeeds, trigger the next continuation. 
.then(defer( self(), &Self::_run, @@ -2220,27 +2222,80 @@ void Slave::run( resourceVersionUuids, launchExecutor)); - taskLaunch - .onReady(defer( - self(), - &Self::__run, - frameworkInfo, - executorInfo, - task, - taskGroup, - resourceVersionUuids, - launchExecutor)) - .onFailed(defer(self(), [=](const string& failure) { - if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task launch. - // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. - sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); - } + // Use a sequence to ensure that task launch order is preserved. + framework->taskLaunchSequences[executorId] + .add<Nothing>([taskLaunch]() -> Future<Nothing> { + // We use this sequence only to maintain the task launching order. If the + // sequence is deleted, we do not want the resulting discard event to + // propagate up the chain, which would prevent the previous `.then()` or + // `.repair()` callbacks from being invoked. Thus, we use `undiscardable` + // to protect each `taskLaunch`. + return undiscardable(taskLaunch); + }) + // We register `onAny` on the future returned by the sequence (referred to + // as `seqFuture` below). The following scenarios could happen: + // + // (1) `seqFuture` becomes ready. This happens when all previous tasks' + // `taskLaunch` futures are in non-pending state AND this task's own + // `taskLaunch` future is in ready state. The `onReady` call registered + // below will be triggered and continue the success path. + // + // (2) `seqFuture` becomes failed. This happens when all previous tasks' + // `taskLaunch` futures are in non-pending state AND this task's own + // `taskLaunch` future is in failed state (e.g. due to unschedule GC + // failure or some other failure). The `onFailed` call registered below + // will be triggered to handle the failure. 
+ // + // (3) `seqFuture` becomes discarded. This happens when the sequence is + // destructed (see declaration of `taskLaunchSequences` on its lifecycle) + // while the `seqFuture` is still pending. In this case, we wait until + // this task's own `taskLaunch` future becomes non-pending and trigger + // callbacks accordingly. + // + // TODO(mzhu): In case (3), the destruction of the sequence means that the + // agent will eventually discover that the executor is absent and drop + // the task. While `__run` is capable of handling this case, it is more + // optimal to handle the failure earlier here rather than waiting for + // the `taskLaunch` transition and directing control to `__run`. + .onAny(defer(self(), [=](const Future<Nothing>&) { + // We only want to execute the following callbacks once the work performed + // in the `taskLaunch` chain is complete. Thus, we add them onto the + // `taskLaunch` chain rather than dispatching directly. + taskLaunch + .onReady(defer( + self(), + &Self::__run, + frameworkInfo, + executorInfo, + task, + taskGroup, + resourceVersionUuids, + launchExecutor)) + .onFailed(defer(self(), [=](const string& failure) { + Framework* _framework = getFramework(frameworkId); + if (_framework == nullptr) { + LOG(WARNING) << "Ignoring running " + << taskOrTaskGroup(task, taskGroup) + << " because the framework " << stringify(frameworkId) + << " does not exist"; + } + + if (launchExecutor.isSome() && launchExecutor.get()) { + // Master expects a new executor to be launched for this task(s). + // To keep the master executor entries updated, the agent needs to + // send `ExitedExecutorMessage` even though no executor launched. + sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // See the declaration of `taskLaunchSequences` regarding its + // lifecycle management. 
+ if (_framework != nullptr) { + _framework->taskLaunchSequences.erase(executorInfo.executor_id()); + } + } + })); })); - // TODO(mzhu): Consolidate error handling code in `__run` here with - // then/recover pattern. + // TODO(mzhu): Consolidate error handling code in `__run` here. } @@ -2377,10 +2432,8 @@ Future<Nothing> Slave::_run( }; return collect(authorizations) - .recover(defer(self(), + .repair(defer(self(), [=](const Future<list<bool>>& future) -> Future<list<bool>> { - CHECK(future.isFailed()); - Framework* _framework = getFramework(frameworkId); if (_framework == nullptr) { const string error = @@ -2469,10 +2522,13 @@ void Slave::__run( << " does not exist"; if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. + // `ExitedExecutorMessage` even though no executor launched. sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // There is no need to clean up the task launch sequence here since + // the framework (along with the sequence) no longer exists. } return; @@ -2498,10 +2554,14 @@ void Slave::__run( } if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. + // `ExitedExecutorMessage` even though no executor launched. sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // See the declaration of `taskLaunchSequences` regarding its lifecycle + // management. 
+ framework->taskLaunchSequences.erase(executorInfo.executor_id()); } return; @@ -2530,10 +2590,14 @@ void Slave::__run( << " because it has been killed in the meantime"; if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. + // `ExitedExecutorMessage` even though no executor launched. sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // See the declaration of `taskLaunchSequences` regarding its lifecycle + // management. + framework->taskLaunchSequences.erase(executorInfo.executor_id()); } return; @@ -2621,10 +2685,14 @@ void Slave::__run( } if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. + // `ExitedExecutorMessage` even though no executor launched. sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // See the declaration of `taskLaunchSequences` regarding its lifecycle + // management. + framework->taskLaunchSequences.erase(executorInfo.executor_id()); } return; @@ -2695,10 +2763,14 @@ void Slave::__run( } if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. + // `ExitedExecutorMessage` even though no executor launched. 
sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // See the declaration of `taskLaunchSequences` regarding its lifecycle + // management. + framework->taskLaunchSequences.erase(executorInfo.executor_id()); } return; @@ -2755,10 +2827,14 @@ void Slave::__run( } if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. + // `ExitedExecutorMessage` even though no executor launched. sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // See the declaration of `taskLaunchSequences` regarding its lifecycle + // management. + framework->taskLaunchSequences.erase(executorInfo.executor_id()); } return; @@ -2865,9 +2941,9 @@ void Slave::__run( statusUpdate(update, UPID()); } - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to - // send 'ExitedExecutorMessage' even though no executor launched. + // send `ExitedExecutorMessage` even though no executor launched. if (executor->state == Executor::TERMINATED) { sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); } else { @@ -8946,6 +9022,10 @@ void Framework::destroyExecutor(const ExecutorID& executorId) Executor* executor = executors[executorId]; executors.erase(executorId); + // See the declaration of `taskLaunchSequences` regarding its + // lifecycle management. + taskLaunchSequences.erase(executorId); + // Pass ownership of the executor pointer. 
completedExecutors.push_back(Owned<Executor>(executor)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/493173a1/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 0b812ef..d00c7b2 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -53,6 +53,7 @@ #include <process/process.hpp> #include <process/protobuf.hpp> #include <process/shared.hpp> +#include <process/sequence.hpp> #include <stout/boundedhashmap.hpp> #include <stout/bytes.hpp> @@ -1143,6 +1144,19 @@ public: // Executors with pending tasks. hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pendingTasks; + // Sequences in this map are used to enforce the order of tasks launched on + // each executor. + // + // Note on the lifecycle of the sequence: if the corresponding executor struct + // has not been created, we tie the lifecycle of the sequence to the first + // task in the sequence (which must have the `launch_executor` flag set to + // true modulo MESOS-3870). If the task fails to launch before creating the + // executor struct, we will delete the sequence. Once the executor struct is + // created, we tie the lifecycle of the sequence to the executor struct. + // + // TODO(mzhu): Create the executor struct early and put the sequence in it. + hashmap<ExecutorID, process::Sequence> taskLaunchSequences; + // Pending task groups. This is needed for correctly sending // TASK_KILLED status updates for all tasks in the group if // any of the tasks are killed while pending.
