Perform task validation after authorization is done. Review: https://reviews.apache.org/r/27769
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a0d2b582
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a0d2b582
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a0d2b582

Branch: refs/heads/master
Commit: a0d2b582e718c0bb84e23560f520075b92c1c984
Parents: e021a7b
Author: Jie Yu <[email protected]>
Authored: Fri Nov 7 18:09:11 2014 -0800
Committer: Jie Yu <[email protected]>
Committed: Thu Nov 13 10:45:13 2014 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 91 +++++++++++++++++++--------------------------
 src/master/master.hpp |  5 +--
 2 files changed, 39 insertions(+), 57 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mesos/blob/a0d2b582/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index fbf6375..00fb3e3 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1927,8 +1927,7 @@ struct UniqueTaskIDChecker : TaskInfoVisitor
   {
     const TaskID& taskId = task.task_id();
 
-    if (framework.pendingTasks.contains(taskId) ||
-        framework.tasks.contains(taskId)) {
+    if (framework.tasks.contains(taskId)) {
       return "Task has duplicate ID: " + taskId.value();
     }
     return None();
@@ -2054,21 +2053,6 @@ struct ExecutorInfoChecker : TaskInfoVisitor
       if (slave.hasExecutor(framework.id, executorId)) {
         executorInfo =
           slave.executors.get(framework.id).get().get(executorId);
-      } else {
-        // See if any of the pending tasks have the same executor
-        // on the same slave.
-        // Note that picking the first matching executor is ok because
-        // all the matching executors have been added to
-        // 'framework.pendingTasks' after validation and hence have
-        // the same executor info.
-        foreachvalue (const TaskInfo& task_, framework.pendingTasks) {
-          if (task_.has_executor() &&
-              task_.executor().executor_id() == executorId &&
-              task_.slave_id() == task.slave_id()) {
-            executorInfo = task_.executor();
-            break;
-          }
-        }
       }
 
       if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
@@ -2345,26 +2329,25 @@ void Master::launchTasks(
             << " on slave " << *slave
             << " for framework " << *framework;
 
-  // Validate each task.
-  vector<Option<Error>> validations;
+  // Authorize each task. A task is in 'framework->pendingTasks'
+  // before it is authorized.
+  list<Future<bool>> futures;
   foreach (const TaskInfo& task, tasks) {
-    validations.push_back(validateTask(task, framework, slave, used));
+    futures.push_back(authorizeTask(task, framework));
 
     // Add to pending tasks.
-    // NOTE: We need to do this here after validation because of the
-    // way task validators work.
-    framework->pendingTasks[task.task_id()] = task;
+    // NOTE: The task ID here hasn't been validated yet, but it
+    // doesn't matter. If the task ID is not valid, the task won't be
+    // launched anyway. If two tasks have the same ID, the second one
+    // will not be put into 'framework->pendingTasks', therefore will
+    // not be launched.
+    if (!framework->pendingTasks.contains(task.task_id())) {
+      framework->pendingTasks[task.task_id()] = task;
+    }
 
     stats.tasks[TASK_STAGING]++;
   }
 
-  // Authorize each task.
-  list<Future<bool>> futures;
-  foreach (const TaskInfo& task, tasks) {
-    // TODO(dhamon): Only authorize if there's no validation error.
-    futures.push_back(authorizeTask(task, framework));
-  }
-
   // Wait for all the tasks to be authorized.
   await(futures)
     .onAny(defer(self(),
@@ -2374,7 +2357,6 @@ void Master::launchTasks(
                  tasks,
                  used,
                  filters,
-                 validations,
                  lambda::_1));
 }
 
@@ -2522,10 +2504,8 @@ void Master::_launchTasks(
     const vector<TaskInfo>& tasks,
     const Resources& totalResources,
     const Filters& filters,
-    const vector<Option<Error>>& validations,
     const Future<list<Future<bool>>>& authorizations)
 {
-  CHECK_EQ(validations.size(), tasks.size());
   CHECK_READY(authorizations);
   CHECK_EQ(authorizations.get().size(), tasks.size());
 
@@ -2571,10 +2551,7 @@ void Master::_launchTasks(
 
   size_t index = 0;
   foreach (const Future<bool>& authorization, authorizations.get()) {
-    const TaskInfo& task = tasks[index];
-    const Option<Error>& validation = validations[index];
-
-    ++index;
+    const TaskInfo& task = tasks[index++];
 
     // NOTE: The task will not be in 'pendingTasks' if 'killTask()'
     // for the task was called before we are here.
@@ -2582,17 +2559,30 @@ void Master::_launchTasks(
       continue;
     }
 
-    framework->pendingTasks.erase(task.task_id()); // Remove from pending tasks.
+    // Remove from pending tasks.
+    framework->pendingTasks.erase(task.task_id());
+
+    // Check authorization result.
+    CHECK(!authorization.isDiscarded());
+
+    if (authorization.isFailed() || !authorization.get()) {
+      string user = framework->info.user(); // Default user.
+      if (task.has_command() && task.command().has_user()) {
+        user = task.command().user();
+      } else if (task.has_executor() && task.executor().command().has_user()) {
+        user = task.executor().command().user();
+      }
 
-    if (validation.isSome()) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
           framework->id,
           task.slave_id(),
           task.task_id(),
           TASK_ERROR,
           TaskStatus::SOURCE_MASTER,
-          validation.get().message,
-          TaskStatus::REASON_TASK_INVALID);
+          authorization.isFailed() ?
+              "Authorization failure: " + authorization.failure() :
+              "Not authorized to launch as user '" + user + "'",
+          TaskStatus::REASON_TASK_UNAUTHORIZED);
 
       metrics.tasks_error++;
       stats.tasks[TASK_ERROR]++;
@@ -2602,26 +2592,19 @@ void Master::_launchTasks(
       continue;
     }
 
-    CHECK(!authorization.isDiscarded());
-
-    if (authorization.isFailed() || !authorization.get()) {
-      string user = framework->info.user(); // Default user.
-      if (task.has_command() && task.command().has_user()) {
-        user = task.command().user();
-      } else if (task.has_executor() && task.executor().command().has_user()) {
-        user = task.executor().command().user();
-      }
+    // Validate the task.
+    const Option<Error>& validation =
+      validateTask(task, framework, slave, totalResources);
 
+    if (validation.isSome()) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
           framework->id,
           task.slave_id(),
           task.task_id(),
           TASK_ERROR,
           TaskStatus::SOURCE_MASTER,
-          authorization.isFailed() ?
-              "Authorization failure: " + authorization.failure() :
-              "Not authorized to launch as user '" + user + "'",
-          TaskStatus::REASON_TASK_UNAUTHORIZED);
+          validation.get().message,
+          TaskStatus::REASON_TASK_INVALID);
 
       metrics.tasks_error++;
       stats.tasks[TASK_ERROR]++;

http://git-wip-us.apache.org/repos/asf/mesos/blob/a0d2b582/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index b3bdec6..a5e8e08 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -370,7 +370,6 @@ protected:
       const std::vector<TaskInfo>& tasks,
       const Resources& totalResources,
       const Filters& filters,
-      const std::vector<Option<Error>>& validations,
       const process::Future<std::list<process::Future<bool>>>& authorizations);
 
   // Transitions the task, and recovers resources if the task becomes
@@ -1172,8 +1171,8 @@ struct Framework
   process::Time reregisteredTime;
   process::Time unregisteredTime;
 
-  // Tasks that have not yet been launched because they are being
-  // validated (e.g., authorized).
+  // Tasks that have not yet been launched because they are currently
+  // being authorized.
   hashmap<TaskID, TaskInfo> pendingTasks;
 
   hashmap<TaskID, Task*> tasks;
