This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 504af58dda64b73581b0398c83952421aea64d39
Author: Greg Mann <[email protected]>
AuthorDate: Fri Mar 20 10:35:37 2020 -0700

    Added master validation for task resource limits and shared cgroups.
    
    Review: https://reviews.apache.org/r/72216/
---
 src/master/validation.cpp | 230 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 224 insertions(+), 6 deletions(-)

diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 084f281..5b1bcb5 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -17,6 +17,7 @@
 #include "master/validation.hpp"
 
 #include <algorithm>
+#include <cmath>
 #include <iterator>
 #include <set>
 #include <string>
@@ -29,6 +30,7 @@
 #include <mesos/type_utils.hpp>
 
 #include <process/authenticator.hpp>
+#include <process/owned.hpp>
 
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
@@ -45,6 +47,8 @@
 
 #include "master/master.hpp"
 
+using process::Owned;
+
 using process::http::authentication::Principal;
 
 using std::pair;
@@ -1540,6 +1544,77 @@ Option<Error> validateContainerInfo(const TaskInfo& task)
 }
 
 
+// Validates the optional resource limits of `task`.
+//
+// Returns an `Error` if limits are set and any of the following holds:
+//   - the agent does not have the `taskResourceLimits` capability;
+//   - a limit is given for a resource other than "cpus" or "mem";
+//   - a limit is given for a resource which the task does not request;
+//   - a limit is lower than the corresponding request (an infinite
+//     memory limit is never compared against the request).
+//
+// Returns `None()` if the task has no limits or if all checks pass.
+Option<Error> validateResourceLimits(
+    const TaskInfo& task,
+    Slave* slave)
+{
+  // Bind by reference so that the limits map is not copied; it is only
+  // read in this function.
+  const auto& limits = task.limits();
+
+  if (limits.empty()) {
+    return None();
+  }
+
+  if (!slave->capabilities.taskResourceLimits) {
+    return Error("Agent is not capable of handling task resource limits");
+  }
+
+  // Ensure that only "cpus" and "mem" are included.
+  const size_t cpuCount = limits.count("cpus");
+  const size_t memCount = limits.count("mem");
+
+  if (limits.size() > cpuCount + memCount) {
+    return Error(
+        "Only cpus and mem may be included in a task's resource limits");
+  }
+
+  // The limits are non-empty and contain nothing but "cpus" and "mem", so
+  // at least one of the checks below needs the requested resources. Build
+  // them once rather than once per check.
+  const Resources requested(task.resources());
+
+  if (cpuCount) {
+    Option<double> taskCpus = requested.cpus();
+    if (taskCpus.isNone()) {
+      return Error(
+          "When a CPU limit is specified, a CPU request must also be "
+          "specified");
+    }
+
+    if (limits.at("cpus").value() < taskCpus.get()) {
+      return Error(
+          "The cpu limit must be greater than or equal to the cpu request");
+    }
+  }
+
+  if (memCount) {
+    Option<Bytes> taskMem = requested.mem();
+    if (taskMem.isNone()) {
+      return Error(
+          "When a memory limit is specified, a memory request must also be "
+          "specified");
+    }
+
+    // Only a finite memory limit is compared against the request. The
+    // limit value is interpreted in megabytes.
+    if (!std::isinf(limits.at("mem").value()) &&
+        Bytes(limits.at("mem").value(), Bytes::MEGABYTES) < taskMem.get()) {
+      return Error(
+          "The memory limit must be greater"
+          " than or equal to the memory request");
+    }
+  }
+
+  return None();
+}
+
+
+// Rejects a task whose container explicitly sets 'share_cgroups' to
+// 'false'. A task which leaves the field unset, or which has no container
+// or no `linux_info`, passes.
+//
+// This validation function should only be executed for tasks which are
+// launched via the LAUNCH operation, not the LAUNCH_GROUP operation.
+Option<Error> validateShareCgroups(const TaskInfo& task)
+{
+  // Without a container carrying `linux_info` there is nothing to check.
+  if (!task.has_container() || !task.container().has_linux_info()) {
+    return None();
+  }
+
+  const auto& linuxInfo = task.container().linux_info();
+
+  // Only an explicit 'false' is an error; an unset field is accepted.
+  const bool explicitlyUnshared =
+    linuxInfo.has_share_cgroups() && !linuxInfo.share_cgroups();
+
+  if (explicitlyUnshared) {
+    return Error(
+        "Only tasks in a task group may have 'share_cgroups' set to 'false'");
+  }
+
+  return None();
+}
+
+
 // Validates task specific fields except its executor (if it exists).
 Option<Error> validateTask(
     const TaskInfo& task,
@@ -1561,7 +1636,8 @@ Option<Error> validateTask(
     lambda::bind(internal::validateHealthCheck, task),
     lambda::bind(internal::validateResources, task),
     lambda::bind(internal::validateCommandInfo, task),
-    lambda::bind(internal::validateContainerInfo, task)
+    lambda::bind(internal::validateContainerInfo, task),
+    lambda::bind(internal::validateResourceLimits, task, slave)
   };
 
   foreach (const lambda::function<Option<Error>()>& validator, validators) {
@@ -1659,6 +1735,15 @@ Option<Error> validateExecutor(
         << "in future releases.";
     }
 
+    if (executor.has_container() &&
+        executor.container().has_linux_info() &&
+        executor.container().linux_info().has_share_cgroups() &&
+        executor.container().linux_info().share_cgroups()) {
+      return Error(
+          "The 'share_cgroups' field cannot be set to 'true'"
+          " on executor containers");
+    }
+
     if (!slave->hasExecutor(framework->id(), task.executor().executor_id())) {
       total += executorResources;
     }
@@ -1698,7 +1783,8 @@ Option<Error> validate(
 
   vector<lambda::function<Option<Error>()>> validators = {
     lambda::bind(internal::validateTask, task, framework, slave),
-    lambda::bind(internal::validateExecutor, task, framework, slave, offered)
+    lambda::bind(internal::validateExecutor, task, framework, slave, offered),
+    lambda::bind(internal::validateShareCgroups, task)
   };
 
   foreach (const lambda::function<Option<Error>()>& validator, validators) {
@@ -1751,6 +1837,15 @@ Option<Error> validateTask(
     }
   }
 
+  if (!task.limits().empty() &&
+      !(task.has_container() &&
+        task.container().has_linux_info() &&
+        !task.container().linux_info().share_cgroups())) {
+    return Error(
+        "Resource limits may only be set for tasks within a task group when "
+        "the 'share_cgroups' field is set to 'false'.");
+  }
+
   return None();
 }
 
@@ -1901,6 +1996,117 @@ Option<Error> validateExecutor(
   return None();
 }
 
+
+// Validates the 'share_cgroups' settings involved in a task group launch.
+//
+// Returns an `Error` if:
+//   - the executor's container explicitly sets 'share_cgroups' to 'true';
+//   - the tasks in `taskGroup` do not all have the same effective value of
+//     'share_cgroups'; or
+//   - the executor is already known to `framework` on this agent and one
+//     of the framework's tasks (including unreachable tasks) under that
+//     executor has a different effective value.
+//
+// A task which does not set 'share_cgroups' is treated as having the value
+// 'true'. Returns `None()` if no inconsistency is found.
+Option<Error> validateShareCgroups(
+    const TaskGroupInfo& taskGroup,
+    const ExecutorInfo& executorInfo,
+    Framework* framework,
+    Slave* slave)
+{
+  if (executorInfo.has_container() &&
+      executorInfo.container().has_linux_info() &&
+      executorInfo.container().linux_info().has_share_cgroups() &&
+      executorInfo.container().linux_info().share_cgroups()) {
+    return Error(
+        "The 'share_cgroups' field cannot be set to 'true' on "
+        "executor containers");
+  }
+
+  // If any task in a task group has 'share_cgroups' set to 'false', then all
+  // tasks in the task group must have it set to 'false'. We use this local
+  // variable to track the value.
+  Option<bool> shareCgroups;
+
+  // Helper function to determine whether or not we've seen a task in this
+  // task group or under this executor with a different value of
+  // 'share_cgroups'. `container` is null for a task without a
+  // `ContainerInfo`; taking a pointer avoids copying the message for every
+  // task that is examined.
+  auto validateShareCgroupsForTask = [&shareCgroups](
+      const ContainerInfo* container) -> Option<Error> {
+    // If the task does not have 'LinuxInfo' set, then we treat it as
+    // having 'share_cgroups==true' for validation purposes, since that is
+    // the default behavior.
+    const bool taskShareCgroups =
+      (container != nullptr &&
+       container->has_linux_info() &&
+       container->linux_info().has_share_cgroups()) ?
+         container->linux_info().share_cgroups() :
+         true;
+
+    if (shareCgroups.isNone()) {
+      // The first task seen determines the value all others must match.
+      shareCgroups = taskShareCgroups;
+    } else if (taskShareCgroups != shareCgroups.get()) {
+      return Error(
+          "If set, the value of 'share_cgroups' must be the same for all "
+          "tasks in each task group and under a single executor");
+    }
+
+    return None();
+  };
+
+  foreach (const TaskInfo& task, taskGroup.tasks()) {
+    Option<Error> error = validateShareCgroupsForTask(
+        task.has_container() ? &task.container() : nullptr);
+
+    if (error.isSome()) {
+      return error;
+    }
+  }
+
+  CHECK_NOTNULL(framework);
+
+  // Nothing more to check if the task group was empty or if this executor
+  // is not yet known to the framework on this agent.
+  if (shareCgroups.isNone() ||
+      !framework->executors.contains(slave->id) ||
+      !framework->executors.at(slave->id)
+        .contains(executorInfo.executor_id())) {
+    return None();
+  }
+
+  // Checks an already-known task against the value established above, but
+  // only if the task runs under this executor on this agent.
+  auto validateExistingTask = [&](const Task* task) -> Option<Error> {
+    if (task->slave_id() == slave->id &&
+        task->has_executor_id() &&
+        task->executor_id() == executorInfo.executor_id()) {
+      return validateShareCgroupsForTask(
+          task->has_container() ? &task->container() : nullptr);
+    }
+
+    return None();
+  };
+
+  // The executor already exists, so ensure that all tasks under it have
+  // the same value of 'share_cgroups'.
+  foreachvalue (const Task* task, framework->tasks) {
+    CHECK_NOTNULL(task);
+
+    Option<Error> error = validateExistingTask(task);
+    if (error.isSome()) {
+      return error;
+    }
+  }
+
+  // Unreachable tasks are held in the `Framework` struct and in the `Slaves`
+  // struct. Rather than passing `Slaves` to this function to find
+  // unreachable tasks for only one agent, we look through all unreachable
+  // tasks via the `Framework` struct, which is already available.
+  foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
+    CHECK_NOTNULL(task);
+
+    Option<Error> error = validateExistingTask(task.get());
+    if (error.isSome()) {
+      return error;
+    }
+  }
+
+  return None();
+}
+
+
 } // namespace internal {
 
 
@@ -1922,11 +2128,23 @@ Option<Error> validate(
     }
   }
 
-  Option<Error> error =
-    internal::validateExecutor(taskGroup, executor, framework, slave, offered);
+  vector<lambda::function<Option<Error>()>> validators = {
+    lambda::bind(
+        internal::validateExecutor,
+        taskGroup,
+        executor,
+        framework,
+        slave,
+        offered),
+    lambda::bind(
+        internal::validateShareCgroups, taskGroup, executor, framework, slave)
+  };
 
-  if (error.isSome()) {
-    return error;
+  foreach (const lambda::function<Option<Error>()>& validator, validators) {
+    Option<Error> error = validator();
+    if (error.isSome()) {
+      return error;
+    }
   }
 
   return None();

Reply via email to