This is an automated email from the ASF dual-hosted git repository. qianzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 4688371cafed058890cc11c8aa8514db6b08bb2b
Author: Qian Zhang <zhq527...@gmail.com>
AuthorDate: Tue Dec 3 21:37:43 2019 +0800

    Set resource limits when launching executor container.

    Review: https://reviews.apache.org/r/71858
---
 src/slave/slave.cpp | 126 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/slave/slave.hpp |   5 +++
 2 files changed, 131 insertions(+)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a914de4..f214560 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -188,6 +188,11 @@ static CommandInfo defaultExecutorCommandInfo(
     const Option<std::string>& user);
 
 
+// Accumulates the resource passed in (the `delta` parameter) into the executor
+// resource limit (the `limit` parameter); an infinite value makes it infinite.
+static void setLimit(Option<Value::Scalar>& limit, const Value::Scalar& delta);
+
+
 Slave::Slave(const string& id,
              const slave::Flags& _flags,
              MasterDetector* _detector,
@@ -3255,6 +3260,7 @@ void Slave::__run(
           lambda::_1,
           frameworkId,
           executorInfo_,
+          computeExecutorLimits(executorInfo.resources(), tasks),
           taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()));
   }
 
@@ -3635,6 +3641,7 @@ void Slave::launchExecutor(
     const Future<Option<Secret>>& authenticationToken,
     const FrameworkID& frameworkId,
     const ExecutorInfo& executorInfo,
+    const google::protobuf::Map<string, Value::Scalar>& executorLimits,
     const Option<TaskInfo>& taskInfo)
 {
   Framework* framework = getFramework(frameworkId);
@@ -3716,6 +3723,10 @@ void Slave::launchExecutor(
   *containerConfig.mutable_resources() = executorInfo.resources();
   containerConfig.set_directory(executor->directory);
 
+  if (!executorLimits.empty()) {
+    *containerConfig.mutable_limits() = executorLimits;
+  }
+
   if (executor->user.isSome()) {
     containerConfig.set_user(executor->user.get());
   }
@@ -9967,6 +9978,100 @@ void Slave::initializeResourceProviderManager(
 }
 
 
+google::protobuf::Map<string, Value::Scalar> Slave::computeExecutorLimits(
+    const Resources& executorResources,
+    const vector<TaskInfo>& tasks) const
+{
+  Option<Value::Scalar> executorCpuLimit, executorMemLimit;
+  Value::Scalar cpuRequest, memRequest;
+  foreach (const TaskInfo& task, tasks) {
+    // Count the task's CPU limit into the executor's CPU limit.
+    if (task.limits().count("cpus")) {
+      setLimit(executorCpuLimit, task.limits().at("cpus"));
+    } else {
+      Option<Value::Scalar> taskCpus =
+        Resources(task.resources()).get<Value::Scalar>("cpus");
+
+      if (taskCpus.isSome()) {
+        cpuRequest += taskCpus.get();
+      }
+    }
+
+    // Count the task's memory limit into the executor's memory limit.
+    if (task.limits().count("mem")) {
+      setLimit(executorMemLimit, task.limits().at("mem"));
+    } else {
+      Option<Value::Scalar> taskMem =
+        Resources(task.resources()).get<Value::Scalar>("mem");
+
+      if (taskMem.isSome()) {
+        memRequest += taskMem.get();
+      }
+    }
+  }
+
+  if (executorCpuLimit.isSome()) {
+    // Count the executor's CPU request into its CPU limit as well, this is to
+    // ensure the executor's CPU limit is always greater than its CPU request.
+    Option<Value::Scalar> executorCpus =
+      executorResources.get<Value::Scalar>("cpus");
+
+    if (executorCpus.isSome()) {
+      setLimit(executorCpuLimit, executorCpus.get());
+    }
+
+    // For the tasks which do not have CPU limit, count their CPU requests
+    // into the executor's CPU limit as well, this is also to ensure the
+    // executor's CPU limit is always greater than its CPU request. Please
+    // note that if the flag `cgroups_enable_cfs` is not enabled, we should
+    // not set the executor's CPU limit, otherwise the tasks which do not
+    // have CPU limit will be throttled implicitly by the executor's CPU limit.
+    if (cpuRequest.value() > 0) {
+#ifdef __linux__
+      if (flags.cgroups_enable_cfs) {
+        setLimit(executorCpuLimit, cpuRequest);
+      } else {
+        executorCpuLimit = None();
+      }
+#else
+      setLimit(executorCpuLimit, cpuRequest);
+#endif // __linux__
+    }
+  }
+
+  if (executorMemLimit.isSome()) {
+    // Count the executor's memory request into its memory limit as well,
+    // this is to ensure the executor's memory limit is always greater
+    // than its memory request.
+    Option<Value::Scalar> executorMem =
+      executorResources.get<Value::Scalar>("mem");
+
+    if (executorMem.isSome()) {
+      setLimit(executorMemLimit, executorMem.get());
+    }
+
+    // For the tasks which do not have memory limit, count their memory
+    // requests into the executor's memory limit as well, this is also
+    // to ensure the executor's memory limit is always greater than its
+    // memory request.
+    if (memRequest.value() > 0) {
+      setLimit(executorMemLimit, memRequest);
+    }
+  }
+
+  google::protobuf::Map<string, Value::Scalar> executorLimits;
+  if (executorCpuLimit.isSome()) {
+    executorLimits.insert({"cpus", executorCpuLimit.get()});
+  }
+
+  if (executorMemLimit.isSome()) {
+    executorLimits.insert({"mem", executorMemLimit.get()});
+  }
+
+  return executorLimits;
+}
+
+
 void Slave::updateDrainStatus()
 {
   if (drainConfig.isNone()) {
@@ -11367,6 +11472,27 @@ static CommandInfo defaultExecutorCommandInfo(
   return commandInfo;
 }
 
+
+static void setLimit(Option<Value::Scalar>& limit, const Value::Scalar& delta)
+{
+  if (limit.isSome() && std::isinf(limit->value())) {
+    // Just return if the limit is already infinite.
+    return;
+  }
+
+  Value::Scalar scalar;
+  if (limit.isNone() || std::isinf(delta.value())) {
+    // Set limit directly if it is the first time or the value to be
+    // added is infinite.
+    scalar.set_value(delta.value());
+  } else {
+    // Add the value into the limit.
+    scalar.set_value(limit->value() + delta.value());
+  }
+
+  limit = scalar;
+}
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 03279db..a5a367e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -476,6 +476,7 @@ public:
       const process::Future<Option<Secret>>& authorizationToken,
       const FrameworkID& frameworkId,
       const ExecutorInfo& executorInfo,
+      const google::protobuf::Map<std::string, Value::Scalar>& executorLimits,
       const Option<TaskInfo>& taskInfo);
 
   void fileAttached(const process::Future<Nothing>& result,
@@ -778,6 +779,10 @@ private:
       const Flags& flags,
       const SlaveID& slaveId);
 
+  google::protobuf::Map<std::string, Value::Scalar> computeExecutorLimits(
+      const Resources& executorResources,
+      const std::vector<TaskInfo>& tasks) const;
+
   protobuf::master::Capabilities requiredMasterCapabilities;
 
   const Flags flags;