Repository: mesos Updated Branches: refs/heads/master 5ccf43e5a -> 526e1ee61
Split authorization from task validation. Review: https://reviews.apache.org/r/26817 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/526e1ee6 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/526e1ee6 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/526e1ee6 Branch: refs/heads/master Commit: 526e1ee615806e7f64b9bc137f6784b809aa4c55 Parents: 5ccf43e Author: Dominic Hamon <[email protected]> Authored: Thu Oct 16 11:47:43 2014 -0700 Committer: Dominic Hamon <[email protected]> Committed: Tue Oct 21 13:01:37 2014 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 86 +++++++++++++++++++++++++++++++++------------- src/master/master.hpp | 16 ++++++--- 2 files changed, 74 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/526e1ee6/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 0a5c9a3..be910d9 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2316,10 +2316,10 @@ void Master::launchTasks( << " on slave " << *slave << " for framework " << *framework; - // Validate each task and launch if valid. - list<Future<Option<Error> > > futures; + // Validate each task. + vector<Option<Error>> validations; foreach (const TaskInfo& task, tasks) { - futures.push_back(validateTask(task, framework, slave, used)); + validations.push_back(validateTask(task, framework, slave, used)); // Add to pending tasks. // NOTE: We need to do this here after validation because of the @@ -2329,22 +2329,28 @@ void Master::launchTasks( stats.tasks[TASK_STAGING]++; } - // Wait for all the tasks to be validated. - // NOTE: We wait for all tasks because currently the allocator - // is expected to get 'resourcesRecovered()' once per 'launchTasks()'. + // Authorize each task. + list<Future<bool>> futures; + foreach (const TaskInfo& task, tasks) { + // TODO(dhamon): Only authorize if there's no validation error. + futures.push_back(authorizeTask(task, framework)); + } + + // Wait for all the tasks to be authorized. await(futures) .onAny(defer(self(), &Master::_launchTasks, - framework->id, + frameworkId, slaveId.get(), tasks, used, filters, + validations, lambda::_1)); } -Future<Option<Error> > Master::validateTask( +Option<Error> Master::validateTask( const TaskInfo& task, Framework* framework, Slave* slave, @@ -2381,9 +2387,17 @@ Future<Option<Error> > Master::validateTask( return Error(error.get().message); } + return None(); +} + + +Future<bool> Master::authorizeTask( + const TaskInfo& task, + Framework* framework) +{ if (authorizer.isNone()) { // Authorization is disabled. - return None(); + return true; } // Authorize the task. @@ -2407,10 +2421,7 @@ Future<Option<Error> > Master::validateTask( } request.mutable_users()->add_values(user); - return authorizer.get()->authorize(request).then( - lambda::bind(&_authorize, - "Not authorized to launch as user '" + user + "'", - lambda::_1)); + return authorizer.get()->authorize(request); } @@ -2482,10 +2493,12 @@ void Master::_launchTasks( const vector<TaskInfo>& tasks, const Resources& totalResources, const Filters& filters, - const Future<list<Future<Option<Error> > > >& validationErrors) + const vector<Option<Error>>& validations, + const Future<list<Future<bool>>>& authorizations) { - CHECK_READY(validationErrors); - CHECK_EQ(validationErrors.get().size(), tasks.size()); + CHECK_EQ(validations.size(), tasks.size()); + CHECK_READY(authorizations); + CHECK_EQ(authorizations.get().size(), tasks.size()); Framework* framework = getFramework(frameworkId); if (framework == NULL) { @@ -2524,8 +2537,11 @@ void Master::_launchTasks( Resources usedResources; // Accumulated resources used. size_t index = 0; - foreach (const Future<Option<Error> >& future, validationErrors.get()) { - const TaskInfo& task = tasks[index++]; + foreach (const Future<bool>& authorization, authorizations.get()) { + const TaskInfo& task = tasks[index]; + const Option<Error>& validation = validations[index]; + + ++index; // NOTE: The task will not be in 'pendingTasks' if 'killTask()' // for the task was called before we are here. @@ -2535,18 +2551,40 @@ void Master::_launchTasks( framework->pendingTasks.erase(task.task_id()); // Remove from pending tasks. - CHECK(!future.isDiscarded()); - if (future.isFailed() || future.get().isSome()) { - const string error = future.isFailed() - ? "Authorization failure: " + future.failure() - : future.get().get().message; + if (validation.isSome()) { + const StatusUpdate& update = protobuf::createStatusUpdate( + framework->id, + task.slave_id(), + task.task_id(), + TASK_LOST, + validation.get().message); + + metrics.tasks_lost++; + stats.tasks[TASK_LOST]++; + + forward(update, UPID(), framework); + + continue; + } + + CHECK(!authorization.isDiscarded()); + + if (authorization.isFailed() || !authorization.get()) { + string user = framework->info.user(); // Default user. + if (task.has_command() && task.command().has_user()) { + user = task.command().user(); + } else if (task.has_executor() && task.executor().command().has_user()) { + user = task.executor().command().user(); + } const StatusUpdate& update = protobuf::createStatusUpdate( framework->id, task.slave_id(), task.task_id(), TASK_LOST, - error); + authorization.isFailed() ? + "Authorization failure: " + authorization.failure() : + "Not authorized to launch as user '" + user + "'"); metrics.tasks_lost++; stats.tasks[TASK_LOST]++; http://git-wip-us.apache.org/repos/asf/mesos/blob/526e1ee6/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 14f1d0f..18898e9 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -346,16 +346,23 @@ protected: const std::vector<StatusUpdate>& updates, const process::Future<bool>& removed); - // Validates the task including authorization. + // Validates the task. // Returns None if the task is valid. // Returns Error if the task is invalid. - // Returns Failure if authorization returns 'Failure'. - process::Future<Option<Error> > validateTask( + Option<Error> validateTask( const TaskInfo& task, Framework* framework, Slave* slave, const Resources& totalResources); + // Authorizes the task. + // Returns true if task is authorized. + // Returns false if task is not authorized. + // Returns failure for transient authorization failures. + process::Future<bool> authorizeTask( + const TaskInfo& task, + Framework* framework); + // Launch a task from a task description. void launchTask(const TaskInfo& task, Framework* framework, Slave* slave); @@ -366,7 +373,8 @@ protected: const std::vector<TaskInfo>& tasks, const Resources& totalResources, const Filters& filters, - const process::Future<std::list<process::Future<Option<Error> > > >& f); + const std::vector<Option<Error>>& validations, + const process::Future<std::list<process::Future<bool>>>& authorizations); // Transitions the task, and recovers resources if the task becomes // terminal.
