This is an automated email from the ASF dual-hosted git repository.

qianzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 929932fc2bd753f097a26caea5b3e7f7f3ac9118
Author: Qian Zhang <[email protected]>
AuthorDate: Thu Apr 30 09:29:49 2020 +0800

    Updated Docker containerizer to set Docker container's resource limits.

    This ensures that the resource limits of the Docker container in which
    a custom executor runs are correctly updated when a new task is
    launched or an existing task terminates. The `resources` field in the
    `Container` struct is also renamed to `resourceRequests`.

    Review: https://reviews.apache.org/r/72391
---
 include/mesos/values.hpp           |   1 +
 src/common/values.cpp              |   6 +
 src/slave/containerizer/docker.cpp | 242 ++++++++++++++++++++++++-------------
 src/slave/containerizer/docker.hpp |  15 ++-
 4 files changed, 174 insertions(+), 90 deletions(-)

diff --git a/include/mesos/values.hpp b/include/mesos/values.hpp
index 27f71d1..9288503 100644
--- a/include/mesos/values.hpp
+++ b/include/mesos/values.hpp
@@ -27,6 +27,7 @@ namespace mesos {
 
 std::ostream& operator<<(std::ostream& stream, const Value::Scalar& scalar);
 bool operator==(const Value::Scalar& left, const Value::Scalar& right);
+bool operator!=(const Value::Scalar& left, const Value::Scalar& right);
 bool operator<(const Value::Scalar& left, const Value::Scalar& right);
 bool operator<=(const Value::Scalar& left, const Value::Scalar& right);
 bool operator>(const Value::Scalar& left, const Value::Scalar& right);
diff --git a/src/common/values.cpp b/src/common/values.cpp
index 7520382..d7bc91b 100644
--- a/src/common/values.cpp
+++ b/src/common/values.cpp
@@ -99,6 +99,12 @@ bool operator==(const Value::Scalar& left, const Value::Scalar& right)
 }
 
 
+bool operator!=(const Value::Scalar& left, const Value::Scalar& right)
+{
+  return !(left == right);
+}
+
+
 bool operator<(const Value::Scalar& left, const Value::Scalar& right)
 {
   return convertToFixed(left.value()) < convertToFixed(right.value());
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 3aa6a99..8aed025 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -541,7 +541,7 @@ Try<Nothing> DockerContainerizerProcess::updatePersistentVolumes(
       continue;
     }
 
-    if (_container->resources.contains(resource)) {
+    if (_container->resourceRequests.contains(resource)) {
       isVolumeInUse = true;
       break;
     }
@@ -612,7 +612,7 @@ Future<Nothing> DockerContainerizerProcess::mountPersistentVolumes(
   container->state = Container::MOUNTING;
 
   if (!container->containerConfig.has_task_info() &&
-      !container->resources.persistentVolumes().empty()) {
+      !container->resourceRequests.persistentVolumes().empty()) {
     LOG(ERROR) << "Persistent volumes found with container '" << containerId
                << "' but are not supported with custom executors";
     return Nothing();
@@ -622,7 +622,7 @@ Future<Nothing> DockerContainerizerProcess::mountPersistentVolumes(
       containerId,
       container->containerWorkDir,
       Resources(),
-      container->resources);
+      container->resourceRequests);
 
   if (updateVolumes.isError()) {
     return Failure(updateVolumes.error());
@@ -1333,7 +1333,10 @@ Future<Containerizer::LaunchResult> DockerContainerizerProcess::_launch(
     // --cpu-quota to the 'docker run' call in
     // launchExecutorContainer.
    return update(
-        containerId, containerConfig.executor_info().resources(), {}, true)
+        containerId,
+        containerConfig.executor_info().resources(),
+        containerConfig.limits(),
+        true)
       .then([=]() {
         return Future<Docker::Container>(dockerContainer);
       });
@@ -1384,7 +1387,7 @@ Future<Docker::Container> DockerContainerizerProcess::launchExecutorContainer(
       containerName,
       container->containerWorkDir,
       flags.sandbox_directory,
-      container->resources,
+      container->resourceRequests,
 #ifdef __linux__
       flags.cgroups_enable_cfs,
 #else
@@ -1392,8 +1395,8 @@ Future<Docker::Container> DockerContainerizerProcess::launchExecutorContainer(
 #endif
       container->environment,
       None(), // No extra devices.
-      flags.docker_mesos_image.isNone() ? flags.default_container_dns : None()
-      );
+      flags.docker_mesos_image.isNone() ? flags.default_container_dns : None(),
+      container->resourceLimits);
 
   if (runOptions.isError()) {
     return Failure(runOptions.error());
@@ -1516,7 +1519,7 @@ Future<pid_t> DockerContainerizerProcess::launchExecutorProcess(
   Future<Nothing> allocateGpus = Nothing();
 
 #ifdef __linux__
-  Option<double> gpus = Resources(container->resources).gpus();
+  Option<double> gpus = Resources(container->resourceRequests).gpus();
 
   if (gpus.isSome() && gpus.get() > 0) {
     // Make sure that the `gpus` resource is not fractional.
@@ -1677,19 +1680,23 @@ Future<Nothing> DockerContainerizerProcess::update(
   }
 
   if (container->generatedForCommandTask) {
+    // Store the resources for usage().
+    container->resourceRequests = resourceRequests;
+    container->resourceLimits = resourceLimits;
+
     LOG(INFO) << "Ignoring updating container " << containerId
               << " because it is generated for a command task";
 
-    // Store the resources for usage().
-    container->resources = resourceRequests;
-
     return Nothing();
   }
 
-  if (container->resources == resourceRequests && !force) {
+  if (container->resourceRequests == resourceRequests &&
+      container->resourceLimits == resourceLimits &&
+      !force) {
     LOG(INFO) << "Ignoring updating container " << containerId
               << " because resources passed to update are identical to"
               << " existing resources";
+
     return Nothing();
   }
 
@@ -1699,17 +1706,21 @@ Future<Nothing> DockerContainerizerProcess::update(
   // TODO(gyliu): Support updating GPU resources.
 
   // Store the resources for usage().
-  container->resources = resourceRequests;
+  container->resourceRequests = resourceRequests;
+  container->resourceLimits = resourceLimits;
 
 #ifdef __linux__
-  if (!resourceRequests.cpus().isSome() && !resourceRequests.mem().isSome()) {
+  if (!resourceRequests.cpus().isSome() &&
+      !resourceRequests.mem().isSome() &&
+      !resourceLimits.count("cpus") &&
+      !resourceLimits.count("mem")) {
     LOG(WARNING) << "Ignoring update as no supported resources are present";
     return Nothing();
   }
 
   // Skip inspecting the docker container if we already have the cgroups.
   if (container->cpuCgroup.isSome() && container->memoryCgroup.isSome()) {
-    return __update(containerId, resourceRequests);
+    return __update(containerId, resourceRequests, resourceLimits);
   }
 
   string containerName = containers_.at(containerId)->containerName;
@@ -1753,7 +1764,13 @@ Future<Nothing> DockerContainerizerProcess::update(
   });
 
   return inspectLoop
-    .then(defer(self(), &Self::_update, containerId, resourceRequests, lambda::_1));
+    .then(defer(
+        self(),
+        &Self::_update,
+        containerId,
+        resourceRequests,
+        resourceLimits,
+        lambda::_1));
 #else
   return Nothing();
 #endif // __linux__
@@ -1763,7 +1780,8 @@ Future<Nothing> DockerContainerizerProcess::update(
 #ifdef __linux__
 Future<Nothing> DockerContainerizerProcess::_update(
     const ContainerID& containerId,
-    const Resources& _resources,
+    const Resources& resourceRequests,
+    const google::protobuf::Map<string, Value::Scalar>& resourceLimits,
     const Docker::Container& container)
 {
   if (container.pid.isNone()) {
@@ -1832,13 +1850,14 @@ Future<Nothing> DockerContainerizerProcess::_update(
     return Nothing();
   }
 
-  return __update(containerId, _resources);
+  return __update(containerId, resourceRequests, resourceLimits);
 }
 
 
 Future<Nothing> DockerContainerizerProcess::__update(
     const ContainerID& containerId,
-    const Resources& _resources)
+    const Resources& resourceRequests,
+    const google::protobuf::Map<string, Value::Scalar>& resourceLimits)
 {
   CHECK(containers_.contains(containerId));
@@ -1849,7 +1868,7 @@ Future<Nothing> DockerContainerizerProcess::__update(
   // we make these static so we can reuse the result for subsequent
   // calls.
   static Result<string> cpuHierarchy = cgroups::hierarchy("cpu");
-  static Result<string> memoryHierarchy = cgroups::hierarchy("memory");
+  static Result<string> memHierarchy = cgroups::hierarchy("memory");
 
   if (cpuHierarchy.isError()) {
     return Failure("Failed to determine the cgroup hierarchy "
@@ -1857,111 +1876,164 @@ Future<Nothing> DockerContainerizerProcess::__update(
                    cpuHierarchy.error());
   }
 
-  if (memoryHierarchy.isError()) {
+  if (memHierarchy.isError()) {
     return Failure("Failed to determine the cgroup hierarchy "
                    "where the 'memory' subsystem is mounted: " +
-                   memoryHierarchy.error());
+                   memHierarchy.error());
   }
 
   Option<string> cpuCgroup = container->cpuCgroup;
-  Option<string> memoryCgroup = container->memoryCgroup;
-
-  // Update the CPU shares (if applicable).
-  if (cpuHierarchy.isSome() &&
-      cpuCgroup.isSome() &&
-      _resources.cpus().isSome()) {
-    double cpuShares = _resources.cpus().get();
+  Option<string> memCgroup = container->memoryCgroup;
 
-    uint64_t shares =
-      std::max((uint64_t) (CPU_SHARES_PER_CPU * cpuShares), MIN_CPU_SHARES);
+  Option<double> cpuRequest = resourceRequests.cpus();
+  Option<Bytes> memRequest = resourceRequests.mem();
 
-    Try<Nothing> write =
-      cgroups::cpu::shares(cpuHierarchy.get(), cpuCgroup.get(), shares);
-
-    if (write.isError()) {
-      return Failure("Failed to update 'cpu.shares': " + write.error());
+  Option<double> cpuLimit, memLimit;
+  foreach (auto&& limit, resourceLimits) {
+    if (limit.first == "cpus") {
+      cpuLimit = limit.second.value();
+    } else if (limit.first == "mem") {
+      memLimit = limit.second.value();
     }
+  }
 
-    LOG(INFO) << "Updated 'cpu.shares' to " << shares
-              << " at " << path::join(cpuHierarchy.get(), cpuCgroup.get())
-              << " for container " << containerId;
+  // Update the CPU shares and CFS quota (if applicable).
+  if (cpuHierarchy.isSome() && cpuCgroup.isSome()) {
+    if (cpuRequest.isSome()) {
+      uint64_t shares = std::max(
+          (uint64_t) (CPU_SHARES_PER_CPU * cpuRequest.get()), MIN_CPU_SHARES);
 
-    // Set cfs quota if enabled.
-    if (flags.cgroups_enable_cfs) {
-      write = cgroups::cpu::cfs_period_us(
-          cpuHierarchy.get(),
-          cpuCgroup.get(),
-          CPU_CFS_PERIOD);
+      Try<Nothing> write =
+        cgroups::cpu::shares(cpuHierarchy.get(), cpuCgroup.get(), shares);
 
       if (write.isError()) {
-        return Failure("Failed to update 'cpu.cfs_period_us': " +
-                       write.error());
+        return Failure("Failed to update 'cpu.shares': " + write.error());
       }
 
-      Duration quota = std::max(CPU_CFS_PERIOD * cpuShares, MIN_CPU_CFS_QUOTA);
+      LOG(INFO) << "Updated 'cpu.shares' to " << shares
+                << " at " << path::join(cpuHierarchy.get(), cpuCgroup.get())
+                << " for container " << containerId;
+    }
 
-      write = cgroups::cpu::cfs_quota_us(
+    // Set CFS quota to CPU limit (if any) or to CPU request (if the
+    // flag `--cgroups_enable_cfs` is true).
+    if (cpuLimit.isSome() || (flags.cgroups_enable_cfs && cpuRequest.isSome())) {
+      Try<Nothing> write = cgroups::cpu::cfs_period_us(
           cpuHierarchy.get(),
           cpuCgroup.get(),
-          quota);
+          CPU_CFS_PERIOD);
 
       if (write.isError()) {
-        return Failure("Failed to update 'cpu.cfs_quota_us': " + write.error());
+        return Failure(
+            "Failed to update 'cpu.cfs_period_us': " + write.error());
       }
 
-      LOG(INFO) << "Updated 'cpu.cfs_period_us' to " << CPU_CFS_PERIOD
-                << " and 'cpu.cfs_quota_us' to " << quota
-                << " (cpus " << cpuShares << ")"
-                << " for container " << containerId;
+      if (cpuLimit.isSome() && std::isinf(cpuLimit.get())) {
+        write = cgroups::write(
+            cpuHierarchy.get(), cpuCgroup.get(), "cpu.cfs_quota_us", "-1");
+
+        if (write.isError()) {
+          return Failure(
+              "Failed to update 'cpu.cfs_quota_us': " + write.error());
+        }
+
+        LOG(INFO) << "Updated 'cpu.cfs_period_us' to " << CPU_CFS_PERIOD
+                  << " and 'cpu.cfs_quota_us' to -1 at "
+                  << path::join(cpuHierarchy.get(), cpuCgroup.get())
+                  << " for container " << containerId;
+      } else {
+        const double& quota =
+          cpuLimit.isSome() ? cpuLimit.get() : cpuRequest.get();
+
+        Duration duration = std::max(CPU_CFS_PERIOD * quota, MIN_CPU_CFS_QUOTA);
+
+        write = cgroups::cpu::cfs_quota_us(
+            cpuHierarchy.get(), cpuCgroup.get(), duration);
+
+        if (write.isError()) {
+          return Failure(
+              "Failed to update 'cpu.cfs_quota_us': " + write.error());
+        }
+
+        LOG(INFO) << "Updated 'cpu.cfs_period_us' to " << CPU_CFS_PERIOD
+                  << " and 'cpu.cfs_quota_us' to " << duration << " (cpus "
+                  << quota << ") at "
+                  << path::join(cpuHierarchy.get(), cpuCgroup.get())
+                  << " for container " << containerId;
+      }
     }
   }
 
   // Update the memory limits (if applicable).
-  if (memoryHierarchy.isSome() &&
-      memoryCgroup.isSome() &&
-      _resources.mem().isSome()) {
+  if (memHierarchy.isSome() && memCgroup.isSome()) {
     // TODO(tnachen): investigate and handle OOM with docker.
-    Bytes mem = _resources.mem().get();
-    Bytes limit = std::max(mem, MIN_MEMORY);
+    if (memRequest.isSome()) {
+      Bytes softLimit = std::max(memRequest.get(), MIN_MEMORY);
 
-    // Always set the soft limit.
-    Try<Nothing> write =
-      cgroups::memory::soft_limit_in_bytes(
-          memoryHierarchy.get(), memoryCgroup.get(), limit);
+      // Always set the soft limit.
+      Try<Nothing> write = cgroups::memory::soft_limit_in_bytes(
+          memHierarchy.get(), memCgroup.get(), softLimit);
 
-    if (write.isError()) {
-      return Failure("Failed to set 'memory.soft_limit_in_bytes': " +
-                     write.error());
+      if (write.isError()) {
+        return Failure("Failed to set 'memory.soft_limit_in_bytes': " +
+                       write.error());
+      }
+
+      LOG(INFO) << "Updated 'memory.soft_limit_in_bytes' to " << softLimit
+                << " at " << path::join(memHierarchy.get(), memCgroup.get())
+                << " for container " << containerId;
     }
 
-    LOG(INFO) << "Updated 'memory.soft_limit_in_bytes' to " << limit
-              << " for container " << containerId;
+    // Read the existing hard limit.
+    Try<Bytes> currentHardLimit = cgroups::memory::limit_in_bytes(
+        memHierarchy.get(), memCgroup.get());
 
-    // Read the existing limit.
-    Try<Bytes> currentLimit =
-      cgroups::memory::limit_in_bytes(
-          memoryHierarchy.get(), memoryCgroup.get());
+    if (currentHardLimit.isError()) {
+      return Failure(
+          "Failed to read 'memory.limit_in_bytes': " +
+          currentHardLimit.error());
+    }
 
-    if (currentLimit.isError()) {
-      return Failure("Failed to read 'memory.limit_in_bytes': " +
-                     currentLimit.error());
+    bool isInfiniteLimit = false;
+    Option<Bytes> hardLimit = None();
+    if (memLimit.isSome()) {
+      if (std::isinf(memLimit.get())) {
+        isInfiniteLimit = true;
+      } else {
+        hardLimit = std::max(
+            Megabytes(static_cast<uint64_t>(memLimit.get())), MIN_MEMORY);
+      }
+    } else if (memRequest.isSome()) {
+      hardLimit = std::max(memRequest.get(), MIN_MEMORY);
     }
 
-    // Only update if new limit is higher.
+    // Only update if new limit is infinite or higher than current limit.
     // TODO(benh): Introduce a MemoryWatcherProcess which monitors the
     // discrepancy between usage and soft limit and introduces a
     // "manual oom" if necessary.
-    if (limit > currentLimit.get()) {
-      write = cgroups::memory::limit_in_bytes(
-          memoryHierarchy.get(), memoryCgroup.get(), limit);
+    if (isInfiniteLimit) {
+      Try<Nothing> write = cgroups::write(
+          memHierarchy.get(), memCgroup.get(), "memory.limit_in_bytes", "-1");
 
       if (write.isError()) {
-        return Failure("Failed to set 'memory.limit_in_bytes': " +
-                       write.error());
+        return Failure(
+            "Failed to update 'memory.limit_in_bytes': " + write.error());
+      }
+
+      LOG(INFO) << "Updated 'memory.limit_in_bytes' to -1 at "
+                << path::join(memHierarchy.get(), memCgroup.get())
+                << " for container " << containerId;
+    } else if (hardLimit.isSome() && hardLimit.get() > currentHardLimit.get()) {
+      Try<Nothing> write = cgroups::memory::limit_in_bytes(
+          memHierarchy.get(), memCgroup.get(), hardLimit.get());
+
+      if (write.isError()) {
+        return Failure(
+            "Failed to set 'memory.limit_in_bytes': " + write.error());
       }
 
-      LOG(INFO) << "Updated 'memory.limit_in_bytes' to " << limit << " at "
-                << path::join(memoryHierarchy.get(), memoryCgroup.get())
+      LOG(INFO) << "Updated 'memory.limit_in_bytes' to " << hardLimit.get()
+                << " at " << path::join(memHierarchy.get(), memCgroup.get())
                 << " for container " << containerId;
     }
   }
@@ -2011,7 +2083,7 @@ Future<ResourceStatistics> DockerContainerizerProcess::usage(
 #endif // __linux__
 
   // Set the resource allocations.
-  const Resources& resource = container->resources;
+  const Resources& resource = container->resourceRequests;
 
   const Option<Bytes> mem = resource.mem();
   if (mem.isSome()) {
     result.set_mem_limit_bytes(mem->bytes());
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index d3d5f3a..8bb51bb 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -256,12 +256,14 @@ private:
 #ifdef __linux__
   process::Future<Nothing> _update(
       const ContainerID& containerId,
-      const Resources& resources,
+      const Resources& resourceRequests,
+      const google::protobuf::Map<std::string, Value::Scalar>& resourceLimits,
       const Docker::Container& container);
 
   process::Future<Nothing> __update(
       const ContainerID& containerId,
-      const Resources& resources);
+      const Resources& resourceRequests,
+      const google::protobuf::Map<std::string, Value::Scalar>& resourceLimits);
 #endif // __linux__
 
   process::Future<Nothing> mountPersistentVolumes(
@@ -366,10 +368,12 @@ private:
       // perfect check because an executor might always have a subset
      // of it's resources that match a task, nevertheless, it's
       // better than nothing).
-      resources = containerConfig.resources();
+      resourceRequests = containerConfig.resources();
+      resourceLimits = containerConfig.limits();
 
       if (containerConfig.has_task_info()) {
-        CHECK(resources.contains(containerConfig.task_info().resources()));
+        CHECK(
+            resourceRequests.contains(containerConfig.task_info().resources()));
       }
 
       if (_command.isSome()) {
@@ -506,7 +510,8 @@ private:
     // the ResourceStatistics limits in usage(). Note that this is
     // different than just what we might get from TaskInfo::resources
     // or ExecutorInfo::resources because they can change dynamically.
-    Resources resources;
+    Resources resourceRequests;
+    google::protobuf::Map<std::string, Value::Scalar> resourceLimits;
 
     // The docker pull future is stored so we can discard when
     // destroy is called while docker is pulling the image.
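
Editor's note on the CFS hunk in __update() above: the quota arithmetic reduces to a few lines. The standalone C++ sketch below mirrors that logic with the Mesos constants inlined as assumptions (CPU_CFS_PERIOD is 100ms and MIN_CPU_CFS_QUOTA is 1ms in the Mesos sources); `cfsQuotaUs` is a hypothetical helper, not a function in this patch, and its input is the CPU limit when one is set, else the CPU request.

    // Standalone sketch, not Mesos code: maps a CPU count to the value
    // written to 'cpu.cfs_quota_us' by the hunk above.
    #include <algorithm>
    #include <cmath>
    #include <cstdint>
    #include <iostream>

    int64_t cfsQuotaUs(double cpus)
    {
      const int64_t CPU_CFS_PERIOD_US = 100000;  // Assumed 100ms period.
      const int64_t MIN_CPU_CFS_QUOTA_US = 1000; // Assumed 1ms floor.

      // An infinite CPU limit maps to -1, which tells the kernel to
      // disable CFS throttling for the cgroup.
      if (std::isinf(cpus)) {
        return -1;
      }

      // Otherwise the quota is proportional to the CPU count, with a
      // floor so that tiny values still yield a schedulable quota.
      return std::max(
          static_cast<int64_t>(CPU_CFS_PERIOD_US * cpus),
          MIN_CPU_CFS_QUOTA_US);
    }

    int main()
    {
      std::cout << cfsQuotaUs(0.5) << std::endl;      // 50000 (half a CPU)
      std::cout << cfsQuotaUs(2.0) << std::endl;      // 200000 (two CPUs)
      std::cout << cfsQuotaUs(INFINITY) << std::endl; // -1 (unlimited)
      return 0;
    }

With a 100ms period, a container capped at 0.5 cpus may consume at most 50ms of CPU time per period before the kernel throttles it; the quota is only meaningful relative to the period, which is why the hunk writes 'cpu.cfs_period_us' before 'cpu.cfs_quota_us'.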
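
Likewise, the memory hunk's hard-limit decision can be read as a small pure function. The sketch below is an approximation under stated assumptions: `Bytes` is flattened to `uint64_t`, the "mem" scalar is taken to be in megabytes as elsewhere in Mesos, MIN_MEMORY is assumed to be a 32MB floor, and `newHardLimit` is a hypothetical name.

    // Standalone sketch, not Mesos code: decides the new value of
    // 'memory.limit_in_bytes', mirroring the hunk in __update() above.
    #include <algorithm>
    #include <cmath>
    #include <cstdint>
    #include <optional>

    const uint64_t MB = 1024 * 1024;
    const uint64_t MIN_MEMORY = 32 * MB; // Assumed 32MB floor.

    // Returns -1 for "unlimited", a byte count to raise the limit to,
    // or nullopt when the current limit should be left untouched.
    std::optional<int64_t> newHardLimit(
        std::optional<double> memLimitMB,   // "mem" resource limit, in MB.
        std::optional<uint64_t> memRequest, // mem request, in bytes.
        uint64_t currentLimit)              // current hard limit, in bytes.
    {
      // An infinite limit removes the hard cap entirely.
      if (memLimitMB && std::isinf(*memLimitMB)) {
        return -1;
      }

      // Prefer the explicit limit; fall back to the request.
      std::optional<uint64_t> hardLimit;
      if (memLimitMB) {
        hardLimit = std::max(
            static_cast<uint64_t>(*memLimitMB) * MB, MIN_MEMORY);
      } else if (memRequest) {
        hardLimit = std::max(*memRequest, MIN_MEMORY);
      }

      // Never lower the hard limit on a running container, since that
      // could OOM-kill it immediately; only ever raise it.
      if (hardLimit && *hardLimit > currentLimit) {
        return static_cast<int64_t>(*hardLimit);
      }

      return std::nullopt;
    }

    int main()
    {
      // Raising a 64MB cap to 128MB yields a write of 128MB...
      bool raised = newHardLimit(128.0, std::nullopt, 64 * MB) ==
                    std::optional<int64_t>(128 * MB);
      // ...while shrinking to 32MB yields no write at all.
      bool skipped = !newHardLimit(32.0, std::nullopt, 64 * MB).has_value();
      return raised && skipped ? 0 : 1;
    }

The soft limit ('memory.soft_limit_in_bytes'), by contrast, is always written from the request, since it only influences reclaim pressure rather than killing the container.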
