This is an automated email from the ASF dual-hosted git repository.
wwei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new a4bd64e YARN-9116. Capacity Scheduler: implements queue level
maximum-allocation inheritance. Contributed by Aihua Xu.
a4bd64e is described below
commit a4bd64e724cbe2ef639c984ddfe2da3baf170a96
Author: Weiwei Yang <[email protected]>
AuthorDate: Thu Jan 24 17:20:52 2019 +0800
YARN-9116. Capacity Scheduler: implements queue level maximum-allocation
inheritance. Contributed by Aihua Xu.
---
.../hadoop/yarn/util/resource/ResourceUtils.java | 110 ++++++++++
.../hadoop/yarn/submarine/client/cli/CliUtils.java | 89 --------
.../client/cli/param/RunJobParameters.java | 7 +-
.../submarine/client/cli/TestRunJobCliParsing.java | 74 -------
.../hadoop/yarn/util/resource/Resources.java | 20 --
.../yarn/util/resource/TestResourceUtils.java | 71 ++++++
.../hadoop/yarn/util/resource/TestResources.java | 8 +-
.../scheduler/capacity/AbstractCSQueue.java | 55 ++++-
.../scheduler/capacity/CSQueue.java | 4 +
.../capacity/CapacitySchedulerConfiguration.java | 65 +++---
.../scheduler/fair/ConfigurableResource.java | 3 +-
.../scheduler/capacity/TestCapacityScheduler.java | 237 ++++++++++++++++++---
12 files changed, 483 insertions(+), 260 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
index 26d7592..b6cb581 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
@@ -20,14 +20,18 @@ package org.apache.hadoop.yarn.util.resource;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
+import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
@@ -71,6 +75,8 @@ public class ResourceUtils {
"^(((\\p{Alnum}([\\p{Alnum}-]*\\p{Alnum})?\\.)*"
+ "\\p{Alnum}([\\p{Alnum}-]*\\p{Alnum})?)/)?\\p{Alpha}([\\w.-]*)$");
+ private final static String RES_PATTERN = "^[^=]+=\\d+\\s?\\w*$";
+
private static volatile boolean initializedResources = false;
private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX =
new ConcurrentHashMap<String, Integer>();
@@ -776,4 +782,108 @@ public class ResourceUtils {
return info;
}
+
+ /**
+ * Return a new {@link Resource} instance with all resource values
+ * initialized to {@code value}.
+ * @param value the value to use for all resources
+ * @return a new {@link Resource} instance
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public static Resource createResourceWithSameValue(long value) {
+ LightWeightResource res = new LightWeightResource(value,
+ Long.valueOf(value).intValue());
+ int numberOfResources = getNumberOfKnownResourceTypes();
+ for (int i = 2; i < numberOfResources; i++) {
+ res.setResourceValue(i, value);
+ }
+
+ return res;
+ }
+
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public static Resource createResourceFromString(
+ String resourceStr,
+ List<ResourceTypeInfo> resourceTypeInfos) {
+ Map<String, Long> typeToValue = parseResourcesString(resourceStr);
+ validateResourceTypes(typeToValue.keySet(), resourceTypeInfos);
+ Resource resource = Resource.newInstance(0, 0);
+ for (Entry<String, Long> entry : typeToValue.entrySet()) {
+ resource.setResourceValue(entry.getKey(), entry.getValue());
+ }
+ return resource;
+ }
+
+ private static Map<String, Long> parseResourcesString(String resourcesStr) {
+ Map<String, Long> resources = new HashMap<>();
+ String[] pairs = resourcesStr.trim().split(",");
+ for (String resource : pairs) {
+ resource = resource.trim();
+ if (!resource.matches(RES_PATTERN)) {
+ throw new IllegalArgumentException("\"" + resource + "\" is not a "
+ + "valid resource type/amount pair. "
+ + "Please provide key=amount pairs separated by commas.");
+ }
+ String[] splits = resource.split("=");
+ String key = splits[0], value = splits[1];
+ String units = getUnits(value);
+
+ String valueWithoutUnit = value.substring(0,
+ value.length()- units.length()).trim();
+ long resourceValue = Long.parseLong(valueWithoutUnit);
+
+ // Convert commandline unit to standard YARN unit.
+ if (units.equals("M") || units.equals("m")) {
+ units = "Mi";
+ } else if (units.equals("G") || units.equals("g")) {
+ units = "Gi";
+ } else if (units.isEmpty()) {
+ // do nothing;
+ } else {
+ throw new IllegalArgumentException("Acceptable units are M/G or
empty");
+ }
+
+ // special handle memory-mb and memory
+ if (key.equals(ResourceInformation.MEMORY_URI)) {
+ if (!units.isEmpty()) {
+ resourceValue = UnitsConversionUtil.convert(units, "Mi",
+ resourceValue);
+ }
+ }
+
+ if (key.equals("memory")) {
+ key = ResourceInformation.MEMORY_URI;
+ resourceValue = UnitsConversionUtil.convert(units, "Mi",
+ resourceValue);
+ }
+
+ // special handle gpu
+ if (key.equals("gpu")) {
+ key = ResourceInformation.GPU_URI;
+ }
+
+ // special handle fpga
+ if (key.equals("fpga")) {
+ key = ResourceInformation.FPGA_URI;
+ }
+
+ resources.put(key, resourceValue);
+ }
+ return resources;
+ }
+
+ private static void validateResourceTypes(
+ Iterable<String> resourceNames,
+ List<ResourceTypeInfo> resourceTypeInfos)
+ throws ResourceNotFoundException {
+ for (String resourceName : resourceNames) {
+ if (!resourceTypeInfos.stream().anyMatch(
+ e -> e.getName().equals(resourceName))) {
+ throw new ResourceNotFoundException(
+ "Unknown resource: " + resourceName);
+ }
+ }
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java
index f3eee7c..c00bc2c 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java
@@ -16,23 +16,15 @@ package org.apache.hadoop.yarn.submarine.client.cli;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceInformation;
-import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
-import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
import
org.apache.hadoop.yarn.submarine.common.exception.SubmarineRuntimeException;
import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
-import org.apache.hadoop.yarn.util.UnitsConversionUtil;
-import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import static org.apache.hadoop.yarn.submarine.client.cli.CliConstants.KEYTAB;
@@ -41,7 +33,6 @@ import static
org.apache.hadoop.yarn.submarine.client.cli.CliConstants.PRINCIPAL
public class CliUtils {
private static final Logger LOG =
LoggerFactory.getLogger(CliUtils.class);
- private final static String RES_PATTERN = "^[^=]+=\\d+\\s?\\w*$";
/**
* Replace patterns inside cli
*
@@ -74,86 +65,6 @@ public class CliUtils {
return newCli;
}
- private static Map<String, Long> parseResourcesString(String resourcesStr) {
- Map<String, Long> resources = new HashMap<>();
- String[] pairs = resourcesStr.trim().split(",");
- for (String resource : pairs) {
- resource = resource.trim();
- if (!resource.matches(RES_PATTERN)) {
- throw new IllegalArgumentException("\"" + resource + "\" is not a "
- + "valid resource type/amount pair. "
- + "Please provide key=amount pairs separated by commas.");
- }
- String[] splits = resource.split("=");
- String key = splits[0], value = splits[1];
- String units = ResourceUtils.getUnits(value);
-
- String valueWithoutUnit = value.substring(0,
- value.length()- units.length()).trim();
- long resourceValue = Long.parseLong(valueWithoutUnit);
-
- // Convert commandline unit to standard YARN unit.
- if (units.equals("M") || units.equals("m")) {
- units = "Mi";
- } else if (units.equals("G") || units.equals("g")) {
- units = "Gi";
- } else if (units.isEmpty()) {
- // do nothing;
- } else {
- throw new IllegalArgumentException("Acceptable units are M/G or
empty");
- }
-
- // special handle memory-mb and memory
- if (key.equals(ResourceInformation.MEMORY_URI)) {
- if (!units.isEmpty()) {
- resourceValue = UnitsConversionUtil.convert(units, "Mi",
- resourceValue);
- }
- }
-
- if (key.equals("memory")) {
- key = ResourceInformation.MEMORY_URI;
- resourceValue = UnitsConversionUtil.convert(units, "Mi",
- resourceValue);
- }
-
- // special handle gpu
- if (key.equals("gpu")) {
- key = ResourceInformation.GPU_URI;
- }
-
- // special handle fpga
- if (key.equals("fpga")) {
- key = ResourceInformation.FPGA_URI;
- }
-
- resources.put(key, resourceValue);
- }
- return resources;
- }
-
- private static void validateResourceTypes(Iterable<String> resourceNames,
- List<ResourceTypeInfo> resourceTypes) throws IOException, YarnException {
- for (String resourceName : resourceNames) {
- if (!resourceTypes.stream().anyMatch(
- e -> e.getName().equals(resourceName))) {
- throw new ResourceNotFoundException(
- "Unknown resource: " + resourceName);
- }
- }
- }
-
- public static Resource createResourceFromString(String resourceStr,
- List<ResourceTypeInfo> resourceTypes) throws IOException, YarnException {
- Map<String, Long> typeToValue = parseResourcesString(resourceStr);
- validateResourceTypes(typeToValue.keySet(), resourceTypes);
- Resource resource = Resource.newInstance(0, 0);
- for (Map.Entry<String, Long> entry : typeToValue.entrySet()) {
- resource.setResourceValue(entry.getKey(), entry.getValue());
- }
- return resource;
- }
-
// Is it for help?
public static boolean argsForHelp(String[] args) {
if (args == null || args.length == 0)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
index 111d4eb..9a01dad 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.submarine.client.cli.CliConstants;
import org.apache.hadoop.yarn.submarine.client.cli.CliUtils;
import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.io.IOException;
import java.util.ArrayList;
@@ -104,7 +105,7 @@ public class RunJobParameters extends RunParameters {
throw new ParseException(
"--" + CliConstants.WORKER_RES + " is absent.");
}
- workerResource = CliUtils.createResourceFromString(
+ workerResource = ResourceUtils.createResourceFromString(
workerResourceStr,
clientContext.getOrCreateYarnClient().getResourceTypeInfo());
}
@@ -115,7 +116,7 @@ public class RunJobParameters extends RunParameters {
if (psResourceStr == null) {
throw new ParseException("--" + CliConstants.PS_RES + " is absent.");
}
- psResource = CliUtils.createResourceFromString(psResourceStr,
+ psResource = ResourceUtils.createResourceFromString(psResourceStr,
clientContext.getOrCreateYarnClient().getResourceTypeInfo());
}
@@ -127,7 +128,7 @@ public class RunJobParameters extends RunParameters {
if (tensorboardResourceStr == null || tensorboardResourceStr.isEmpty()) {
tensorboardResourceStr = CliConstants.TENSORBOARD_DEFAULT_RESOURCES;
}
- tensorboardResource = CliUtils.createResourceFromString(
+ tensorboardResource = ResourceUtils.createResourceFromString(
tensorboardResourceStr,
clientContext.getOrCreateYarnClient().getResourceTypeInfo());
tensorboardDockerImage = parsedCommandLine.getOptionValue(
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
index 184d53d..2a8f1da 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
@@ -21,9 +21,6 @@ package org.apache.hadoop.yarn.submarine.client.cli;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceInformation;
-import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
import org.apache.hadoop.yarn.submarine.common.MockClientContext;
@@ -32,15 +29,12 @@ import
org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
-import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
@@ -204,72 +198,4 @@ public class TestRunJobCliParsing {
"python run-ps.py --input=hdfs://input
--model_dir=hdfs://output/model",
runJobCli.getRunJobParameters().getPSLaunchCmd());
}
-
- @Test
- public void testResourceUnitParsing() throws Exception {
- Resource res = CliUtils.createResourceFromString("memory=20g,vcores=3",
- ResourceUtils.getResourcesTypeInfo());
- Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
-
- res = CliUtils.createResourceFromString("memory=20G,vcores=3",
- ResourceUtils.getResourcesTypeInfo());
- Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
-
- res = CliUtils.createResourceFromString("memory=20M,vcores=3",
- ResourceUtils.getResourcesTypeInfo());
- Assert.assertEquals(Resources.createResource(20, 3), res);
-
- res = CliUtils.createResourceFromString("memory=20m,vcores=3",
- ResourceUtils.getResourcesTypeInfo());
- Assert.assertEquals(Resources.createResource(20, 3), res);
-
- res = CliUtils.createResourceFromString("memory-mb=20,vcores=3",
- ResourceUtils.getResourcesTypeInfo());
- Assert.assertEquals(Resources.createResource(20, 3), res);
-
- res = CliUtils.createResourceFromString("memory-mb=20m,vcores=3",
- ResourceUtils.getResourcesTypeInfo());
- Assert.assertEquals(Resources.createResource(20, 3), res);
-
- res = CliUtils.createResourceFromString("memory-mb=20G,vcores=3",
- ResourceUtils.getResourcesTypeInfo());
- Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
-
- // W/o unit for memory means bits, and 20 bits will be rounded to 0
- res = CliUtils.createResourceFromString("memory=20,vcores=3",
- ResourceUtils.getResourcesTypeInfo());
- Assert.assertEquals(Resources.createResource(0, 3), res);
-
- // Test multiple resources
- List<ResourceTypeInfo> resTypes = new ArrayList<>(
- ResourceUtils.getResourcesTypeInfo());
- resTypes.add(ResourceTypeInfo.newInstance(ResourceInformation.GPU_URI,
""));
- ResourceUtils.reinitializeResources(resTypes);
- res = CliUtils.createResourceFromString("memory=2G,vcores=3,gpu=0",
- resTypes);
- Assert.assertEquals(2 * 1024, res.getMemorySize());
- Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
-
- res = CliUtils.createResourceFromString("memory=2G,vcores=3,gpu=3",
- resTypes);
- Assert.assertEquals(2 * 1024, res.getMemorySize());
- Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));
-
- res = CliUtils.createResourceFromString("memory=2G,vcores=3",
- resTypes);
- Assert.assertEquals(2 * 1024, res.getMemorySize());
- Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
-
- res = CliUtils.createResourceFromString("memory=2G,vcores=3,yarn.io/gpu=0",
- resTypes);
- Assert.assertEquals(2 * 1024, res.getMemorySize());
- Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
-
- res = CliUtils.createResourceFromString("memory=2G,vcores=3,yarn.io/gpu=3",
- resTypes);
- Assert.assertEquals(2 * 1024, res.getMemorySize());
- Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));
-
- // TODO, add more negative tests.
- }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index bf1df8d..4764147 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -25,7 +25,6 @@ import
org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
-import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
/**
@@ -40,25 +39,6 @@ public class Resources {
LogFactory.getLog(Resources.class);
/**
- * Return a new {@link Resource} instance with all resource values
- * initialized to {@code value}.
- * @param value the value to use for all resources
- * @return a new {@link Resource} instance
- */
- @Private
- @Unstable
- public static Resource createResourceWithSameValue(long value) {
- LightWeightResource res = new LightWeightResource(value,
- Long.valueOf(value).intValue());
- int numberOfResources = ResourceUtils.getNumberOfKnownResourceTypes();
- for (int i = 2; i < numberOfResources; i++) {
- res.setResourceValue(i, value);
- }
-
- return res;
- }
-
- /**
* Helper class to create a resource with a fixed value for all resource
* types. For example, a NONE resource which returns 0 for any resource type.
*/
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java
index d6e0565..f7ec4f8 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.After;
@@ -31,7 +32,9 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -391,6 +394,74 @@ public class TestResourceUtils {
}
}
+ @Test
+ public void testResourceUnitParsing() throws Exception {
+ Resource res =
ResourceUtils.createResourceFromString("memory=20g,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
+
+ res = ResourceUtils.createResourceFromString("memory=20G,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
+
+ res = ResourceUtils.createResourceFromString("memory=20M,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20, 3), res);
+
+ res = ResourceUtils.createResourceFromString("memory=20m,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20, 3), res);
+
+ res = ResourceUtils.createResourceFromString("memory-mb=20,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20, 3), res);
+
+ res = ResourceUtils.createResourceFromString("memory-mb=20m,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20, 3), res);
+
+ res = ResourceUtils.createResourceFromString("memory-mb=20G,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
+
+ // W/o unit for memory means bits, and 20 bits will be rounded to 0
+ res = ResourceUtils.createResourceFromString("memory=20,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(0, 3), res);
+
+ // Test multiple resources
+ List<ResourceTypeInfo> resTypes = new ArrayList<>(
+ ResourceUtils.getResourcesTypeInfo());
+ resTypes.add(ResourceTypeInfo.newInstance(ResourceInformation.GPU_URI,
""));
+ ResourceUtils.reinitializeResources(resTypes);
+ res = ResourceUtils.createResourceFromString("memory=2G,vcores=3,gpu=0",
+ resTypes);
+ Assert.assertEquals(2 * 1024, res.getMemorySize());
+ Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
+
+ res = ResourceUtils.createResourceFromString("memory=2G,vcores=3,gpu=3",
+ resTypes);
+ Assert.assertEquals(2 * 1024, res.getMemorySize());
+ Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));
+
+ res = ResourceUtils.createResourceFromString("memory=2G,vcores=3",
+ resTypes);
+ Assert.assertEquals(2 * 1024, res.getMemorySize());
+ Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
+
+ res = ResourceUtils.createResourceFromString(
+ "memory=2G,vcores=3,yarn.io/gpu=0", resTypes);
+ Assert.assertEquals(2 * 1024, res.getMemorySize());
+ Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
+
+ res = ResourceUtils.createResourceFromString(
+ "memory=2G,vcores=3,yarn.io/gpu=3", resTypes);
+ Assert.assertEquals(2 * 1024, res.getMemorySize());
+ Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));
+
+ // TODO, add more negative tests.
+ }
+
public static String setupResourceTypes(Configuration conf, String filename)
throws Exception {
File source = new File(
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResources.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResources.java
index 07b24eb..ecd940e 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResources.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResources.java
@@ -269,7 +269,7 @@ public class TestResources {
unsetExtraResourceType();
setupExtraResourceType();
- Resource res = Resources.createResourceWithSameValue(11L);
+ Resource res = ResourceUtils.createResourceWithSameValue(11L);
assertEquals(11L, res.getMemorySize());
assertEquals(11, res.getVirtualCores());
assertEquals(11L,
res.getResourceInformation(EXTRA_RESOURCE_TYPE).getValue());
@@ -280,7 +280,7 @@ public class TestResources {
unsetExtraResourceType();
setupExtraResourceType();
- Resource res = Resources.createResourceWithSameValue(11);
+ Resource res = ResourceUtils.createResourceWithSameValue(11);
assertEquals(11, res.getMemorySize());
assertEquals(11, res.getVirtualCores());
assertEquals(11,
res.getResourceInformation(EXTRA_RESOURCE_TYPE).getValue());
@@ -288,14 +288,14 @@ public class TestResources {
@Test
public void testCreateSimpleResourceWithSameLongValue() {
- Resource res = Resources.createResourceWithSameValue(11L);
+ Resource res = ResourceUtils.createResourceWithSameValue(11L);
assertEquals(11L, res.getMemorySize());
assertEquals(11, res.getVirtualCores());
}
@Test
public void testCreateSimpleResourceWithSameIntValue() {
- Resource res = Resources.createResourceWithSameValue(11);
+ Resource res = ResourceUtils.createResourceWithSameValue(11);
assertEquals(11, res.getMemorySize());
assertEquals(11, res.getVirtualCores());
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 2c9f9a3..caa88cf 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -69,10 +70,13 @@ import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.Sets;
+import static
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.UNDEFINED;
+
public abstract class AbstractCSQueue implements CSQueue {
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
@@ -361,9 +365,9 @@ public abstract class AbstractCSQueue implements CSQueue {
capacityConfigType = CapacityConfigType.NONE;
updateConfigurableResourceRequirement(getQueuePath(), clusterResource);
- this.maximumAllocation =
- configuration.getMaximumAllocationPerQueue(
- getQueuePath());
+ // Setup queue's maximumAllocation respecting the global setting
+ // and queue setting
+ setupMaximumAllocation(configuration);
// initialized the queue state based on previous state, configured state
// and its parent state.
@@ -425,6 +429,51 @@ public abstract class AbstractCSQueue implements CSQueue {
}
}
+ private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) {
+ String queue = getQueuePath();
+ Resource clusterMax = ResourceUtils
+ .fetchMaximumAllocationFromConfig(csConf);
+ Resource queueMax = csConf.getQueueMaximumAllocation(queue);
+
+ maximumAllocation = Resources.clone(
+ parent == null ? clusterMax : parent.getMaximumAllocation());
+
+ String errMsg =
+ "Queue maximum allocation cannot be larger than the cluster
setting"
+ + " for queue " + queue
+ + " max allocation per queue: %s"
+ + " cluster setting: " + clusterMax;
+
+ if (queueMax == Resources.none()) {
+ // Handle backward compatibility
+ long queueMemory = csConf.getQueueMaximumAllocationMb(queue);
+ int queueVcores = csConf.getQueueMaximumAllocationVcores(queue);
+ if (queueMemory != UNDEFINED) {
+ maximumAllocation.setMemorySize(queueMemory);
+ }
+
+ if (queueVcores != UNDEFINED) {
+ maximumAllocation.setVirtualCores(queueVcores);
+ }
+
+ if ((queueMemory != UNDEFINED && queueMemory > clusterMax.getMemorySize()
+ || (queueVcores != UNDEFINED
+ && queueVcores > clusterMax.getVirtualCores()))) {
+ throw new IllegalArgumentException(
+ String.format(errMsg, maximumAllocation));
+ }
+ } else {
+ // Queue level maximum-allocation can't be larger than cluster setting
+ for (ResourceInformation ri : queueMax.getResources()) {
+ if (ri.compareTo(clusterMax.getResourceInformation(ri.getName())) > 0)
{
+ throw new IllegalArgumentException(String.format(errMsg, queueMax));
+ }
+
+ maximumAllocation.setResourceInformation(ri.getName(), ri);
+ }
+ }
+ }
+
private Map<String, Float> getUserWeightsFromHierarchy
(CapacitySchedulerConfiguration configuration) throws
IOException {
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index c0c280e..1af3250 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -86,6 +86,10 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
public PrivilegedEntity getPrivilegedEntity();
+ Resource getMaximumAllocation();
+
+ Resource getMinimumAllocation();
+
/**
* Get the configured <em>capacity</em> of the queue.
* @return configured queue capacity
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index dec6a84..ead1f57 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -141,12 +142,14 @@ public class CapacitySchedulerConfiguration extends
ReservationSchedulerConfigur
public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;
@Private
+ public static final String MAXIMUM_ALLOCATION = "maximum-allocation";
+
+ @Private
public static final String MAXIMUM_ALLOCATION_MB = "maximum-allocation-mb";
@Private
public static final String MAXIMUM_ALLOCATION_VCORES =
- "maximum-allocation-vcores";
-
+ "maximum-allocation-vcores";
/**
* Ordering policy of queues
*/
@@ -889,50 +892,32 @@ public class CapacitySchedulerConfiguration extends
ReservationSchedulerConfigur
}
/**
- * Get the per queue setting for the maximum limit to allocate to
- * each container request.
+ * Get maximum_allocation setting for the specified queue from the
+ * configuration.
*
* @param queue
* name of the queue
- * @return setting specified per queue else falls back to the cluster setting
+ * @return Resource object or Resource.none if not set
*/
- public Resource getMaximumAllocationPerQueue(String queue) {
- // Only support to specify memory and vcores maximum allocation per queue
- // for now.
+ public Resource getQueueMaximumAllocation(String queue) {
String queuePrefix = getQueuePrefix(queue);
- long maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB,
- (int)UNDEFINED);
- int maxAllocationVcoresPerQueue = getInt(
- queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED);
- if (LOG.isDebugEnabled()) {
- LOG.debug("max alloc mb per queue for " + queue + " is "
- + maxAllocationMbPerQueue);
- LOG.debug("max alloc vcores per queue for " + queue + " is "
- + maxAllocationVcoresPerQueue);
- }
- Resource clusterMax = ResourceUtils.fetchMaximumAllocationFromConfig(this);
- if (maxAllocationMbPerQueue == (int)UNDEFINED) {
- LOG.info("max alloc mb per queue for " + queue + " is undefined");
- maxAllocationMbPerQueue = clusterMax.getMemorySize();
- }
- if (maxAllocationVcoresPerQueue == (int)UNDEFINED) {
- LOG.info("max alloc vcore per queue for " + queue + " is undefined");
- maxAllocationVcoresPerQueue = clusterMax.getVirtualCores();
- }
- // Copy from clusterMax and overwrite per-queue's maximum memory/vcore
- // allocation.
- Resource result = Resources.clone(clusterMax);
- result.setMemorySize(maxAllocationMbPerQueue);
- result.setVirtualCores(maxAllocationVcoresPerQueue);
- if (maxAllocationMbPerQueue > clusterMax.getMemorySize()
- || maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) {
- throw new IllegalArgumentException(
- "Queue maximum allocation cannot be larger than the cluster setting"
- + " for queue " + queue
- + " max allocation per queue: " + result
- + " cluster setting: " + clusterMax);
+ String rawQueueMaxAllocation = get(queuePrefix + MAXIMUM_ALLOCATION, null);
+ if (Strings.isNullOrEmpty(rawQueueMaxAllocation)) {
+ return Resources.none();
+ } else {
+ return ResourceUtils.createResourceFromString(rawQueueMaxAllocation,
+ ResourceUtils.getResourcesTypeInfo());
}
- return result;
+ }
+
+ public long getQueueMaximumAllocationMb(String queue) {
+ String queuePrefix = getQueuePrefix(queue);
+ return getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, (int)UNDEFINED);
+ }
+
+ public int getQueueMaximumAllocationVcores(String queue) {
+ String queuePrefix = getQueuePrefix(queue);
+ return getInt(queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED);
}
public boolean getEnableUserMetrics() {
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java
index 62bad44..44049fd 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
-import org.apache.hadoop.yarn.util.resource.Resources;
/**
* A {@code ConfigurableResource} object represents an entity that is used to
@@ -53,7 +52,7 @@ public class ConfigurableResource {
* @param value the value to use for all resources
*/
ConfigurableResource(long value) {
- this(Resources.createResourceWithSameValue(value));
+ this(ResourceUtils.createResourceWithSameValue(value));
}
public ConfigurableResource(Resource resource) {
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 56882a7..aac7f15 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import static
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION;
import static
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
import static
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -2996,8 +2998,8 @@ public class TestCapacityScheduler extends
CapacitySchedulerTestBase {
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("max allocation for A1",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- conf.getMaximumAllocationPerQueue(A1).getMemorySize());
+ Resources.none(),
+ conf.getQueueMaximumAllocation(A1));
assertEquals("max allocation",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
@@ -3087,6 +3089,10 @@ public class TestCapacityScheduler extends
CapacitySchedulerTestBase {
cs.reinitialize(conf, mockContext);
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+ CSQueue rootQueue = cs.getRootQueue();
+ CSQueue queueA = findQueue(rootQueue, A);
+ CSQueue queueA1 = findQueue(queueA, A1);
+
assertEquals("max capability MB in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
@@ -3095,10 +3101,10 @@ public class TestCapacityScheduler extends
CapacitySchedulerTestBase {
cs.getMaximumResourceCapability().getVirtualCores());
assertEquals("max allocation MB A1",
4096,
- conf.getMaximumAllocationPerQueue(A1).getMemorySize());
+ queueA1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A1",
2,
- conf.getMaximumAllocationPerQueue(A1).getVirtualCores());
+ queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("cluster max allocation MB",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
@@ -3106,11 +3112,8 @@ public class TestCapacityScheduler extends
CapacitySchedulerTestBase {
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
- CSQueue rootQueue = cs.getRootQueue();
- CSQueue queueA = findQueue(rootQueue, A);
- CSQueue queueA1 = findQueue(queueA, A1);
- assertEquals("queue max allocation", 4096, ((LeafQueue) queueA1)
- .getMaximumAllocation().getMemorySize());
+ assertEquals("queue max allocation", 4096,
+ queueA1.getMaximumAllocation().getMemorySize());
setMaxAllocMb(conf, A1, 6144);
setMaxAllocVcores(conf, A1, 3);
@@ -3118,9 +3121,9 @@ public class TestCapacityScheduler extends
CapacitySchedulerTestBase {
// conf will have changed but we shouldn't be able to change max allocation
// for the actual queue
assertEquals("max allocation MB A1", 6144,
- conf.getMaximumAllocationPerQueue(A1).getMemorySize());
+ queueA1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A1", 3,
- conf.getMaximumAllocationPerQueue(A1).getVirtualCores());
+ queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("max allocation MB cluster",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
@@ -3128,9 +3131,9 @@ public class TestCapacityScheduler extends
CapacitySchedulerTestBase {
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
assertEquals("queue max allocation MB", 6144,
- ((LeafQueue) queueA1).getMaximumAllocation().getMemorySize());
+ queueA1.getMaximumAllocation().getMemorySize());
assertEquals("queue max allocation vcores", 3,
- ((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
+ queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("max capability MB cluster",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
@@ -3216,17 +3219,17 @@ public class TestCapacityScheduler extends
CapacitySchedulerTestBase {
CSQueue queueB2 = findQueue(queueB, B2);
assertEquals("queue A1 max allocation MB", 4096,
- ((LeafQueue) queueA1).getMaximumAllocation().getMemorySize());
+ queueA1.getMaximumAllocation().getMemorySize());
assertEquals("queue A1 max allocation vcores", 4,
- ((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
+ queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("queue A2 max allocation MB", 10240,
- ((LeafQueue) queueA2).getMaximumAllocation().getMemorySize());
+ queueA2.getMaximumAllocation().getMemorySize());
assertEquals("queue A2 max allocation vcores", 10,
- ((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores());
+ queueA2.getMaximumAllocation().getVirtualCores());
assertEquals("queue B2 max allocation MB", 10240,
- ((LeafQueue) queueB2).getMaximumAllocation().getMemorySize());
+ queueB2.getMaximumAllocation().getMemorySize());
assertEquals("queue B2 max allocation vcores", 10,
- ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores());
+ queueB2.getMaximumAllocation().getVirtualCores());
setMaxAllocMb(conf, 12288);
setMaxAllocVcores(conf, 12);
@@ -3238,17 +3241,187 @@ public class TestCapacityScheduler extends
CapacitySchedulerTestBase {
assertEquals("max allocation vcores in CS", 12,
cs.getMaximumResourceCapability().getVirtualCores());
assertEquals("queue A1 max MB allocation", 4096,
- ((LeafQueue) queueA1).getMaximumAllocation().getMemorySize());
+ queueA1.getMaximumAllocation().getMemorySize());
assertEquals("queue A1 max vcores allocation", 4,
- ((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
+ queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("queue A2 max MB allocation", 12288,
- ((LeafQueue) queueA2).getMaximumAllocation().getMemorySize());
+ queueA2.getMaximumAllocation().getMemorySize());
assertEquals("queue A2 max vcores allocation", 12,
- ((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores());
+ queueA2.getMaximumAllocation().getVirtualCores());
assertEquals("queue B2 max MB allocation", 12288,
- ((LeafQueue) queueB2).getMaximumAllocation().getMemorySize());
+ queueB2.getMaximumAllocation().getMemorySize());
assertEquals("queue B2 max vcores allocation", 12,
- ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores());
+ queueB2.getMaximumAllocation().getVirtualCores());
+ }
+
+ @Test
+ public void testQueuesMaxAllocationInheritance() throws Exception {
+ // queue level max allocation is set by the queue configuration explicitly
+ // or inherits from the parent.
+
+ CapacityScheduler cs = new CapacityScheduler();
+ cs.setConf(new YarnConfiguration());
+ cs.setRMContext(resourceManager.getRMContext());
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(conf);
+ setMaxAllocMb(conf,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+ setMaxAllocVcores(conf,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+
+ // Test the child queue overrides
+ setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
+ "memory-mb=4096,vcores=2");
+ setMaxAllocation(conf, A1, "memory-mb=6144,vcores=2");
+ setMaxAllocation(conf, B, "memory-mb=5120, vcores=2");
+ setMaxAllocation(conf, B2, "memory-mb=1024, vcores=2");
+
+ cs.init(conf);
+ cs.start();
+ cs.reinitialize(conf, mockContext);
+ checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+ CSQueue rootQueue = cs.getRootQueue();
+ CSQueue queueA = findQueue(rootQueue, A);
+ CSQueue queueB = findQueue(rootQueue, B);
+ CSQueue queueA1 = findQueue(queueA, A1);
+ CSQueue queueA2 = findQueue(queueA, A2);
+ CSQueue queueB1 = findQueue(queueB, B1);
+ CSQueue queueB2 = findQueue(queueB, B2);
+
+ assertEquals("max capability MB in CS",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ cs.getMaximumResourceCapability().getMemorySize());
+ assertEquals("max capability vcores in CS",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ cs.getMaximumResourceCapability().getVirtualCores());
+ assertEquals("max allocation MB A1",
+ 6144,
+ queueA1.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation vcores A1",
+ 2,
+ queueA1.getMaximumAllocation().getVirtualCores());
+ assertEquals("max allocation MB A2", 4096,
+ queueA2.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation vcores A2",
+ 2,
+ queueA2.getMaximumAllocation().getVirtualCores());
+ assertEquals("max allocation MB B", 5120,
+ queueB.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation MB B1", 5120,
+ queueB1.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation MB B2", 1024,
+ queueB2.getMaximumAllocation().getMemorySize());
+
+ // Test get the max-allocation from different parent
+ unsetMaxAllocation(conf, A1);
+ unsetMaxAllocation(conf, B);
+ unsetMaxAllocation(conf, B1);
+ setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
+ "memory-mb=6144,vcores=2");
+ setMaxAllocation(conf, A, "memory-mb=8192,vcores=2");
+
+ cs.reinitialize(conf, mockContext);
+
+ assertEquals("max capability MB in CS",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ cs.getMaximumResourceCapability().getMemorySize());
+ assertEquals("max capability vcores in CS",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ cs.getMaximumResourceCapability().getVirtualCores());
+ assertEquals("max allocation MB A1",
+ 8192,
+ queueA1.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation vcores A1",
+ 2,
+ queueA1.getMaximumAllocation().getVirtualCores());
+ assertEquals("max allocation MB B1",
+ 6144,
+ queueB1.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation vcores B1",
+ 2,
+ queueB1.getMaximumAllocation().getVirtualCores());
+
+ // Test the default
+ unsetMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT);
+ unsetMaxAllocation(conf, A);
+ unsetMaxAllocation(conf, A1);
+ cs.reinitialize(conf, mockContext);
+
+ assertEquals("max capability MB in CS",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ cs.getMaximumResourceCapability().getMemorySize());
+ assertEquals("max capability vcores in CS",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ cs.getMaximumResourceCapability().getVirtualCores());
+ assertEquals("max allocation MB A1",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ queueA1.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation vcores A1",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ queueA1.getMaximumAllocation().getVirtualCores());
+ assertEquals("max allocation MB A2",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ queueA2.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation vcores A2",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ queueA2.getMaximumAllocation().getVirtualCores());
+ }
+
+ @Test
+ public void testVerifyQueuesMaxAllocationConf() throws Exception {
+ // queue level max allocation can't exceed the cluster setting
+
+ CapacityScheduler cs = new CapacityScheduler();
+ cs.setConf(new YarnConfiguration());
+ cs.setRMContext(resourceManager.getRMContext());
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(conf);
+ setMaxAllocMb(conf,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+ setMaxAllocVcores(conf,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+
+ long largerMem =
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB +
1024;
+ long largerVcores =
+
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES+10;
+
+ cs.init(conf);
+ cs.start();
+ cs.reinitialize(conf, mockContext);
+ checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+ setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
+ "memory-mb=" + largerMem + ",vcores=2");
+ try {
+ cs.reinitialize(conf, mockContext);
+ fail("Queue Root maximum allocation can't exceed the cluster setting");
+ } catch(Exception e) {
+ assertTrue("maximum allocation exception",
+ e.getCause().getMessage().contains("maximum allocation"));
+ }
+
+ setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
+ "memory-mb=4096,vcores=2");
+ setMaxAllocation(conf, A, "memory-mb=6144,vcores=2");
+ setMaxAllocation(conf, A1, "memory-mb=" + largerMem + ",vcores=2");
+ try {
+ cs.reinitialize(conf, mockContext);
+ fail("Queue A1 maximum allocation can't exceed the cluster setting");
+ } catch(Exception e) {
+ assertTrue("maximum allocation exception",
+ e.getCause().getMessage().contains("maximum allocation"));
+ }
+ setMaxAllocation(conf, A1, "memory-mb=8192" + ",vcores=" + largerVcores);
+ try {
+ cs.reinitialize(conf, mockContext);
+ fail("Queue A1 maximum allocation can't exceed the cluster setting");
+ } catch(Exception e) {
+ assertTrue("maximum allocation exception",
+ e.getCause().getMessage().contains("maximum allocation"));
+ }
+
}
private void waitContainerAllocated(MockAM am, int mem, int nContainer,
@@ -4103,6 +4276,20 @@ public class TestCapacityScheduler extends
CapacitySchedulerTestBase {
conf.setInt(propName, maxAllocVcores);
}
+ private void setMaxAllocation(CapacitySchedulerConfiguration conf,
+ String queueName, String maxAllocation) {
+ String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ + MAXIMUM_ALLOCATION;
+ conf.set(propName, maxAllocation);
+ }
+
+ private void unsetMaxAllocation(CapacitySchedulerConfiguration conf,
+ String queueName) {
+ String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ + MAXIMUM_ALLOCATION;
+ conf.unset(propName);
+ }
+
private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
RMContainer rmContainer = cs.getRMContainer(containerId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]