Repository: mesos Updated Branches: refs/heads/master e021a7b8b -> a06da4e48
Moved all resources validation logic to ResourceUsageChecker. Review: https://reviews.apache.org/r/27843/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a06da4e4 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a06da4e4 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a06da4e4 Branch: refs/heads/master Commit: a06da4e48b58e99bb431bff6d16db502a2342026 Parents: a0d2b58 Author: Jie Yu <[email protected]> Authored: Mon Nov 10 14:59:01 2014 -0800 Committer: Jie Yu <[email protected]> Committed: Thu Nov 13 10:45:13 2014 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 245 ++++++++++++++++--------------- src/master/master.hpp | 8 +- src/tests/resource_offers_tests.cpp | 3 +- 3 files changed, 135 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a06da4e4/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 00fb3e3..0f89d1f 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1851,37 +1851,40 @@ void Master::resourceRequest( // We use the visitor pattern to abstract the process of performing // any validations, aggregations, etc. of tasks that a framework -// attempts to run within the resources provided by offers. A -// visitor can return an optional error (typedef'ed as an option of a -// string) which will cause the master to send a failed status update -// back to the framework for only that task description. An instance -// will be reused for each task description from same 'launchTasks()', -// but not for task descriptions from different offers. +// attempts to run within the resources provided by offers. A visitor +// can return an optional error (typedef'ed as an option of a string) +// which will cause the master to send a failed status update back to +// the framework for only that task description. An instance will be +// reused for each task description from same 'launchTasks()', but not +// for task descriptions from different offers. struct TaskInfoVisitor { virtual Option<Error> operator () ( const TaskInfo& task, - const Resources& resources, const Framework& framework, - const Slave& slave) = 0; + const Slave& slave, + const Resources& totalResources, + const Resources& usedResources) = 0; virtual ~TaskInfoVisitor() {} }; -// Checks that a task id is valid, i.e., contains only valid characters. +// Checks that a task id is valid, i.e., contains only valid +// characters. struct TaskIDChecker : TaskInfoVisitor { virtual Option<Error> operator () ( const TaskInfo& task, - const Resources& resources, const Framework& framework, - const Slave& slave) + const Slave& slave, + const Resources& totalResources, + const Resources& usedResources) { const string& id = task.task_id().value(); if (std::count_if(id.begin(), id.end(), invalid) > 0) { - return "TaskID '" + id + "' contains invalid characters"; + return Error("TaskID '" + id + "' contains invalid characters"); } return None(); @@ -1899,13 +1902,15 @@ struct SlaveIDChecker : TaskInfoVisitor { virtual Option<Error> operator () ( const TaskInfo& task, - const Resources& resources, const Framework& framework, - const Slave& slave) + const Slave& slave, + const Resources& totalResources, + const Resources& usedResources) { if (!(task.slave_id() == slave.id)) { - return "Task uses invalid slave " + task.slave_id().value() + - " while slave " + slave.id.value() + " is expected"; + return Error( + "Task uses invalid slave " + task.slave_id().value() + + " while slave " + slave.id.value() + " is expected"); } return None(); @@ -1921,15 +1926,17 @@ struct UniqueTaskIDChecker : TaskInfoVisitor { virtual Option<Error> operator () ( const TaskInfo& task, - const Resources& resources, const Framework& framework, - const Slave& slave) + const Slave& slave, + const Resources& totalResources, + const Resources& usedResources) { const TaskID& taskId = task.task_id(); if (framework.tasks.contains(taskId)) { - return "Task has duplicate ID: " + taskId.value(); + return Error("Task has duplicate ID: " + taskId.value()); } + return None(); } }; @@ -1945,43 +1952,38 @@ struct ResourceUsageChecker : TaskInfoVisitor { virtual Option<Error> operator () ( const TaskInfo& task, - const Resources& resources, const Framework& framework, - const Slave& slave) + const Slave& slave, + const Resources& totalResources, + const Resources& usedResources) { if (task.resources().size() == 0) { - return stringify("Task uses no resources"); + return Error("Task uses no resources"); } foreach (const Resource& resource, task.resources()) { if (!Resources::isAllocatable(resource)) { - return "Task uses invalid resources: " + stringify(resource); + return Error("Task uses invalid resources: " + stringify(resource)); } } - // Check if this task uses more resources than offered. - const Resources& taskResources = task.resources(); - - if (!(taskResources <= resources)) { - return "Task " + stringify(task.task_id()) + " attempted to use " + - stringify(taskResources) + " which is greater than offered " + - stringify(resources); - } - // Check this task's executor's resources. - if (task.has_executor()) { - const Resources& executorResources = task.executor().resources(); + Resources executorResources; - foreach (const Resource& resource, executorResources) { + if (task.has_executor()) { + foreach (const Resource& resource, task.executor().resources()) { if (!Resources::isAllocatable(resource)) { // TODO(benh): Send back the invalid resources? - return "Executor for task " + stringify(task.task_id()) + - " uses invalid resources " + stringify(resource); + return Error( + "Executor for task " + stringify(task.task_id()) + + " uses invalid resources " + stringify(resource)); } } - // Check minimal cpus and memory resources of executor - // and log warnings if not set. + executorResources = task.executor().resources(); + + // Check minimal cpus and memory resources of executor and log + // warnings if not set. // TODO(martin): MESOS-1807. Return Error instead of logging a // warning in 0.22.0. Option<double> cpus = executorResources.cpus(); @@ -1995,6 +1997,7 @@ struct ResourceUsageChecker : TaskInfoVisitor << "). Please update your executor, as this will be mandatory " << "in future releases."; } + Option<Bytes> mem = executorResources.mem(); if (mem.isNone() || mem.get() < MIN_MEM) { LOG(WARNING) @@ -2008,6 +2011,20 @@ struct ResourceUsageChecker : TaskInfoVisitor } } + // Check if resources needed by the task (and its executor in case + // the executor is new) are available. + Resources resources = task.resources(); + + if (!slave.hasExecutor(framework.id, task.executor().executor_id())) { + resources += task.executor().resources(); + } + + if (!(resources + usedResources <= totalResources)) { + return Error( + "Task uses more resources " + stringify(resources) + + " than available " + stringify(totalResources - usedResources)); + } + return None(); } }; @@ -2019,33 +2036,35 @@ struct ExecutorInfoChecker : TaskInfoVisitor { virtual Option<Error> operator () ( const TaskInfo& task, - const Resources& resources, const Framework& framework, - const Slave& slave) + const Slave& slave, + const Resources& totalResources, + const Resources& usedResources) { if (task.has_executor() == task.has_command()) { - return stringify( - "Task should have at least one (but not both) of CommandInfo or" - " ExecutorInfo present"); + return Error( + "Task should have at least one (but not both) of CommandInfo or " + "ExecutorInfo present"); } if (task.has_executor()) { - // The master currently expects ExecutorInfo.framework_id - // to be set even though it is an optional field. - // Currently, the scheduler driver ensures that the field - // is set. For schedulers not using the driver, we need to - // do the validation here. + // The master currently expects ExecutorInfo.framework_id to be + // set even though it is an optional field. Currently, the + // scheduler driver ensures that the field is set. For + // schedulers not using the driver, we need to do the validation + // here. // TODO(bmahler): Set this field in the master instead of // depending on the scheduler driver do it. if (!task.executor().has_framework_id()) { - return stringify( + return Error( "Task has invalid ExecutorInfo: missing field 'framework_id'"); } if (!(task.executor().framework_id() == framework.id)) { - return string("ExecutorInfo has an invalid FrameworkID") + - " (Actual: " + stringify(task.executor().framework_id()) + - " vs Expected: " + stringify(framework.id) + ")"; + return Error( + "ExecutorInfo has an invalid FrameworkID" + " (Actual: " + stringify(task.executor().framework_id()) + + " vs Expected: " + stringify(framework.id) + ")"); } const ExecutorID& executorId = task.executor().executor_id(); @@ -2056,15 +2075,16 @@ struct ExecutorInfoChecker : TaskInfoVisitor } if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) { - return "Task has invalid ExecutorInfo (existing ExecutorInfo" - " with same ExecutorID is not compatible).\n" - "------------------------------------------------------------\n" - "Existing ExecutorInfo:\n" + - stringify(executorInfo.get()) + "\n" - "------------------------------------------------------------\n" - "Task's ExecutorInfo:\n" + - stringify(task.executor()) + "\n" - "------------------------------------------------------------\n"; + return Error( + "Task has invalid ExecutorInfo (existing ExecutorInfo" + " with same ExecutorID is not compatible).\n" + "------------------------------------------------------------\n" + "Existing ExecutorInfo:\n" + + stringify(executorInfo.get()) + "\n" + "------------------------------------------------------------\n" + "Task's ExecutorInfo:\n" + + stringify(task.executor()) + "\n" + "------------------------------------------------------------\n"); } } @@ -2079,21 +2099,24 @@ struct CheckpointChecker : TaskInfoVisitor { virtual Option<Error> operator () ( const TaskInfo& task, - const Resources& resources, const Framework& framework, - const Slave& slave) + const Slave& slave, + const Resources& totalResources, + const Resources& usedResources) { if (framework.info.checkpoint() && !slave.info.checkpoint()) { - return "Task asked to be checkpointed but slave " + - stringify(slave.id) + " has checkpointing disabled"; + return Error( + "Task asked to be checkpointed but slave " + + stringify(slave.id) + " has checkpointing disabled"); } + return None(); } }; -// OfferVisitors are similar to the TaskInfoVisitor pattern and -// are used for validation and aggregation of offers. +// OfferVisitors are similar to the TaskInfoVisitor pattern and are +// used for validation and aggregation of offers. // The error reporting scheme is also similar to TaskInfoVisitor. // However, offer processing (and subsequent task processing) is // aborted altogether if offer visitor reports an error. @@ -2150,9 +2173,10 @@ struct FrameworkChecker : OfferVisitor { } if (!(framework.id == offer->framework_id())) { - return "Offer " + stringify(offer->id()) + + return Error( + "Offer " + stringify(offer->id()) + " has invalid framework " + stringify(offer->framework_id()) + - " while framework " + stringify(framework.id) + " is expected"; + " while framework " + stringify(framework.id) + " is expected"); } return None(); @@ -2160,8 +2184,8 @@ struct FrameworkChecker : OfferVisitor { }; -// Checks that the slave is valid and ensures that all offers belong to -// the same slave. +// Checks that the slave is valid and ensures that all offers belong +// to the same slave. struct SlaveChecker : OfferVisitor { virtual Option<Error> operator () ( @@ -2171,7 +2195,7 @@ struct SlaveChecker : OfferVisitor { Offer* offer = getOffer(master, offerId); if (offer == NULL) { - return "Offer " + stringify(offerId) + " is no longer valid"; + return Error("Offer " + stringify(offerId) + " is no longer valid"); } Slave* slave = getSlave(master, offer->slave_id()); @@ -2190,10 +2214,11 @@ struct SlaveChecker : OfferVisitor // Set slave id and use as base case for validation. slaveId = slave->id; } else if (!(slave->id == slaveId.get())) { - return "Aggregated offers must belong to one single slave. Offer " + + return Error( + "Aggregated offers must belong to one single slave. Offer " + stringify(offerId) + " uses slave " + stringify(slave->id) + " and slave " + - stringify(slaveId.get()); + stringify(slaveId.get())); } return None(); @@ -2212,7 +2237,7 @@ struct UniqueOfferIDChecker : OfferVisitor Master* master) { if (offers.contains(offerId)) { - return "Duplicate offer " + stringify(offerId) + " in offer list"; + return Error("Duplicate offer " + stringify(offerId) + " in offer list"); } offers.insert(offerId); @@ -2243,6 +2268,7 @@ void Master::launchTasks( << "Ignoring launch tasks message for offers " << stringify(offerIds) << " of framework " << frameworkId << " because the framework cannot be found"; + return; } @@ -2252,6 +2278,7 @@ void Master::launchTasks( << " of framework " << frameworkId << " from '" << from << "' because it is not from the registered framework '" << framework->pid << "'"; + return; } @@ -2319,6 +2346,7 @@ void Master::launchTasks( forward(update, UPID(), framework); } + return; } @@ -2365,21 +2393,25 @@ Option<Error> Master::validateTask( const TaskInfo& task, Framework* framework, Slave* slave, - const Resources& totalResources) + const Resources& totalResources, + const Resources& usedResources) { CHECK_NOTNULL(framework); CHECK_NOTNULL(slave); - // Create task visitors. + // Create task visitors. The order in which the following checkers + // are executed does matter! For example, ResourceUsageChecker + // assumes that ExecutorInfo is valid which is verified by + // ExecutorInfoChecker. // TODO(vinod): Create the visitors on the stack and make the visit // operation const. list<Owned<TaskInfoVisitor>> taskVisitors; taskVisitors.push_back(Owned<TaskInfoVisitor>(new TaskIDChecker())); taskVisitors.push_back(Owned<TaskInfoVisitor>(new SlaveIDChecker())); taskVisitors.push_back(Owned<TaskInfoVisitor>(new UniqueTaskIDChecker())); - taskVisitors.push_back(Owned<TaskInfoVisitor>(new ResourceUsageChecker())); - taskVisitors.push_back(Owned<TaskInfoVisitor>(new ExecutorInfoChecker())); taskVisitors.push_back(Owned<TaskInfoVisitor>(new CheckpointChecker())); + taskVisitors.push_back(Owned<TaskInfoVisitor>(new ExecutorInfoChecker())); + taskVisitors.push_back(Owned<TaskInfoVisitor>(new ResourceUsageChecker())); // TODO(benh): Add a HealthCheckChecker visitor. @@ -2388,7 +2420,7 @@ Option<Error> Master::validateTask( // Invoke each visitor. Option<Error> error = None(); foreach (const Owned<TaskInfoVisitor>& visitor, taskVisitors) { - error = (*visitor)(task, totalResources, *framework, *slave); + error = (*visitor)(task, *framework, *slave, totalResources, usedResources); if (error.isSome()) { break; } @@ -2436,7 +2468,7 @@ Future<bool> Master::authorizeTask( } -void Master::launchTask( +Resources Master::launchTask( const TaskInfo& task, Framework* framework, Slave* slave) @@ -2446,6 +2478,9 @@ void Master::launchTask( CHECK(slave->connected) << "Launching task " << task.task_id() << " on disconnected slave " << *slave; + // The resources consumed. + Resources resources = task.resources(); + // Determine if this task launches an executor, and if so make sure // the slave and framework state has been updated accordingly. Option<ExecutorID> executorId; @@ -2460,6 +2495,8 @@ void Master::launchTask( slave->addExecutor(framework->id, task.executor()); framework->addExecutor(slave->id, task.executor()); + + resources += task.executor().resources(); } executorId = task.executor().executor_id(); @@ -2494,7 +2531,7 @@ void Master::launchTask( message.mutable_task()->MergeFrom(task); send(slave->pid, message); - return; + return resources; } @@ -2593,8 +2630,12 @@ void Master::_launchTasks( } // Validate the task. - const Option<Error>& validation = - validateTask(task, framework, slave, totalResources); + const Option<Error>& validation = validateTask( + task, + framework, + slave, + totalResources, + usedResources); if (validation.isSome()) { const StatusUpdate& update = protobuf::createStatusUpdate( @@ -2614,40 +2655,8 @@ void Master::_launchTasks( continue; } - // Check if resources needed by the task (and its executor in case - // the executor is new) are available. These resources will be - // added by 'launchTask()' below. - Resources resources = task.resources(); - if (task.has_executor() && - !slave->hasExecutor(framework->id, task.executor().executor_id())) { - resources += task.executor().resources(); - } - - if (!(usedResources + resources <= totalResources)) { - const string error = - "Task uses more resources " + stringify(resources) + - " than available " + stringify(totalResources - usedResources); - - const StatusUpdate& update = protobuf::createStatusUpdate( - framework->id, - task.slave_id(), - task.task_id(), - TASK_LOST, - TaskStatus::SOURCE_MASTER, - error, - TaskStatus::REASON_TASK_INVALID); - - metrics.tasks_lost++; - stats.tasks[TASK_LOST]++; - - forward(update, UPID(), framework); - - continue; - } - // Launch task. - launchTask(task, framework, slave); - usedResources += resources; + usedResources += launchTask(task, framework, slave); } // All used resources should be allocatable, enforced by our validators. http://git-wip-us.apache.org/repos/asf/mesos/blob/a06da4e4/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index a5e8e08..47f3bc9 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -350,7 +350,8 @@ protected: const TaskInfo& task, Framework* framework, Slave* slave, - const Resources& totalResources); + const Resources& totalResources, + const Resources& usedResources); // Authorizes the task. // Returns true if task is authorized. @@ -361,7 +362,10 @@ protected: Framework* framework); // Launch a task from a task description. - void launchTask(const TaskInfo& task, Framework* framework, Slave* slave); + Resources launchTask( + const TaskInfo& task, + Framework* framework, + Slave* slave); // 'launchTasks()' continuation. void _launchTasks( http://git-wip-us.apache.org/repos/asf/mesos/blob/a06da4e4/src/tests/resource_offers_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp index 3c78a21..21cb5ad 100644 --- a/src/tests/resource_offers_tests.cpp +++ b/src/tests/resource_offers_tests.cpp @@ -354,8 +354,9 @@ TEST_F(ResourceOffersTest, TaskUsesMoreResourcesThanOffered) EXPECT_EQ(TASK_ERROR, status.get().state()); EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason()); EXPECT_TRUE(status.get().has_message()); + EXPECT_TRUE(strings::contains( - status.get().message(), "greater than offered")); + status.get().message(), "Task uses more resources")); driver.stop(); driver.join();
