This is an automated email from the ASF dual-hosted git repository. bteke pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 399299104c58 YARN-11674. Add CPUResourceHandler for cgroup v2. (#6751) 399299104c58 is described below commit 399299104c5837ed0edd42be027bc7dfc45c2bfe Author: Benjamin Teke <brumi1...@users.noreply.github.com> AuthorDate: Fri Apr 26 15:00:00 2024 +0200 YARN-11674. Add CPUResourceHandler for cgroup v2. (#6751) --- ...java => AbstractCGroupsCpuResourceHandler.java} | 96 ++----- .../linux/resources/AbstractCGroupsHandler.java | 3 +- .../resources/CGroupsBlkioResourceHandlerImpl.java | 2 +- .../resources/CGroupsCpuResourceHandlerImpl.java | 224 +++------------ .../linux/resources/CGroupsHandler.java | 3 +- .../resources/CGroupsV2CpuResourceHandlerImpl.java | 105 +++++++ .../util/CgroupsLCEResourcesHandler.java | 2 +- .../TestCGroupsBlkioResourceHandlerImpl.java | 2 +- .../TestCGroupsV2CpuResourceHandlerImpl.java | 313 +++++++++++++++++++++ 9 files changed, 482 insertions(+), 268 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsCpuResourceHandler.java similarity index 68% copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsCpuResourceHandler.java index f724b8803d5f..92d33d972256 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsCpuResourceHandler.java @@ -18,12 +18,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; -import org.apache.hadoop.classification.VisibleForTesting; -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; @@ -34,36 +31,21 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -/** - * An implementation for using CGroups to restrict CPU usage on Linux. The - * implementation supports 3 different controls - restrict usage of all YARN - * containers, restrict relative usage of individual YARN containers and - * restrict usage of individual YARN containers. 
Admins can set the overall CPU - * to be used by all YARN containers - this is implemented by setting - * cpu.cfs_period_us and cpu.cfs_quota_us to the ratio desired. If strict - * resource usage mode is not enabled, cpu.shares is set for individual - * containers - this prevents containers from exceeding the overall limit for - * YARN containers but individual containers can use as much of the CPU as - * available(under the YARN limit). If strict resource usage is enabled, then - * container can only use the percentage of CPU allocated to them and this is - * again implemented using cpu.cfs_period_us and cpu.cfs_quota_us. - * - */ @InterfaceStability.Unstable @InterfaceAudience.Private -public class CGroupsCpuResourceHandlerImpl implements CpuResourceHandler { +public abstract class AbstractCGroupsCpuResourceHandler implements CpuResourceHandler { static final Logger LOG = - LoggerFactory.getLogger(CGroupsCpuResourceHandlerImpl.class); + LoggerFactory.getLogger(AbstractCGroupsCpuResourceHandler.class); - private CGroupsHandler cGroupsHandler; + protected CGroupsHandler cGroupsHandler; private boolean strictResourceUsageMode = false; private float yarnProcessors; private int nodeVCores; @@ -74,11 +56,8 @@ public class CGroupsCpuResourceHandlerImpl implements CpuResourceHandler { static final int MAX_QUOTA_US = 1000 * 1000; @VisibleForTesting static final int MIN_PERIOD_US = 1000; - @VisibleForTesting - static final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel - static final int CPU_DEFAULT_WEIGHT_OPPORTUNISTIC = 2; - CGroupsCpuResourceHandlerImpl(CGroupsHandler cGroupsHandler) { + AbstractCGroupsCpuResourceHandler(CGroupsHandler cGroupsHandler) { this.cGroupsHandler = cGroupsHandler; } @@ -103,44 +82,24 @@ public class CGroupsCpuResourceHandlerImpl implements CpuResourceHandler { yarnProcessors = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf); int systemProcessors = NodeManagerHardwareUtils.getNodeCPUs(plugin, conf); boolean existingCpuLimits; - try { 
- existingCpuLimits = - cpuLimitsExist(cGroupsHandler.getPathForCGroup(CPU, "")); - } catch (IOException ie) { - throw new ResourceHandlerException(ie); - } + existingCpuLimits = cpuLimitExists( + cGroupsHandler.getPathForCGroup(CPU, "")); + if (systemProcessors != (int) yarnProcessors) { LOG.info("YARN containers restricted to " + yarnProcessors + " cores"); int[] limits = getOverallLimits(yarnProcessors); - cGroupsHandler - .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_PERIOD_US, - String.valueOf(limits[0])); - cGroupsHandler - .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US, - String.valueOf(limits[1])); + updateCgroupMaxCpuLimit("", String.valueOf(limits[1]), String.valueOf(limits[0])); } else if (existingCpuLimits) { LOG.info("Removing CPU constraints for YARN containers."); - cGroupsHandler - .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US, - String.valueOf(-1)); + updateCgroupMaxCpuLimit("", String.valueOf(-1), null); } return null; } - @InterfaceAudience.Private - public static boolean cpuLimitsExist(String path) - throws IOException { - File quotaFile = new File(path, - CPU.getName() + "." 
+ CGroupsHandler.CGROUP_CPU_QUOTA_US); - if (quotaFile.exists()) { - String contents = FileUtils.readFileToString(quotaFile, StandardCharsets.UTF_8); - int quotaUS = Integer.parseInt(contents.trim()); - if (quotaUS != -1) { - return true; - } - } - return false; - } + protected abstract void updateCgroupMaxCpuLimit(String cgroupId, String quota, String period) + throws ResourceHandlerException; + protected abstract boolean cpuLimitExists(String path) throws ResourceHandlerException; + @VisibleForTesting @InterfaceAudience.Private @@ -215,26 +174,16 @@ public class CGroupsCpuResourceHandlerImpl implements CpuResourceHandler { ContainerTokenIdentifier id = container.getContainerTokenIdentifier(); if (id != null && id.getExecutionType() == ExecutionType.OPPORTUNISTIC) { - cGroupsHandler - .updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_SHARES, - String.valueOf(CPU_DEFAULT_WEIGHT_OPPORTUNISTIC)); + updateCgroupCpuWeight(cgroupId, getOpportunisticCpuWeight()); } else { - int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores; - cGroupsHandler - .updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_SHARES, - String.valueOf(cpuShares)); + updateCgroupCpuWeight(cgroupId, getCpuWeightByContainerVcores(containerVCores)); } if (strictResourceUsageMode) { if (nodeVCores != containerVCores) { float containerCPU = (containerVCores * yarnProcessors) / (float) nodeVCores; int[] limits = getOverallLimits(containerCPU); - cGroupsHandler.updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(limits[0])); - cGroupsHandler.updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(limits[1])); + updateCgroupMaxCpuLimit(cgroupId, String.valueOf(limits[1]), String.valueOf(limits[0])); } } } catch (ResourceHandlerException re) { @@ -246,6 +195,11 @@ public class CGroupsCpuResourceHandlerImpl implements CpuResourceHandler { return null; } + protected abstract int getOpportunisticCpuWeight(); + protected abstract 
int getCpuWeightByContainerVcores(int containerVcores); + protected abstract void updateCgroupCpuWeight(String cgroupId, int weight) + throws ResourceHandlerException; + @Override public List<PrivilegedOperation> postComplete(ContainerId containerId) throws ResourceHandlerException { @@ -260,6 +214,6 @@ public class CGroupsCpuResourceHandlerImpl implements CpuResourceHandler { @Override public String toString() { - return CGroupsCpuResourceHandlerImpl.class.getName(); + return AbstractCGroupsCpuResourceHandler.class.getName(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsHandler.java index 272f04ce7765..a8f528a20911 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsHandler.java @@ -414,9 +414,10 @@ public abstract class AbstractCGroupsHandler implements CGroupsHandler { public String createCGroup(CGroupController controller, String cGroupId) throws ResourceHandlerException { String path = getPathForCGroup(controller, cGroupId); + File cgroup = new File(path); LOG.debug("createCgroup: {}", path); - if (!new File(path).mkdir()) { + if (!cgroup.exists() && !cgroup.mkdir()) { throw new ResourceHandlerException("Failed to create cgroup at " + path); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java index 865d2b19fd02..b2829ae0f559 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java @@ -134,7 +134,7 @@ public class CGroupsBlkioResourceHandlerImpl implements DiskResourceHandler { .createCGroup(CGroupsHandler.CGroupController.BLKIO, cgroupId); try { cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.BLKIO, - cgroupId, CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT, DEFAULT_WEIGHT); + cgroupId, CGroupsHandler.CGROUP_PARAM_WEIGHT, DEFAULT_WEIGHT); } catch (ResourceHandlerException re) { cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.BLKIO, cgroupId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java index f724b8803d5f..54686fddb2f5 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java @@ -18,28 +18,14 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; -import org.apache.hadoop.classification.VisibleForTesting; import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; -import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; -import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.apache.hadoop.classification.VisibleForTesting; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; /** * An implementation for using CGroups to restrict CPU usage on Linux. 
The @@ -58,208 +44,62 @@ import java.util.List; */ @InterfaceStability.Unstable @InterfaceAudience.Private -public class CGroupsCpuResourceHandlerImpl implements CpuResourceHandler { - - static final Logger LOG = - LoggerFactory.getLogger(CGroupsCpuResourceHandlerImpl.class); - - private CGroupsHandler cGroupsHandler; - private boolean strictResourceUsageMode = false; - private float yarnProcessors; - private int nodeVCores; +public class CGroupsCpuResourceHandlerImpl extends AbstractCGroupsCpuResourceHandler { private static final CGroupsHandler.CGroupController CPU = CGroupsHandler.CGroupController.CPU; - @VisibleForTesting - static final int MAX_QUOTA_US = 1000 * 1000; - @VisibleForTesting - static final int MIN_PERIOD_US = 1000; @VisibleForTesting static final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel static final int CPU_DEFAULT_WEIGHT_OPPORTUNISTIC = 2; + CGroupsCpuResourceHandlerImpl(CGroupsHandler cGroupsHandler) { - this.cGroupsHandler = cGroupsHandler; + super(cGroupsHandler); } @Override - public List<PrivilegedOperation> bootstrap(Configuration conf) - throws ResourceHandlerException { - return bootstrap( - ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), conf); - } - - @VisibleForTesting - List<PrivilegedOperation> bootstrap( - ResourceCalculatorPlugin plugin, Configuration conf) - throws ResourceHandlerException { - this.strictResourceUsageMode = conf.getBoolean( - YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, - YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE); - this.cGroupsHandler.initializeCGroupController(CPU); - nodeVCores = NodeManagerHardwareUtils.getVCores(plugin, conf); - - // cap overall usage to the number of cores allocated to YARN - yarnProcessors = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf); - int systemProcessors = NodeManagerHardwareUtils.getNodeCPUs(plugin, conf); - boolean existingCpuLimits; - try { - existingCpuLimits = - 
cpuLimitsExist(cGroupsHandler.getPathForCGroup(CPU, "")); - } catch (IOException ie) { - throw new ResourceHandlerException(ie); - } - if (systemProcessors != (int) yarnProcessors) { - LOG.info("YARN containers restricted to " + yarnProcessors + " cores"); - int[] limits = getOverallLimits(yarnProcessors); + protected void updateCgroupMaxCpuLimit(String cgroupId, String quota, String period) throws ResourceHandlerException { + if (quota != null) { cGroupsHandler - .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_PERIOD_US, - String.valueOf(limits[0])); - cGroupsHandler - .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US, - String.valueOf(limits[1])); - } else if (existingCpuLimits) { - LOG.info("Removing CPU constraints for YARN containers."); - cGroupsHandler - .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US, - String.valueOf(-1)); + .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_QUOTA_US, quota); } - return null; - } - - @InterfaceAudience.Private - public static boolean cpuLimitsExist(String path) - throws IOException { - File quotaFile = new File(path, - CPU.getName() + "." + CGroupsHandler.CGROUP_CPU_QUOTA_US); - if (quotaFile.exists()) { - String contents = FileUtils.readFileToString(quotaFile, StandardCharsets.UTF_8); - int quotaUS = Integer.parseInt(contents.trim()); - if (quotaUS != -1) { - return true; - } - } - return false; - } - - @VisibleForTesting - @InterfaceAudience.Private - public static int[] getOverallLimits(float yarnProcessors) { - - int[] ret = new int[2]; - - if (yarnProcessors < 0.01f) { - throw new IllegalArgumentException("Number of processors can't be <= 0."); - } - - int quotaUS = MAX_QUOTA_US; - int periodUS = (int) (MAX_QUOTA_US / yarnProcessors); - if (yarnProcessors < 1.0f) { - periodUS = MAX_QUOTA_US; - quotaUS = (int) (periodUS * yarnProcessors); - if (quotaUS < MIN_PERIOD_US) { - LOG.warn("The quota calculated for the cgroup was too low." 
- + " The minimum value is " + MIN_PERIOD_US - + ", calculated value is " + quotaUS - + ". Setting quota to minimum value."); - quotaUS = MIN_PERIOD_US; - } - } - - // cfs_period_us can't be less than 1000 microseconds - // if the value of periodUS is less than 1000, we can't really use cgroups - // to limit cpu - if (periodUS < MIN_PERIOD_US) { - LOG.warn("The period calculated for the cgroup was too low." - + " The minimum value is " + MIN_PERIOD_US - + ", calculated value is " + periodUS - + ". Using all available CPU."); - periodUS = MAX_QUOTA_US; - quotaUS = -1; + if (period != null) { + cGroupsHandler + .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_PERIOD_US, period); } - - ret[0] = periodUS; - ret[1] = quotaUS; - return ret; } @Override - public List<PrivilegedOperation> preStart(Container container) - throws ResourceHandlerException { - String cgroupId = container.getContainerId().toString(); - cGroupsHandler.createCGroup(CPU, cgroupId); - updateContainer(container); - List<PrivilegedOperation> ret = new ArrayList<>(); - ret.add(new PrivilegedOperation( - PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, - PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupsHandler - .getPathForCGroupTasks(CPU, cgroupId))); - return ret; + protected int getOpportunisticCpuWeight() { + return CPU_DEFAULT_WEIGHT_OPPORTUNISTIC; } - - @Override - public List<PrivilegedOperation> reacquireContainer(ContainerId containerId) - throws ResourceHandlerException { - return null; + protected int getCpuWeightByContainerVcores(int containerVCores) { + return containerVCores * CPU_DEFAULT_WEIGHT; } @Override - public List<PrivilegedOperation> updateContainer(Container container) - throws ResourceHandlerException { - Resource containerResource = container.getResource(); - String cgroupId = container.getContainerId().toString(); - File cgroup = new File(cGroupsHandler.getPathForCGroup(CPU, cgroupId)); - if (cgroup.exists()) { - try { - int containerVCores = 
containerResource.getVirtualCores(); - ContainerTokenIdentifier id = container.getContainerTokenIdentifier(); - if (id != null && id.getExecutionType() == - ExecutionType.OPPORTUNISTIC) { - cGroupsHandler - .updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_SHARES, - String.valueOf(CPU_DEFAULT_WEIGHT_OPPORTUNISTIC)); - } else { - int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores; - cGroupsHandler - .updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_SHARES, - String.valueOf(cpuShares)); - } - if (strictResourceUsageMode) { - if (nodeVCores != containerVCores) { - float containerCPU = - (containerVCores * yarnProcessors) / (float) nodeVCores; - int[] limits = getOverallLimits(containerCPU); - cGroupsHandler.updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(limits[0])); - cGroupsHandler.updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(limits[1])); - } - } - } catch (ResourceHandlerException re) { - cGroupsHandler.deleteCGroup(CPU, cgroupId); - LOG.warn("Could not update cgroup for container", re); - throw re; - } - } - return null; + protected void updateCgroupCpuWeight(String cgroupId, int weight) throws ResourceHandlerException { + cGroupsHandler.updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_SHARES, + String.valueOf(weight)); } @Override - public List<PrivilegedOperation> postComplete(ContainerId containerId) - throws ResourceHandlerException { - cGroupsHandler.deleteCGroup(CPU, containerId.toString()); - return null; - } - - @Override public List<PrivilegedOperation> teardown() - throws ResourceHandlerException { - return null; + public boolean cpuLimitExists(String cgroupPath) throws ResourceHandlerException { + try { + return checkCgroupV1CPULimitExists(cgroupPath); + } catch (IOException e) { + throw new ResourceHandlerException("Failed to check CPU limit", e); + } } - @Override - public String toString() { - return 
CGroupsCpuResourceHandlerImpl.class.getName(); + @InterfaceAudience.Private + public static boolean checkCgroupV1CPULimitExists(String path) throws IOException { + File quotaFile = new File(path, + CPU.getName() + "." + CGroupsHandler.CGROUP_CPU_QUOTA_US); + if (quotaFile.exists()) { + String contents = FileUtils.readFileToString(quotaFile, StandardCharsets.UTF_8); + return Integer.parseInt(contents.trim()) != -1; + } + return false; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java index 73e1443a2e9b..b8b4b2b7e3e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java @@ -122,11 +122,12 @@ public interface CGroupsHandler { // v2 specific params String CGROUP_CONTROLLERS_FILE = "cgroup.controllers"; String CGROUP_SUBTREE_CONTROL_FILE = "cgroup.subtree_control"; + String CGROUP_CPU_MAX = "max"; // present in v1 and v2 String CGROUP_PROCS_FILE = "cgroup.procs"; String CGROUP_PARAM_CLASSID = "classid"; - String CGROUP_PARAM_BLKIO_WEIGHT = "weight"; + String CGROUP_PARAM_WEIGHT = "weight"; /** * Mounts or initializes a cgroup controller. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsV2CpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsV2CpuResourceHandlerImpl.java new file mode 100644 index 000000000000..97456ad7d285 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsV2CpuResourceHandlerImpl.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; + +/** + * An implementation for using CGroups V2 to restrict CPU usage on Linux. 
The + * implementation supports 3 different controls - restrict usage of all YARN + * containers, restrict relative usage of individual YARN containers and + * restrict usage of individual YARN containers. Admins can set the overall CPU + * to be used by all YARN containers - this is implemented by setting + * cpu.max to the value desired. If strict resource usage mode is not enabled, + * cpu.weight is set for individual containers - this prevents containers from + * exceeding the overall limit for YARN containers but individual containers + * can use as much of the CPU as available(under the YARN limit). If strict + * resource usage is enabled, then container can only use the percentage of + * CPU allocated to them and this is again implemented using cpu.max. + */ +@InterfaceStability.Unstable +@InterfaceAudience.Private +public class CGroupsV2CpuResourceHandlerImpl extends AbstractCGroupsCpuResourceHandler { + private static final CGroupsHandler.CGroupController CPU = + CGroupsHandler.CGroupController.CPU; + + @VisibleForTesting + static final int CPU_DEFAULT_WEIGHT = 100; // cgroup v2 default + static final int CPU_DEFAULT_WEIGHT_OPPORTUNISTIC = 1; + static final int CPU_MAX_WEIGHT = 10000; + static final String NO_LIMIT = "max"; + + + CGroupsV2CpuResourceHandlerImpl(CGroupsHandler cGroupsHandler) { + super(cGroupsHandler); + } + + @Override + protected void updateCgroupMaxCpuLimit(String cgroupId, String max, String period) + throws ResourceHandlerException { + // The cpu.max file in cgroup v2 is a read-write two value file which exists on + // non-root cgroups. The default is “max 100000”. + // It is the maximum bandwidth limit. It’s in the following format: + // $MAX $PERIOD + // which indicates that the group may consume up to $MAX in each $PERIOD duration. + // “max” for $MAX indicates no limit. If only one number is written, $MAX is updated. 
+ String currentCpuMax = cGroupsHandler.getCGroupParam(CPU, cgroupId, + CGroupsHandler.CGROUP_CPU_MAX); + + if (currentCpuMax == null) { + currentCpuMax = ""; + } + + String[] currentCpuMaxArray = currentCpuMax.split(" "); + String maxToSet = max != null ? max : currentCpuMaxArray[0]; + maxToSet = maxToSet.equals("-1") ? NO_LIMIT : maxToSet; + String periodToSet = period != null ? period : currentCpuMaxArray[1]; + cGroupsHandler + .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_MAX, + maxToSet + " " + periodToSet); + } + + @Override + protected int getOpportunisticCpuWeight() { + return CPU_DEFAULT_WEIGHT_OPPORTUNISTIC; + } + protected int getCpuWeightByContainerVcores(int containerVCores) { + return Math.min(containerVCores * CPU_DEFAULT_WEIGHT, CPU_MAX_WEIGHT); + } + + @Override + protected void updateCgroupCpuWeight(String cgroupId, int weight) throws ResourceHandlerException { + cGroupsHandler.updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_PARAM_WEIGHT, + String.valueOf(weight)); + } + + @Override + public boolean cpuLimitExists(String cgroupPath) throws ResourceHandlerException { + String globalCpuMaxLimit = cGroupsHandler.getCGroupParam(CPU, "", + CGroupsHandler.CGROUP_CPU_MAX); + if (globalCpuMaxLimit == null) { + return false; + } + String[] cpuMaxLimitArray = globalCpuMaxLimit.split(" "); + + return !cpuMaxLimitArray[0].equals(NO_LIMIT); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java index 037d4cf1706a..74d6f8a528ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java @@ -175,7 +175,7 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { String.valueOf(limits[0])); updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1])); - } else if (CGroupsCpuResourceHandlerImpl.cpuLimitsExist( + } else if (CGroupsCpuResourceHandlerImpl.checkCgroupV1CPULimitExists( pathForCgroup(CONTROLLER_CPU, ""))) { LOG.info("Removing CPU constraints for YARN containers."); updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java index e8ec6fa83bd8..65a5c667b68a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java @@ -77,7 +77,7 @@ public class TestCGroupsBlkioResourceHandlerImpl { CGroupsHandler.CGroupController.BLKIO, id); verify(mockCGroupsHandler, times(1)).updateCGroupParam( CGroupsHandler.CGroupController.BLKIO, id, - CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT, + CGroupsHandler.CGROUP_PARAM_WEIGHT, CGroupsBlkioResourceHandlerImpl.DEFAULT_WEIGHT); Assert.assertNotNull(ret); Assert.assertEquals(1, ret.size()); diff 
--git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsV2CpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsV2CpuResourceHandlerImpl.java new file mode 100644 index 000000000000..1d77d8adc62e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsV2CpuResourceHandlerImpl.java @@ -0,0 +1,313 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +/** Unit tests for {@link CGroupsV2CpuResourceHandlerImpl}. Every test drives the handler against a Mockito mock of {@link CGroupsHandler} and checks which cgroup parameters the handler asks that mock to read or write; no real cgroup files are touched by the code in this class. */ +public class TestCGroupsV2CpuResourceHandlerImpl { + + private CGroupsHandler mockCGroupsHandler; + private CGroupsV2CpuResourceHandlerImpl cGroupsCpuResourceHandler; + private ResourceCalculatorPlugin plugin; + /** Value the mocked plugin returns from both getNumProcessors() and getNumCores(). */ final int numProcessors = 4; + /** Creates a fresh mocked CGroupsHandler whose getPathForCGroup always returns ".", the handler under test wrapping that mock, and a mocked ResourceCalculatorPlugin reporting numProcessors processors and cores. */ + @Before + public void setup() { + mockCGroupsHandler = mock(CGroupsHandler.class); + when(mockCGroupsHandler.getPathForCGroup(any(), any())).thenReturn("."); + cGroupsCpuResourceHandler = + new CGroupsV2CpuResourceHandlerImpl(mockCGroupsHandler); + + plugin = mock(ResourceCalculatorPlugin.class); + Mockito.doReturn(numProcessors).when(plugin).getNumProcessors(); + Mockito.doReturn(numProcessors).when(plugin).getNumCores(); + } + /** Bootstraps with a default YarnConfiguration and checks that the CPU controller is initialized exactly once and that bootstrap returns null. NOTE(review): the times(0) verification only matches an updateCGroupParam call whose value argument equals the empty string, so a CGROUP_CPU_MAX write with any other value would not be caught here - confirm whether anyString() was intended. */ + @Test + public void testBootstrap() throws Exception { + Configuration conf = new YarnConfiguration(); + + List<PrivilegedOperation> 
ret = + cGroupsCpuResourceHandler.bootstrap(plugin, conf); + verify(mockCGroupsHandler, times(1)) + .initializeCGroupController(CGroupsHandler.CGroupController.CPU); + verify(mockCGroupsHandler, times(0)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "", + CGroupsHandler.CGROUP_CPU_MAX, ""); + Assert.assertNull(ret); + } + /** Sets NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT to 80 and checks that bootstrap writes CGROUP_CPU_MAX on the "" cgroup exactly once with the value MAX_QUOTA_US, a space, then period, where period = MAX_QUOTA_US * 100 / (cpuPerc * numProcessors) in integer arithmetic. Also expects one controller initialization and a null return value. */ + @Test + public void testBootstrapLimits() throws Exception { + Configuration conf = new YarnConfiguration(); + + int cpuPerc = 80; + conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, + cpuPerc); + int period = (CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * 100) / (cpuPerc + * numProcessors); + String cpuMaxValue = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US + " " + period; + List<PrivilegedOperation> ret = + cGroupsCpuResourceHandler.bootstrap(plugin, conf); + verify(mockCGroupsHandler, times(1)) + .initializeCGroupController(CGroupsHandler.CGroupController.CPU); + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "", + CGroupsHandler.CGROUP_CPU_MAX, cpuMaxValue); + Assert.assertNull(ret); + } + /** Stubs getCGroupParam for CGROUP_CPU_MAX on the "" cgroup to return "100 100000", bootstraps with a default configuration, and checks that the parameter is read exactly twice and that "max 100000" is written back exactly once. Also expects one controller initialization and a null return value. */ + @Test + public void testBootstrapExistingLimits() throws Exception { + Configuration conf = new YarnConfiguration(); + + when(mockCGroupsHandler + .getCGroupParam(CGroupsHandler.CGroupController.CPU, "", + CGroupsHandler.CGROUP_CPU_MAX)) + .thenReturn("100 100000"); + + List<PrivilegedOperation> ret = + cGroupsCpuResourceHandler.bootstrap(plugin, conf); + verify(mockCGroupsHandler, times(1)) + .initializeCGroupController(CGroupsHandler.CGroupController.CPU); + verify(mockCGroupsHandler, times(2)) + .getCGroupParam(CGroupsHandler.CGroupController.CPU, "", + CGroupsHandler.CGROUP_CPU_MAX); + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "", + CGroupsHandler.CGROUP_CPU_MAX, "max 100000"); + Assert.assertNull(ret); + } + /** Calls preStart, without a prior bootstrap call in this test, for a mocked container requesting 2 vcores. Checks that the container cgroup is created once, that CGROUP_PARAM_WEIGHT is written once as CPU_DEFAULT_WEIGHT * 2, that CGROUP_CPU_MAX is never written for the container id, and that the returned operations pass validatePrivilegedOperationList for the stubbed tasks path. */ + @Test + public void testPreStart() throws Exception { + String id = "container_01_01"; + String path = 
"test-path/" + id; + ContainerId mockContainerId = mock(ContainerId.class); + when(mockContainerId.toString()).thenReturn(id); + Container mockContainer = mock(Container.class); + when(mockContainer.getContainerId()).thenReturn(mockContainerId); + when(mockCGroupsHandler + .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id)) + .thenReturn(path); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(1024, 2)); + + List<PrivilegedOperation> ret = + cGroupsCpuResourceHandler.preStart(mockContainer); + verify(mockCGroupsHandler, times(1)) + .createCGroup(CGroupsHandler.CGroupController.CPU, id); + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id, + CGroupsHandler.CGROUP_PARAM_WEIGHT, String + .valueOf(CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT * 2)); // 2 vcores + + // don't set cpu.max + verify(mockCGroupsHandler, never()) + .updateCGroupParam(eq(CGroupsHandler.CGroupController.CPU), eq(id), + eq(CGroupsHandler.CGROUP_CPU_MAX), anyString()); + + validatePrivilegedOperationList(ret, path); + } + /** Enables NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, bootstraps, then calls preStart for a 1-vcore container. Expects one cgroup creation, CGROUP_PARAM_WEIGHT written once as CPU_DEFAULT_WEIGHT, and CGROUP_CPU_MAX written once as (int) (MAX_QUOTA_US * share), a space, then MAX_QUOTA_US, with share = numProcessors / defaultVCores (4 / 8). NOTE(review): defaultVCores is a local literal 8; presumably it mirrors the configuration's default vcore count - confirm. */ + @Test + public void testPreStartStrictUsage() throws Exception { + String id = "container_01_01"; + String path = "test-path/" + id; + ContainerId mockContainerId = mock(ContainerId.class); + when(mockContainerId.toString()).thenReturn(id); + Container mockContainer = mock(Container.class); + when(mockContainer.getContainerId()).thenReturn(mockContainerId); + when(mockCGroupsHandler + .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id)) + .thenReturn(path); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(1024, 1)); + Configuration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, + true); + + cGroupsCpuResourceHandler.bootstrap(plugin, conf); + + int defaultVCores = 8; + float share = (float) numProcessors / (float) defaultVCores; + List<PrivilegedOperation> ret = + 
cGroupsCpuResourceHandler.preStart(mockContainer); + + verify(mockCGroupsHandler, times(1)) + .createCGroup(CGroupsHandler.CGroupController.CPU, id); + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id, + CGroupsHandler.CGROUP_PARAM_WEIGHT, + String.valueOf(CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT)); + + // set quota and period + String cpuMaxValue = (int) (CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * share) + + " " + CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US; + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id, + CGroupsHandler.CGROUP_CPU_MAX, cpuMaxValue); + + validatePrivilegedOperationList(ret, path); + } + /** Enables strict resource usage together with a 75 percent physical CPU limit. After bootstrap it expects one CGROUP_CPU_MAX write on the "" cgroup; then, for containers requesting 2 and 4 vcores, it expects one CGROUP_PARAM_WEIGHT write of CPU_DEFAULT_WEIGHT * cVcores and one CGROUP_CPU_MAX write of quotaUS, a space, then periodUS. With share = cVcores * yarnCores / defaultVCores: when share exceeds 1 the quota is MAX_QUOTA_US and the period is MAX_QUOTA_US / share, otherwise the quota is MAX_QUOTA_US * share and the period is MAX_QUOTA_US. NOTE(review): getPathForCGroupTasks is stubbed twice with identical arguments inside the loop; the second stubbing looks redundant. */ + @Test + public void testPreStartRestrictedContainers() throws Exception { + String id = "container_01_01"; + String path = "test-path/" + id; + int defaultVCores = 8; + Configuration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, + true); + int cpuPerc = 75; + conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, + cpuPerc); + + cGroupsCpuResourceHandler.bootstrap(plugin, conf); + + String maxCpuLimit = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US + " " + + CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * 100 / (cpuPerc * numProcessors); + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "", + CGroupsHandler.CGROUP_CPU_MAX, maxCpuLimit); + + float yarnCores = (float) (cpuPerc * numProcessors) / 100; + int[] containerVCores = { 2, 4 }; + for (int cVcores : containerVCores) { + ContainerId mockContainerId = mock(ContainerId.class); + when(mockContainerId.toString()).thenReturn(id); + Container mockContainer = mock(Container.class); + when(mockContainer.getContainerId()).thenReturn(mockContainerId); + when(mockCGroupsHandler + .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id)) + .thenReturn(path); + 
when(mockContainer.getResource()) + .thenReturn(Resource.newInstance(1024, cVcores)); + when(mockCGroupsHandler + .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id)) + .thenReturn(path); + + float share = (cVcores * yarnCores) / defaultVCores; + int quotaUS; + int periodUS; + if (share > 1.0f) { + quotaUS = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US; + periodUS = + (int) ((float) CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US / share); + } else { + quotaUS = (int) (CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * share); + periodUS = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US; + } + + cGroupsCpuResourceHandler.preStart(mockContainer); + + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id, + CGroupsHandler.CGROUP_PARAM_WEIGHT, String.valueOf( + CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT * cVcores)); + + // set cpu.max + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id, + CGroupsHandler.CGROUP_CPU_MAX, quotaUS + " " + periodUS); + } + } + /** Checks that reacquireContainer returns null when given a mocked ContainerId. */ + @Test + public void testReacquireContainer() throws Exception { + ContainerId containerIdMock = mock(ContainerId.class); + Assert.assertNull( + cGroupsCpuResourceHandler.reacquireContainer(containerIdMock)); + } + /** Checks that postComplete returns null and asks the cgroups handler exactly once to delete the CPU cgroup named after the container id. */ + @Test + public void testPostComplete() throws Exception { + String id = "container_01_01"; + ContainerId mockContainerId = mock(ContainerId.class); + when(mockContainerId.toString()).thenReturn(id); + Assert.assertNull(cGroupsCpuResourceHandler.postComplete(mockContainerId)); + verify(mockCGroupsHandler, times(1)) + .deleteCGroup(CGroupsHandler.CGroupController.CPU, id); + } + /** Checks that teardown returns null. */ + @Test + public void testTeardown() throws Exception { + Assert.assertNull(cGroupsCpuResourceHandler.teardown()); + } + /** Bootstraps with a default configuration, then calls preStart for a container whose token identifier reports ExecutionType.OPPORTUNISTIC and which requests 2 vcores. Expects CGROUP_PARAM_WEIGHT to be written exactly once with CPU_DEFAULT_WEIGHT_OPPORTUNISTIC. */ + @Test + public void testOpportunistic() throws Exception { + Configuration conf = new YarnConfiguration(); + + cGroupsCpuResourceHandler.bootstrap(plugin, conf); + + ContainerTokenIdentifier tokenId = 
mock(ContainerTokenIdentifier.class); + when(tokenId.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); + Container container = mock(Container.class); + String id = "container_01_01"; + ContainerId mockContainerId = mock(ContainerId.class); + when(mockContainerId.toString()).thenReturn(id); + when(container.getContainerId()).thenReturn(mockContainerId); + when(container.getContainerTokenIdentifier()).thenReturn(tokenId); + when(container.getResource()).thenReturn(Resource.newInstance(1024, 2)); + + cGroupsCpuResourceHandler.preStart(container); + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id, + CGroupsHandler.CGROUP_PARAM_WEIGHT, String.valueOf( + CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT_OPPORTUNISTIC)); + } + /** Shared assertion helper: ops must be non-null and hold exactly one operation of type ADD_PID_TO_CGROUP whose single argument equals CGROUP_ARG_PREFIX followed by path. */ + private void validatePrivilegedOperationList(List<PrivilegedOperation> ops, String path) { + Assert.assertNotNull(ops); + Assert.assertEquals(1, ops.size()); + PrivilegedOperation op = ops.get(0); + Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + op.getOperationType()); + List<String> args = op.getArguments(); + Assert.assertEquals(1, args.size()); + Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path, + args.get(0)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org