Repository: mesos Updated Branches: refs/heads/master 0750549cb -> 937bf8a89
Refactored task launching in the master. This adds addTask(), which is similar to addSlave() and addFramework(). The benefit is that we allow the caller to add the task and have control over when the RunTaskMessage is sent. Review: https://reviews.apache.org/r/28364 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/937bf8a8 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/937bf8a8 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/937bf8a8 Branch: refs/heads/master Commit: 937bf8a89c642b1f49deec4bca3a3a55579df08c Parents: 0750549 Author: Vinod Kone <[email protected]> Authored: Tue Dec 2 14:24:48 2014 -0800 Committer: Benjamin Mahler <[email protected]> Committed: Tue Dec 2 14:39:08 2014 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 46 +++++++++++++++++++++++----------------------- src/master/master.hpp | 12 ++++++------ 2 files changed, 29 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/937bf8a8/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 8fcda4b..c840d49 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2422,15 +2422,15 @@ Future<bool> Master::authorizeTask( } -Resources Master::launchTask( +Resources Master::addTask( const TaskInfo& task, Framework* framework, Slave* slave) { CHECK_NOTNULL(framework); CHECK_NOTNULL(slave); - CHECK(slave->connected) << "Launching task " << task.task_id() - << " on disconnected slave " << *slave; + CHECK(slave->connected) << "Adding task " << task.task_id() + << " to disconnected slave " << *slave; // The resources consumed. Resources resources = task.resources(); @@ -2474,19 +2474,6 @@ Resources Master::launchTask( slave->addTask(t); framework->addTask(t); - // Tell the slave to launch the task! - LOG(INFO) << "Launching task " << task.task_id() - << " of framework " << *framework - << " with resources " << task.resources() - << " on slave " << *slave; - - RunTaskMessage message; - message.mutable_framework()->MergeFrom(framework->info); - message.mutable_framework_id()->MergeFrom(framework->id); - message.set_pid(framework->pid); - message.mutable_task()->MergeFrom(task); - send(slave->pid, message); - return resources; } @@ -2541,7 +2528,6 @@ void Master::_launchTasks( } Resources usedResources; // Accumulated resources used. - size_t index = 0; foreach (const Future<bool>& authorization, authorizations.get()) { const TaskInfo& task = tasks[index++]; @@ -2587,21 +2573,21 @@ void Master::_launchTasks( } // Validate the task. - const Option<Error>& validation = validateTask( + const Option<Error>& validationError = validateTask( task, framework, slave, totalResources, usedResources); - if (validation.isSome()) { + if (validationError.isSome()) { const StatusUpdate& update = protobuf::createStatusUpdate( framework->id, task.slave_id(), task.task_id(), TASK_ERROR, TaskStatus::SOURCE_MASTER, - validation.get().message, + validationError.get().message, TaskStatus::REASON_TASK_INVALID); metrics.tasks_error++; @@ -2612,9 +2598,24 @@ void Master::_launchTasks( continue; } - // Launch task. + // Add task. if (pending) { - usedResources += launchTask(task, framework, slave); + usedResources += addTask(task, framework, slave); + + // TODO(bmahler): Consider updating this log message to + // indicate when the executor is also being launched. + LOG(INFO) << "Launching task " << task.task_id() + << " of framework " << *framework + << " with resources " << task.resources() + << " on slave " << *slave; + + RunTaskMessage message; + message.mutable_framework()->MergeFrom(framework->info); + message.mutable_framework_id()->MergeFrom(framework->id); + message.set_pid(framework->pid); + message.mutable_task()->MergeFrom(task); + + send(slave->pid, message); } } @@ -3073,7 +3074,6 @@ void Master::reregisterSlave( return; } - if (slaves.removed.get(slaveInfo.id()).isSome()) { // To compensate for the case where a non-strict registrar is // being used, we explicitly deny removed slaves from http://git-wip-us.apache.org/repos/asf/mesos/blob/937bf8a8/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 79b9ba7..e6ed87d 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -364,12 +364,6 @@ protected: const TaskInfo& task, Framework* framework); - // Launch a task from a task description. - Resources launchTask( - const TaskInfo& task, - Framework* framework, - Slave* slave); - // 'launchTasks()' continuation. void _launchTasks( const FrameworkID& frameworkId, @@ -379,6 +373,12 @@ protected: const Filters& filters, const process::Future<std::list<process::Future<bool>>>& authorizations); + // Add the task and its executor (if not already running) to the + // framework and slave. Returns the resources consumed as a result, + // which includes resources for the task and its executor + // (if not already running). + Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave); + // Transitions the task, and recovers resources if the task becomes // terminal. void updateTask(Task* task, const StatusUpdate& update);
