This is an automated email from the ASF dual-hosted git repository.

qianzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 4688371cafed058890cc11c8aa8514db6b08bb2b
Author: Qian Zhang <zhq527...@gmail.com>
AuthorDate: Tue Dec 3 21:37:43 2019 +0800

    Set resource limits when launching executor container.
    
    Review: https://reviews.apache.org/r/71858
---
 src/slave/slave.cpp | 126 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/slave/slave.hpp |   5 +++
 2 files changed, 131 insertions(+)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a914de4..f214560 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -188,6 +188,11 @@ static CommandInfo defaultExecutorCommandInfo(
     const Option<std::string>& user);
 
 
+// Sets the executor resource limit (the `limit` parameter) based on the resource
+// passed in (the `value` parameter).
+static void setLimit(Option<Value::Scalar>& limit, const Value::Scalar& value);
+
+
 Slave::Slave(const string& id,
              const slave::Flags& _flags,
              MasterDetector* _detector,
@@ -3255,6 +3260,7 @@ void Slave::__run(
           lambda::_1,
           frameworkId,
           executorInfo_,
+          computeExecutorLimits(executorInfo.resources(), tasks),
           taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()));
   }
 
@@ -3635,6 +3641,7 @@ void Slave::launchExecutor(
     const Future<Option<Secret>>& authenticationToken,
     const FrameworkID& frameworkId,
     const ExecutorInfo& executorInfo,
+    const google::protobuf::Map<string, Value::Scalar>& executorLimits,
     const Option<TaskInfo>& taskInfo)
 {
   Framework* framework = getFramework(frameworkId);
@@ -3716,6 +3723,10 @@ void Slave::launchExecutor(
   *containerConfig.mutable_resources() = executorInfo.resources();
   containerConfig.set_directory(executor->directory);
 
+  if (!executorLimits.empty()) {
+    *containerConfig.mutable_limits() = executorLimits;
+  }
+
   if (executor->user.isSome()) {
     containerConfig.set_user(executor->user.get());
   }
@@ -9967,6 +9978,100 @@ void Slave::initializeResourceProviderManager(
 }
 
 
+google::protobuf::Map<string, Value::Scalar> Slave::computeExecutorLimits(
+    const Resources& executorResources,
+    const vector<TaskInfo>& tasks) const
+{
+  Option<Value::Scalar> executorCpuLimit, executorMemLimit;
+  Value::Scalar cpuRequest, memRequest;
+  foreach (const TaskInfo& task, tasks) {
+    // Add the task's CPU limit to the executor's, else tally its request.
+    if (task.limits().count("cpus")) {
+      setLimit(executorCpuLimit, task.limits().at("cpus"));
+    } else {
+      Option<Value::Scalar> taskCpus =
+        Resources(task.resources()).get<Value::Scalar>("cpus");
+
+      if (taskCpus.isSome()) {
+        cpuRequest += taskCpus.get();
+      }
+    }
+
+    // Add the task's memory limit to the executor's, else tally its request.
+    if (task.limits().count("mem")) {
+      setLimit(executorMemLimit, task.limits().at("mem"));
+    } else {
+      Option<Value::Scalar> taskMem =
+        Resources(task.resources()).get<Value::Scalar>("mem");
+
+      if (taskMem.isSome()) {
+        memRequest += taskMem.get();
+      }
+    }
+  }
+
+  if (executorCpuLimit.isSome()) {
+    // Count the executor's own CPU request into its CPU limit as well, so
+    // that the resulting limit does not end up below that request.
+    Option<Value::Scalar> executorCpus =
+      executorResources.get<Value::Scalar>("cpus");
+
+    if (executorCpus.isSome()) {
+      setLimit(executorCpuLimit, executorCpus.get());
+    }
+
+    // Tasks without a CPU limit contribute their summed CPU requests to
+    // the executor's CPU limit as well, again so that the limit does not
+    // end up below the request. On Linux this only happens when the
+    // `cgroups_enable_cfs` flag is set; without it the executor's CPU
+    // limit is dropped entirely, since otherwise the tasks which have no
+    // CPU limit would be throttled implicitly by the executor's CPU limit.
+    if (cpuRequest.value() > 0) {
+#ifdef __linux__
+      if (flags.cgroups_enable_cfs) {
+        setLimit(executorCpuLimit, cpuRequest);
+      } else {
+        executorCpuLimit = None();
+      }
+#else
+      setLimit(executorCpuLimit, cpuRequest);
+#endif // __linux__
+    }
+  }
+
+  if (executorMemLimit.isSome()) {
+    // Count the executor's own memory request into its memory limit as
+    // well, so that the resulting limit does not end up below that
+    // request.
+    Option<Value::Scalar> executorMem =
+      executorResources.get<Value::Scalar>("mem");
+
+    if (executorMem.isSome()) {
+      setLimit(executorMemLimit, executorMem.get());
+    }
+
+    // Tasks without a memory limit contribute their summed memory
+    // requests to the executor's memory limit as well, again so that
+    // the limit does not end up below the request. Unlike the CPU case,
+    // no agent flag is consulted here.
+    if (memRequest.value() > 0) {
+      setLimit(executorMemLimit, memRequest);
+    }
+  }
+
+  google::protobuf::Map<string, Value::Scalar> executorLimits;
+  if (executorCpuLimit.isSome()) {
+    executorLimits.insert({"cpus", executorCpuLimit.get()});
+  }
+
+  if (executorMemLimit.isSome()) {
+    executorLimits.insert({"mem", executorMemLimit.get()});
+  }
+
+  return executorLimits;
+}
+
+
 void Slave::updateDrainStatus()
 {
   if (drainConfig.isNone()) {
@@ -11367,6 +11472,27 @@ static CommandInfo defaultExecutorCommandInfo(
   return commandInfo;
 }
 
+
+// Accumulates `delta` into `limit`. An unset limit becomes `delta`; once
+// the limit is infinite it stays infinite whatever is added afterwards.
+static void setLimit(Option<Value::Scalar>& limit, const Value::Scalar& delta)
+{
+  if (limit.isSome() && std::isinf(limit->value())) {
+    return;
+  }
+
+  Value::Scalar scalar;
+  if (limit.isNone() || std::isinf(delta.value())) {
+    // First contribution, or an infinite one: take `delta` as is.
+    scalar.set_value(delta.value());
+  } else {
+    // Neither value is infinite, so the new limit is their sum.
+    scalar.set_value(limit->value() + delta.value());
+  }
+
+  limit = scalar;
+}
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 03279db..a5a367e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -476,6 +476,7 @@ public:
       const process::Future<Option<Secret>>& authorizationToken,
       const FrameworkID& frameworkId,
       const ExecutorInfo& executorInfo,
+      const google::protobuf::Map<std::string, Value::Scalar>& executorLimits,
       const Option<TaskInfo>& taskInfo);
 
   void fileAttached(const process::Future<Nothing>& result,
@@ -778,6 +779,10 @@ private:
       const Flags& flags,
       const SlaveID& slaveId);
 
+  google::protobuf::Map<std::string, Value::Scalar> computeExecutorLimits(
+      const Resources& executorResources,
+      const std::vector<TaskInfo>& tasks) const;
+
   protobuf::master::Capabilities requiredMasterCapabilities;
 
   const Flags flags;

Reply via email to