Made the master set `launch_executor` in the RunTask(Group)Message. By setting the new `launch_executor` field in the RunTask(Group)Message, the master is able to control executor creation on the agent.
Also refactored the `addTask()` logic. Added two new functions: `isLaunchExecutor()` checks if a task needs to launch an executor; `addExecutor()` adds an executor to the framework and slave. Review: https://reviews.apache.org/r/65504/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/10aa875d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/10aa875d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/10aa875d Branch: refs/heads/1.5.x Commit: 10aa875df8947f8cbfb318820101984d99259070 Parents: 08e0ceb Author: Meng Zhu <m...@mesosphere.io> Authored: Tue Feb 13 22:44:58 2018 -0800 Committer: Greg Mann <gregorywm...@gmail.com> Committed: Wed Feb 14 03:41:16 2018 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 112 +++++++++++++++++++++++++++++++-------------- src/master/master.hpp | 19 ++++++-- 2 files changed, 92 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/10aa875d/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 2758000..2b093d6 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3883,44 +3883,56 @@ Future<bool> Master::authorizeSlave(const Option<string>& principal) } -Resources Master::addTask( - const TaskInfo& task, +bool Master::isLaunchExecutor( + const ExecutorID& executorId, Framework* framework, - Slave* slave) + Slave* slave) const { CHECK_NOTNULL(framework); CHECK_NOTNULL(slave); - CHECK(slave->connected) << "Adding task " << task.task_id() - << " to disconnected agent " << *slave; - // The resources consumed. 
- Resources resources = task.resources(); + if (!slave->hasExecutor(framework->id(), executorId)) { + CHECK(!framework->hasExecutor(slave->id, executorId)) + << "Executor '" << executorId + << "' known to the framework " << *framework + << " but unknown to the agent " << *slave; + return true; + } + + return false; +} + - // Determine if this task launches an executor, and if so make sure - // the slave and framework state has been updated accordingly. +void Master::addExecutor( + const ExecutorInfo& executorInfo, + Framework* framework, + Slave* slave) +{ + CHECK_NOTNULL(framework); + CHECK_NOTNULL(slave); + CHECK(slave->connected) << "Adding executor " << executorInfo.executor_id() + << " to disconnected agent " << *slave; - if (task.has_executor()) { - // TODO(benh): Refactor this code into Slave::addTask. - if (!slave->hasExecutor(framework->id(), task.executor().executor_id())) { - CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id())) - << "Executor '" << task.executor().executor_id() - << "' known to the framework " << *framework - << " but unknown to the agent " << *slave; + slave->addExecutor(framework->id(), executorInfo); + framework->addExecutor(slave->id, executorInfo); +} - slave->addExecutor(framework->id(), task.executor()); - framework->addExecutor(slave->id, task.executor()); - resources += task.executor().resources(); - } - } +void Master::addTask( + const TaskInfo& task, + Framework* framework, + Slave* slave) +{ + CHECK_NOTNULL(framework); + CHECK_NOTNULL(slave); + CHECK(slave->connected) << "Adding task " << task.task_id() + << " to disconnected agent " << *slave; // Add the task to the framework and slave. Task* t = new Task(protobuf::createTask(task, TASK_STAGING, framework->id())); slave->addTask(t); framework->addTask(t); - - return resources; } @@ -4953,7 +4965,23 @@ void Master::_accept( // Add task. 
if (pending) { - const Resources consumed = addTask(task, framework, slave); + Resources consumed; + + bool launchExecutor = true; + if (task.has_executor()) { + launchExecutor = isLaunchExecutor( + task.executor().executor_id(), framework, slave); + + // Master tracks the new executor only if the task is not a + // command task. + if (launchExecutor) { + addExecutor(task.executor(), framework, slave); + consumed += task.executor().resources(); + } + } + + addTask(task, framework, slave); + consumed += task.resources(); CHECK(available.contains(consumed)) << available << " does not contain " << consumed; @@ -4995,6 +5023,8 @@ void Master::_accept( message.set_pid(framework->pid.getOrElse(UPID())); message.mutable_task()->MergeFrom(task); + message.set_launch_executor(launchExecutor); + if (HookManager::hooksAvailable()) { // Set labels retrieved from label-decorator hooks. message.mutable_task()->mutable_labels()->CopyFrom( @@ -5013,11 +5043,11 @@ void Master::_accept( CHECK_SOME(downgradeResources(&message)); } - // TODO(bmahler): Consider updating this log message to - // indicate when the executor is also being launched. LOG(INFO) << "Launching task " << task.task_id() << " of framework " << *framework << " with resources " << task.resources() - << " on agent " << *slave; + << " on agent " << *slave << " on " + << (launchExecutor ? 
+ " new executor" : " existing executor"); send(slave->pid, message); } @@ -5176,18 +5206,25 @@ void Master::_accept( set<TaskID> taskIds; Resources totalResources; + Resources executorResources; + + bool launchExecutor = + isLaunchExecutor(executor.executor_id(), framework, slave); + + if (launchExecutor) { + addExecutor(executor, framework, slave); + executorResources = executor.resources(); + totalResources += executorResources; + } + + message.set_launch_executor(launchExecutor); foreach ( TaskInfo& task, *message.mutable_task_group()->mutable_tasks()) { taskIds.insert(task.task_id()); totalResources += task.resources(); - const Resources consumed = addTask(task, framework, slave); - - CHECK(_offeredResources.contains(consumed)) - << _offeredResources << " does not contain " << consumed; - - _offeredResources -= consumed; + addTask(task, framework, slave); if (HookManager::hooksAvailable()) { // Set labels retrieved from label-decorator hooks. @@ -5199,6 +5236,11 @@ void Master::_accept( } } + CHECK(_offeredResources.contains(totalResources)) + << _offeredResources << " does not contain " << totalResources; + + _offeredResources -= totalResources; + // If the agent does not support reservation refinement, downgrade // the task and executor resources to the "pre-reservation-refinement" // format. This cannot contain any refined reservations since @@ -5210,7 +5252,9 @@ void Master::_accept( LOG(INFO) << "Launching task group " << stringify(taskIds) << " of framework " << *framework << " with resources " - << totalResources << " on agent " << *slave; + << totalResources - executorResources << " on agent " + << *slave << " on " + << (launchExecutor ? 
" new executor" : " existing executor"); send(slave->pid, message); http://git-wip-us.apache.org/repos/asf/mesos/blob/10aa875d/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index a94ef38..9030cad 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -845,11 +845,20 @@ protected: const Offer::Operation::Destroy& destroy, const Option<process::http::authentication::Principal>& principal); - // Add the task and its executor (if not already running) to the - // framework and slave. Returns the resources consumed as a result, - // which includes resources for the task and its executor - // (if not already running). - Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave); + // Determine if a new executor needs to be launched. + bool isLaunchExecutor ( + const ExecutorID& executorId, + Framework* framework, + Slave* slave) const; + + // Add executor to the framework and slave. + void addExecutor( + const ExecutorInfo& executorInfo, + Framework* framework, + Slave* slave); + + // Add task to the framework and slave. + void addTask(const TaskInfo& task, Framework* framework, Slave* slave); // Transitions the task, and recovers resources if the task becomes // terminal.