This is an automated email from the ASF dual-hosted git repository.

qianzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 929932fc2bd753f097a26caea5b3e7f7f3ac9118
Author: Qian Zhang <[email protected]>
AuthorDate: Thu Apr 30 09:29:49 2020 +0800

    Updated Docker containerizer to set Docker container's resource limits.
    
    This is to ensure that the resource limits of the Docker container in
    which a custom executor runs can be correctly updated when a new task
    is launched or an existing task terminates. The `resources` field in
    the `Container` struct is also renamed to `resourceRequests`.
    
    Review: https://reviews.apache.org/r/72391
---
 include/mesos/values.hpp           |   1 +
 src/common/values.cpp              |   6 +
 src/slave/containerizer/docker.cpp | 242 ++++++++++++++++++++++++-------------
 src/slave/containerizer/docker.hpp |  15 ++-
 4 files changed, 174 insertions(+), 90 deletions(-)
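
For readers skimming the diff, the heart of the change is how a task's CPU
request and (new) CPU limit map onto cgroup settings: cpu.shares always
tracks the request, while the CFS quota now tracks the limit when one is
set. A minimal standalone sketch of that mapping (the constants mirror the
long-standing Mesos defaults referenced in the diff; the free functions
here are illustrative, not the actual cgroups API):

    #include <algorithm>
    #include <cmath>
    #include <cstdint>

    // Constants as used in the diff below; values are the Mesos defaults
    // (confirm against src/slave/constants.hpp).
    const uint64_t CPU_SHARES_PER_CPU = 1024;
    const uint64_t MIN_CPU_SHARES = 2;
    const double CPU_CFS_PERIOD_US = 100000;   // 100ms period.
    const double MIN_CPU_CFS_QUOTA_US = 1000;  // 1ms minimum quota.

    // cpu.shares is a soft guarantee derived from the *request*.
    uint64_t cpuShares(double cpuRequest)
    {
      return std::max(
          (uint64_t) (CPU_SHARES_PER_CPU * cpuRequest), MIN_CPU_SHARES);
    }

    // cpu.cfs_quota_us is a hard cap derived from the *limit*; an
    // infinite limit means "no cap", which cgroups v1 spells as -1.
    int64_t cfsQuotaUs(double cpuLimit)
    {
      if (std::isinf(cpuLimit)) {
        return -1;
      }

      return (int64_t) std::max(
          CPU_CFS_PERIOD_US * cpuLimit, MIN_CPU_CFS_QUOTA_US);
    }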

diff --git a/include/mesos/values.hpp b/include/mesos/values.hpp
index 27f71d1..9288503 100644
--- a/include/mesos/values.hpp
+++ b/include/mesos/values.hpp
@@ -27,6 +27,7 @@ namespace mesos {
 
 std::ostream& operator<<(std::ostream& stream, const Value::Scalar& scalar);
 bool operator==(const Value::Scalar& left, const Value::Scalar& right);
+bool operator!=(const Value::Scalar& left, const Value::Scalar& right);
 bool operator<(const Value::Scalar& left, const Value::Scalar& right);
 bool operator<=(const Value::Scalar& left, const Value::Scalar& right);
 bool operator>(const Value::Scalar& left, const Value::Scalar& right);
diff --git a/src/common/values.cpp b/src/common/values.cpp
index 7520382..d7bc91b 100644
--- a/src/common/values.cpp
+++ b/src/common/values.cpp
@@ -99,6 +99,12 @@ bool operator==(const Value::Scalar& left, const Value::Scalar& right)
 }
 
 
+bool operator!=(const Value::Scalar& left, const Value::Scalar& right)
+{
+  return !(left == right);
+}
+
+
 bool operator<(const Value::Scalar& left, const Value::Scalar& right)
 {
   return convertToFixed(left.value()) < convertToFixed(right.value());
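
The new operator!= above simply negates operator==, which compares scalars
after converting them to fixed point so that floating-point noise below
Mesos's three-decimal-digit precision does not break (in)equality. A
self-contained sketch of that behavior, assuming convertToFixed scales to
thousandths and rounds as in src/common/values.cpp:

    #include <cmath>

    // Sketch: scale to thousandths and round, mirroring convertToFixed.
    long long convertToFixed(double value)
    {
      return std::llround(value * 1000);
    }

    bool scalarEquals(double left, double right)
    {
      return convertToFixed(left) == convertToFixed(right);
    }

    // Exactly the shape of the new operator!= in this commit.
    bool scalarNotEquals(double left, double right)
    {
      return !scalarEquals(left, right);
    }

    // scalarEquals(0.1 + 0.2, 0.3) is true despite binary floating point.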
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 3aa6a99..8aed025 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -541,7 +541,7 @@ Try<Nothing> DockerContainerizerProcess::updatePersistentVolumes(
         continue;
       }
 
-      if (_container->resources.contains(resource)) {
+      if (_container->resourceRequests.contains(resource)) {
         isVolumeInUse = true;
         break;
       }
@@ -612,7 +612,7 @@ Future<Nothing> DockerContainerizerProcess::mountPersistentVolumes(
   container->state = Container::MOUNTING;
 
   if (!container->containerConfig.has_task_info() &&
-      !container->resources.persistentVolumes().empty()) {
+      !container->resourceRequests.persistentVolumes().empty()) {
     LOG(ERROR) << "Persistent volumes found with container '" << containerId
                << "' but are not supported with custom executors";
     return Nothing();
@@ -622,7 +622,7 @@ Future<Nothing> DockerContainerizerProcess::mountPersistentVolumes(
       containerId,
       container->containerWorkDir,
       Resources(),
-      container->resources);
+      container->resourceRequests);
 
   if (updateVolumes.isError()) {
     return Failure(updateVolumes.error());
@@ -1333,7 +1333,10 @@ Future<Containerizer::LaunchResult> DockerContainerizerProcess::_launch(
       // --cpu-quota to the 'docker run' call in
       // launchExecutorContainer.
       return update(
-          containerId, containerConfig.executor_info().resources(), {}, true)
+          containerId,
+          containerConfig.executor_info().resources(),
+          containerConfig.limits(),
+          true)
         .then([=]() {
           return Future<Docker::Container>(dockerContainer);
         });
@@ -1384,7 +1387,7 @@ Future<Docker::Container> DockerContainerizerProcess::launchExecutorContainer(
         containerName,
         container->containerWorkDir,
         flags.sandbox_directory,
-        container->resources,
+        container->resourceRequests,
 #ifdef __linux__
         flags.cgroups_enable_cfs,
 #else
@@ -1392,8 +1395,8 @@ Future<Docker::Container> DockerContainerizerProcess::launchExecutorContainer(
 #endif
         container->environment,
         None(), // No extra devices.
-        flags.docker_mesos_image.isNone() ? flags.default_container_dns : None()
-    );
+        flags.docker_mesos_image.isNone() ? flags.default_container_dns : None(),
+        container->resourceLimits);
 
     if (runOptions.isError()) {
       return Failure(runOptions.error());
@@ -1516,7 +1519,7 @@ Future<pid_t> DockerContainerizerProcess::launchExecutorProcess(
   Future<Nothing> allocateGpus = Nothing();
 
 #ifdef __linux__
-  Option<double> gpus = Resources(container->resources).gpus();
+  Option<double> gpus = Resources(container->resourceRequests).gpus();
 
   if (gpus.isSome() && gpus.get() > 0) {
     // Make sure that the `gpus` resource is not fractional.
@@ -1677,19 +1680,23 @@ Future<Nothing> DockerContainerizerProcess::update(
   }
 
   if (container->generatedForCommandTask) {
+    // Store the resources for usage().
+    container->resourceRequests = resourceRequests;
+    container->resourceLimits = resourceLimits;
+
     LOG(INFO) << "Ignoring updating container " << containerId
               << " because it is generated for a command task";
 
-    // Store the resources for usage().
-    container->resources = resourceRequests;
-
     return Nothing();
   }
 
-  if (container->resources == resourceRequests && !force) {
+  if (container->resourceRequests == resourceRequests &&
+      container->resourceLimits == resourceLimits &&
+      !force) {
     LOG(INFO) << "Ignoring updating container " << containerId
               << " because resources passed to update are identical to"
               << " existing resources";
+
     return Nothing();
   }
 
@@ -1699,17 +1706,21 @@ Future<Nothing> DockerContainerizerProcess::update(
   // TODO(gyliu): Support updating GPU resources.
 
   // Store the resources for usage().
-  container->resources = resourceRequests;
+  container->resourceRequests = resourceRequests;
+  container->resourceLimits = resourceLimits;
 
 #ifdef __linux__
-  if (!resourceRequests.cpus().isSome() && !resourceRequests.mem().isSome()) {
+  if (!resourceRequests.cpus().isSome() &&
+      !resourceRequests.mem().isSome() &&
+      !resourceLimits.count("cpus") &&
+      !resourceLimits.count("mem")) {
     LOG(WARNING) << "Ignoring update as no supported resources are present";
     return Nothing();
   }
 
   // Skip inspecting the docker container if we already have the cgroups.
   if (container->cpuCgroup.isSome() && container->memoryCgroup.isSome()) {
-    return __update(containerId, resourceRequests);
+    return __update(containerId, resourceRequests, resourceLimits);
   }
 
   string containerName = containers_.at(containerId)->containerName;
@@ -1753,7 +1764,13 @@ Future<Nothing> DockerContainerizerProcess::update(
       });
 
   return inspectLoop
-    .then(defer(self(), &Self::_update, containerId, resourceRequests, lambda::_1));
+    .then(defer(
+        self(),
+        &Self::_update,
+        containerId,
+        resourceRequests,
+        resourceLimits,
+        lambda::_1));
 #else
   return Nothing();
 #endif // __linux__
@@ -1763,7 +1780,8 @@ Future<Nothing> DockerContainerizerProcess::update(
 #ifdef __linux__
 Future<Nothing> DockerContainerizerProcess::_update(
     const ContainerID& containerId,
-    const Resources& _resources,
+    const Resources& resourceRequests,
+    const google::protobuf::Map<string, Value::Scalar>& resourceLimits,
     const Docker::Container& container)
 {
   if (container.pid.isNone()) {
@@ -1832,13 +1850,14 @@ Future<Nothing> DockerContainerizerProcess::_update(
     return Nothing();
   }
 
-  return __update(containerId, _resources);
+  return __update(containerId, resourceRequests, resourceLimits);
 }
 
 
 Future<Nothing> DockerContainerizerProcess::__update(
     const ContainerID& containerId,
-    const Resources& _resources)
+    const Resources& resourceRequests,
+    const google::protobuf::Map<string, Value::Scalar>& resourceLimits)
 {
   CHECK(containers_.contains(containerId));
 
@@ -1849,7 +1868,7 @@ Future<Nothing> DockerContainerizerProcess::__update(
   // we make these static so we can reuse the result for subsequent
   // calls.
   static Result<string> cpuHierarchy = cgroups::hierarchy("cpu");
-  static Result<string> memoryHierarchy = cgroups::hierarchy("memory");
+  static Result<string> memHierarchy = cgroups::hierarchy("memory");
 
   if (cpuHierarchy.isError()) {
     return Failure("Failed to determine the cgroup hierarchy "
@@ -1857,111 +1876,164 @@ Future<Nothing> DockerContainerizerProcess::__update(
                    cpuHierarchy.error());
   }
 
-  if (memoryHierarchy.isError()) {
+  if (memHierarchy.isError()) {
     return Failure("Failed to determine the cgroup hierarchy "
                    "where the 'memory' subsystem is mounted: " +
-                   memoryHierarchy.error());
+                   memHierarchy.error());
   }
 
   Option<string> cpuCgroup = container->cpuCgroup;
-  Option<string> memoryCgroup = container->memoryCgroup;
-
-  // Update the CPU shares (if applicable).
-  if (cpuHierarchy.isSome() &&
-      cpuCgroup.isSome() &&
-      _resources.cpus().isSome()) {
-    double cpuShares = _resources.cpus().get();
+  Option<string> memCgroup = container->memoryCgroup;
 
-    uint64_t shares =
-      std::max((uint64_t) (CPU_SHARES_PER_CPU * cpuShares), MIN_CPU_SHARES);
+  Option<double> cpuRequest = resourceRequests.cpus();
+  Option<Bytes> memRequest = resourceRequests.mem();
 
-    Try<Nothing> write =
-      cgroups::cpu::shares(cpuHierarchy.get(), cpuCgroup.get(), shares);
-
-    if (write.isError()) {
-      return Failure("Failed to update 'cpu.shares': " + write.error());
+  Option<double> cpuLimit, memLimit;
+  foreach (auto&& limit, resourceLimits) {
+    if (limit.first == "cpus") {
+      cpuLimit = limit.second.value();
+    } else if (limit.first == "mem") {
+      memLimit = limit.second.value();
     }
+  }
 
-    LOG(INFO) << "Updated 'cpu.shares' to " << shares
-              << " at " << path::join(cpuHierarchy.get(), cpuCgroup.get())
-              << " for container " << containerId;
+  // Update the CPU shares and CFS quota (if applicable).
+  if (cpuHierarchy.isSome() && cpuCgroup.isSome()) {
+    if (cpuRequest.isSome()) {
+      uint64_t shares = std::max(
+          (uint64_t) (CPU_SHARES_PER_CPU * cpuRequest.get()), MIN_CPU_SHARES);
 
-    // Set cfs quota if enabled.
-    if (flags.cgroups_enable_cfs) {
-      write = cgroups::cpu::cfs_period_us(
-          cpuHierarchy.get(),
-          cpuCgroup.get(),
-          CPU_CFS_PERIOD);
+      Try<Nothing> write =
+        cgroups::cpu::shares(cpuHierarchy.get(), cpuCgroup.get(), shares);
 
       if (write.isError()) {
-        return Failure("Failed to update 'cpu.cfs_period_us': " +
-                       write.error());
+        return Failure("Failed to update 'cpu.shares': " + write.error());
       }
 
-      Duration quota = std::max(CPU_CFS_PERIOD * cpuShares, MIN_CPU_CFS_QUOTA);
+      LOG(INFO) << "Updated 'cpu.shares' to " << shares
+                << " at " << path::join(cpuHierarchy.get(), cpuCgroup.get())
+                << " for container " << containerId;
+    }
 
-      write = cgroups::cpu::cfs_quota_us(
+    // Set CFS quota to CPU limit (if any) or to CPU request (if the
+    // flag `--cgroups_enable_cfs` is true).
+    if (cpuLimit.isSome() || (flags.cgroups_enable_cfs && cpuRequest.isSome())) {
+      Try<Nothing> write = cgroups::cpu::cfs_period_us(
           cpuHierarchy.get(),
           cpuCgroup.get(),
-          quota);
+          CPU_CFS_PERIOD);
 
       if (write.isError()) {
-        return Failure("Failed to update 'cpu.cfs_quota_us': " + 
write.error());
+        return Failure(
+            "Failed to update 'cpu.cfs_period_us': " + write.error());
       }
 
-      LOG(INFO) << "Updated 'cpu.cfs_period_us' to " << CPU_CFS_PERIOD
-                << " and 'cpu.cfs_quota_us' to " << quota
-                << " (cpus " << cpuShares << ")"
-                << " for container " << containerId;
+      if (cpuLimit.isSome() && std::isinf(cpuLimit.get())) {
+        write = cgroups::write(
+            cpuHierarchy.get(), cpuCgroup.get(), "cpu.cfs_quota_us", "-1");
+
+        if (write.isError()) {
+          return Failure(
+              "Failed to update 'cpu.cfs_quota_us': " + write.error());
+        }
+
+        LOG(INFO) << "Updated 'cpu.cfs_period_us' to " << CPU_CFS_PERIOD
+                  << " and 'cpu.cfs_quota_us' to -1 at "
+                  << path::join(cpuHierarchy.get(), cpuCgroup.get())
+                  << " for container " << containerId;
+      } else {
+        const double& quota =
+          cpuLimit.isSome() ? cpuLimit.get() : cpuRequest.get();
+
+        Duration duration = std::max(CPU_CFS_PERIOD * quota, MIN_CPU_CFS_QUOTA);
+
+        write = cgroups::cpu::cfs_quota_us(
+            cpuHierarchy.get(), cpuCgroup.get(), duration);
+
+        if (write.isError()) {
+          return Failure(
+              "Failed to update 'cpu.cfs_quota_us': " + write.error());
+        }
+
+        LOG(INFO) << "Updated 'cpu.cfs_period_us' to " << CPU_CFS_PERIOD
+                  << " and 'cpu.cfs_quota_us' to " << duration << " (cpus "
+                  << quota << ") at "
+                  << path::join(cpuHierarchy.get(), cpuCgroup.get())
+                  << " for container " << containerId;
+      }
     }
   }
 
   // Update the memory limits (if applicable).
-  if (memoryHierarchy.isSome() &&
-      memoryCgroup.isSome() &&
-      _resources.mem().isSome()) {
+  if (memHierarchy.isSome() && memCgroup.isSome()) {
     // TODO(tnachen): investigate and handle OOM with docker.
-    Bytes mem = _resources.mem().get();
-    Bytes limit = std::max(mem, MIN_MEMORY);
+    if (memRequest.isSome()) {
+      Bytes softLimit = std::max(memRequest.get(), MIN_MEMORY);
 
-    // Always set the soft limit.
-    Try<Nothing> write =
-      cgroups::memory::soft_limit_in_bytes(
-          memoryHierarchy.get(), memoryCgroup.get(), limit);
+      // Always set the soft limit.
+      Try<Nothing> write = cgroups::memory::soft_limit_in_bytes(
+          memHierarchy.get(), memCgroup.get(), softLimit);
 
-    if (write.isError()) {
-      return Failure("Failed to set 'memory.soft_limit_in_bytes': " +
-                     write.error());
+      if (write.isError()) {
+        return Failure("Failed to set 'memory.soft_limit_in_bytes': " +
+                       write.error());
+      }
+
+      LOG(INFO) << "Updated 'memory.soft_limit_in_bytes' to " << softLimit
+                << " at " << path::join(memHierarchy.get(), memCgroup.get())
+                << " for container " << containerId;
     }
 
-    LOG(INFO) << "Updated 'memory.soft_limit_in_bytes' to " << limit
-              << " for container " << containerId;
+    // Read the existing hard limit.
+    Try<Bytes> currentHardLimit = cgroups::memory::limit_in_bytes(
+        memHierarchy.get(), memCgroup.get());
 
-    // Read the existing limit.
-    Try<Bytes> currentLimit =
-      cgroups::memory::limit_in_bytes(
-          memoryHierarchy.get(), memoryCgroup.get());
+    if (currentHardLimit.isError()) {
+      return Failure(
+          "Failed to read 'memory.limit_in_bytes': " +
+          currentHardLimit.error());
+    }
 
-    if (currentLimit.isError()) {
-      return Failure("Failed to read 'memory.limit_in_bytes': " +
-                     currentLimit.error());
+    bool isInfiniteLimit = false;
+    Option<Bytes> hardLimit = None();
+    if (memLimit.isSome()) {
+      if (std::isinf(memLimit.get())) {
+        isInfiniteLimit = true;
+      } else {
+        hardLimit = std::max(
+            Megabytes(static_cast<uint64_t>(memLimit.get())), MIN_MEMORY);
+      }
+    } else if (memRequest.isSome()) {
+      hardLimit = std::max(memRequest.get(), MIN_MEMORY);
     }
 
-    // Only update if new limit is higher.
+    // Only update if new limit is infinite or higher than current limit.
     // TODO(benh): Introduce a MemoryWatcherProcess which monitors the
     // discrepancy between usage and soft limit and introduces a
     // "manual oom" if necessary.
-    if (limit > currentLimit.get()) {
-      write = cgroups::memory::limit_in_bytes(
-          memoryHierarchy.get(), memoryCgroup.get(), limit);
+    if (isInfiniteLimit) {
+      Try<Nothing> write = cgroups::write(
+          memHierarchy.get(), memCgroup.get(), "memory.limit_in_bytes", "-1");
 
       if (write.isError()) {
-        return Failure("Failed to set 'memory.limit_in_bytes': " +
-                       write.error());
+        return Failure(
+            "Failed to update 'memory.limit_in_bytes': " + write.error());
+      }
+
+      LOG(INFO) << "Updated 'memory.limit_in_bytes' to -1 at "
+                << path::join(memHierarchy.get(), memCgroup.get())
+                << " for container " << containerId;
+    } else if (hardLimit.isSome() && hardLimit.get() > currentHardLimit.get()) {
+      Try<Nothing> write = cgroups::memory::limit_in_bytes(
+          memHierarchy.get(), memCgroup.get(), hardLimit.get());
+
+      if (write.isError()) {
+        return Failure(
+            "Failed to set 'memory.limit_in_bytes': " + write.error());
       }
 
-      LOG(INFO) << "Updated 'memory.limit_in_bytes' to " << limit << " at "
-                << path::join(memoryHierarchy.get(), memoryCgroup.get())
+      LOG(INFO) << "Updated 'memory.limit_in_bytes' to " << hardLimit.get()
+                << " at " << path::join(memHierarchy.get(), memCgroup.get())
                 << " for container " << containerId;
     }
   }
@@ -2011,7 +2083,7 @@ Future<ResourceStatistics> DockerContainerizerProcess::usage(
 #endif // __linux__
 
     // Set the resource allocations.
-    const Resources& resource = container->resources;
+    const Resources& resource = container->resourceRequests;
     const Option<Bytes> mem = resource.mem();
     if (mem.isSome()) {
       result.set_mem_limit_bytes(mem->bytes());
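
The memory side of __update follows the same request-vs-limit split: the
soft limit always tracks the request, while the hard limit comes from the
limit when present (infinity maps to the cgroup's "unlimited" value of -1)
and is otherwise only ever raised, never lowered, to avoid OOM-killing a
running container. A condensed sketch of that decision, with plain
integers standing in for stout's Bytes type (the helper name is
hypothetical):

    #include <algorithm>
    #include <cmath>
    #include <cstdint>
    #include <optional>

    const uint64_t MIN_MEMORY_BYTES = 32ULL * 1024 * 1024;  // 32MB default.

    // Returns the value to write to memory.limit_in_bytes, or nullopt
    // when the current limit should be left alone; -1 means "unlimited".
    std::optional<int64_t> newHardLimit(
        std::optional<double> memLimitMB,     // resourceLimits["mem"].
        std::optional<uint64_t> memRequestB,  // resourceRequests.mem().
        uint64_t currentLimitB)
    {
      if (memLimitMB && std::isinf(*memLimitMB)) {
        return -1;  // Infinite limit: remove the cgroup cap.
      }

      std::optional<uint64_t> hardLimit;
      if (memLimitMB) {
        hardLimit = std::max(
            (uint64_t) (*memLimitMB * 1024 * 1024), MIN_MEMORY_BYTES);
      } else if (memRequestB) {
        hardLimit = std::max(*memRequestB, MIN_MEMORY_BYTES);
      }

      // Only ever raise the limit; lowering it under a running workload
      // risks an immediate OOM kill (see the TODO(benh) in the diff).
      if (hardLimit && *hardLimit > currentLimitB) {
        return (int64_t) *hardLimit;
      }

      return std::nullopt;
    }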
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index d3d5f3a..8bb51bb 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -256,12 +256,14 @@ private:
 #ifdef __linux__
   process::Future<Nothing> _update(
       const ContainerID& containerId,
-      const Resources& resources,
+      const Resources& resourceRequests,
+      const google::protobuf::Map<std::string, Value::Scalar>& resourceLimits,
       const Docker::Container& container);
 
   process::Future<Nothing> __update(
       const ContainerID& containerId,
-      const Resources& resources);
+      const Resources& resourceRequests,
+      const google::protobuf::Map<std::string, Value::Scalar>& resourceLimits);
 #endif // __linux__
 
   process::Future<Nothing> mountPersistentVolumes(
@@ -366,10 +368,12 @@ private:
       // perfect check because an executor might always have a subset
       // of it's resources that match a task, nevertheless, it's
       // better than nothing).
-      resources = containerConfig.resources();
+      resourceRequests = containerConfig.resources();
+      resourceLimits = containerConfig.limits();
 
       if (containerConfig.has_task_info()) {
-        CHECK(resources.contains(containerConfig.task_info().resources()));
+        CHECK(
+            resourceRequests.contains(containerConfig.task_info().resources()));
       }
 
       if (_command.isSome()) {
@@ -506,7 +510,8 @@ private:
     // the ResourceStatistics limits in usage(). Note that this is
     // different than just what we might get from TaskInfo::resources
     // or ExecutorInfo::resources because they can change dynamically.
-    Resources resources;
+    Resources resourceRequests;
+    google::protobuf::Map<std::string, Value::Scalar> resourceLimits;
 
     // The docker pull future is stored so we can discard when
     // destroy is called while docker is pulling the image.
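
The new resourceLimits member is a google::protobuf::Map keyed by resource
name ("cpus", "mem"), the same shape ContainerConfig::limits() returns.
For illustration only, building such a map on the caller side looks like
this (the helper and its setup are hypothetical; Value::Scalar comes from
mesos.proto):

    #include <string>

    #include <google/protobuf/map.h>

    #include <mesos/mesos.pb.h>

    // Hypothetical helper: a limits map granting 2 CPUs and 256MB of
    // memory, as update() would receive alongside the requests.
    google::protobuf::Map<std::string, mesos::Value::Scalar> makeLimits()
    {
      google::protobuf::Map<std::string, mesos::Value::Scalar> limits;
      limits["cpus"].set_value(2.0);
      limits["mem"].set_value(256);  // Mesos "mem" scalars are megabytes.
      return limits;
    }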
