This is an automated email from the ASF dual-hosted git repository. grag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 504af58dda64b73581b0398c83952421aea64d39 Author: Greg Mann <[email protected]> AuthorDate: Fri Mar 20 10:35:37 2020 -0700 Added master validation for task resource limits and shared cgroups. Review: https://reviews.apache.org/r/72216/ --- src/master/validation.cpp | 230 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 224 insertions(+), 6 deletions(-) diff --git a/src/master/validation.cpp b/src/master/validation.cpp index 084f281..5b1bcb5 100644 --- a/src/master/validation.cpp +++ b/src/master/validation.cpp @@ -17,6 +17,7 @@ #include "master/validation.hpp" #include <algorithm> +#include <cmath> #include <iterator> #include <set> #include <string> @@ -29,6 +30,7 @@ #include <mesos/type_utils.hpp> #include <process/authenticator.hpp> +#include <process/owned.hpp> #include <stout/foreach.hpp> #include <stout/hashmap.hpp> @@ -45,6 +47,8 @@ #include "master/master.hpp" +using process::Owned; + using process::http::authentication::Principal; using std::pair; @@ -1540,6 +1544,77 @@ Option<Error> validateContainerInfo(const TaskInfo& task) } +Option<Error> validateResourceLimits( + const TaskInfo& task, + Slave* slave) +{ + auto limits = task.limits(); + + if (!limits.empty()) { + if (!slave->capabilities.taskResourceLimits) { + return Error("Agent is not capable of handling task resource limits"); + } + + // Ensure that only "cpus" and "mem" are included. + const size_t cpuCount = limits.count("cpus"); + const size_t memCount = limits.count("mem"); + + if (limits.size() > cpuCount + memCount) { + return Error( + "Only cpus and mem may be included in a task's resource limits"); + } + + if (cpuCount) { + Option<double> taskCpus = Resources(task.resources()).cpus(); + if (taskCpus.isNone()) { + return Error( + "When a CPU limit is specified, a CPU request must also be " + "specified"); + } + + if (limits.at("cpus").value() < taskCpus.get()) { + return Error( + "The cpu limit must be greater than or equal to the cpu request"); + } + } + + if (memCount) { + Option<Bytes> taskMem = Resources(task.resources()).mem(); + if (taskMem.isNone()) { + return Error( + "When a memory limit is specified, a memory request must also be " + "specified"); + } + + if (!std::isinf(limits.at("mem").value()) && + Bytes(limits.at("mem").value(), Bytes::MEGABYTES) < taskMem.get()) { + return Error( + "The memory limit must be greater" + " than or equal to the memory request"); + } + } + } + + return None(); +} + + +// This validation function should only be executed for tasks which are launched +// via the LAUNCH operation, not the LAUNCH_GROUP operation. +Option<Error> validateShareCgroups(const TaskInfo& task) +{ + if (task.has_container() && + task.container().has_linux_info() && + task.container().linux_info().has_share_cgroups() && + !task.container().linux_info().share_cgroups()) { + return Error( + "Only tasks in a task group may have 'share_cgroups' set to 'false'"); + } + + return None(); +} + + // Validates task specific fields except its executor (if it exists). Option<Error> validateTask( const TaskInfo& task, @@ -1561,7 +1636,8 @@ Option<Error> validateTask( lambda::bind(internal::validateHealthCheck, task), lambda::bind(internal::validateResources, task), lambda::bind(internal::validateCommandInfo, task), - lambda::bind(internal::validateContainerInfo, task) + lambda::bind(internal::validateContainerInfo, task), + lambda::bind(internal::validateResourceLimits, task, slave) }; foreach (const lambda::function<Option<Error>()>& validator, validators) { @@ -1659,6 +1735,15 @@ Option<Error> validateExecutor( << "in future releases."; } + if (executor.has_container() && + executor.container().has_linux_info() && + executor.container().linux_info().has_share_cgroups() && + executor.container().linux_info().share_cgroups()) { + return Error( + "The 'share_cgroups' field cannot be set to 'true'" + " on executor containers"); + } + if (!slave->hasExecutor(framework->id(), task.executor().executor_id())) { total += executorResources; } @@ -1698,7 +1783,8 @@ Option<Error> validate( vector<lambda::function<Option<Error>()>> validators = { lambda::bind(internal::validateTask, task, framework, slave), - lambda::bind(internal::validateExecutor, task, framework, slave, offered) + lambda::bind(internal::validateExecutor, task, framework, slave, offered), + lambda::bind(internal::validateShareCgroups, task) }; foreach (const lambda::function<Option<Error>()>& validator, validators) { @@ -1751,6 +1837,15 @@ Option<Error> validateTask( } } + if (!task.limits().empty() && + !(task.has_container() && + task.container().has_linux_info() && + !task.container().linux_info().share_cgroups())) { + return Error( + "Resource limits may only be set for tasks within a task group when " + "the 'share_cgroups' field is set to 'false'."); + } + return None(); } @@ -1901,6 +1996,117 @@ Option<Error> validateExecutor( return None(); } + +Option<Error> validateShareCgroups( + const TaskGroupInfo& taskGroup, + const ExecutorInfo& executorInfo, + Framework* framework, + Slave* slave) +{ + if (executorInfo.has_container() && + executorInfo.container().has_linux_info() && + executorInfo.container().linux_info().has_share_cgroups() && + executorInfo.container().linux_info().share_cgroups()) { + return Error( + "The 'share_cgroups' field cannot be set to 'true' on " + "executor containers"); + } + + // If any task in a task group has 'share_cgroups' set to 'false', then all + // tasks in the task group must have it set to 'false'. We use this local + // variable to track the value. + Option<bool> shareCgroups; + + // Helper function to determine whether or not we've seen a task in this task + // group or under this executor with a different value of 'share_cgroups'. + auto validateShareCgroupsForTask = []( + Option<bool>& shareCgroups, + const Option<ContainerInfo>& container) -> Option<Error> { + // If the task does not have 'LinuxInfo' set, then we treat it as + // having 'share_cgroups==true' for validation purposes, since that is + // the default behavior. + bool taskShareCgroups = + (container.isSome() && + container->has_linux_info() && + container->linux_info().has_share_cgroups()) ? + container->linux_info().share_cgroups() : + true; + + if (shareCgroups.isNone()) { + shareCgroups = taskShareCgroups; + } else if (taskShareCgroups != shareCgroups.get()) { + return Error( + "If set, the value of 'share_cgroups' must be the same for all " + "tasks in each task group and under a single executor"); + } + + return None(); + }; + + foreach (const TaskInfo& task, taskGroup.tasks()) { + Option<Error> error = validateShareCgroupsForTask( + shareCgroups, + task.has_container() ? + task.container() : + Option<ContainerInfo>::none()); + + if (error.isSome()) { + return error; + } + } + + CHECK_NOTNULL(framework); + + // If this executor already exists, ensure that all tasks under it + // have the same value of 'share_cgroups'. + if (shareCgroups.isSome() && + framework->executors.contains(slave->id) && + framework->executors.at(slave->id) + .contains(executorInfo.executor_id())) { + foreachvalue (const Task* task, framework->tasks) { + CHECK_NOTNULL(task); + + if (task->slave_id() == slave->id && + task->has_executor_id() && + task->executor_id() == executorInfo.executor_id()) { + Option<Error> error = validateShareCgroupsForTask( + shareCgroups, + task->has_container() ? + task->container() : + Option<ContainerInfo>::none()); + + if (error.isSome()) { + return error; + } + } + } + + // Unreachable tasks are held in the `Framework` struct and in the `Slaves` + // struct. Rather than passing `Slaves` to this function to find unreachable + // tasks for only one agent, we look through all unreachable tasks via the + // `Framework` struct, which is already available. + foreachvalue (const Owned<Task>& task, framework->unreachableTasks) { + CHECK_NOTNULL(task); + + if (task->slave_id() == slave->id && + task->has_executor_id() && + task->executor_id() == executorInfo.executor_id()) { + Option<Error> error = validateShareCgroupsForTask( + shareCgroups, + task->has_container() ? + task->container() : + Option<ContainerInfo>::none()); + + if (error.isSome()) { + return error; + } + } + } + } + + return None(); +} + } // namespace internal { @@ -1922,11 +2128,23 @@ Option<Error> validate( } } - Option<Error> error = - internal::validateExecutor(taskGroup, executor, framework, slave, offered); + vector<lambda::function<Option<Error>()>> validators = { + lambda::bind( + internal::validateExecutor, + taskGroup, + executor, + framework, + slave, + offered), + lambda::bind( + internal::validateShareCgroups, taskGroup, executor, framework, slave) + }; - if (error.isSome()) { - return error; + foreach (const lambda::function<Option<Error>()>& validator, validators) { + Option<Error> error = validator(); + if (error.isSome()) { + return error; + } } return None();
