YARN-7332. Compute effectiveCapacity per each resource vector. (Sunil G via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f0d91dfe Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f0d91dfe Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f0d91dfe Branch: refs/heads/YARN-5881 Commit: f0d91dfe1813ed917ee3006d911465ad93fbcd67 Parents: c2440930 Author: Wangda Tan <wan...@apache.org> Authored: Fri Oct 27 10:16:33 2017 -0700 Committer: Sunil G <sun...@apache.org> Committed: Fri Nov 24 15:37:55 2017 +0530 ---------------------------------------------------------------------- .../scheduler/capacity/ParentQueue.java | 66 ++++++++++++-- .../scheduler/capacity/TestParentQueue.java | 94 ++++++++++++++++++++ 2 files changed, 153 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0d91dfe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 5ab1494..940637e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -44,6 +44,7 @@ import 
org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AccessType; @@ -68,7 +69,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; @Private @@ -928,24 +931,25 @@ public class ParentQueue extends AbstractCSQueue { // Factor to scale down effective resource: When cluster has sufficient // resources, effective_min_resources will be same as configured // min_resources. 
- float effectiveMinRatio = 1; + Resource numeratorForMinRatio = null; ResourceCalculator rc = this.csContext.getResourceCalculator(); if (getQueueName().equals("root")) { if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(rc, clusterResource, resourceByLabel, configuredMinResources)) { - effectiveMinRatio = Resources.divide(rc, clusterResource, - resourceByLabel, configuredMinResources); + numeratorForMinRatio = resourceByLabel; } } else { if (Resources.lessThan(rc, clusterResource, queueResourceQuotas.getEffectiveMinResource(label), configuredMinResources)) { - effectiveMinRatio = Resources.divide(rc, clusterResource, - queueResourceQuotas.getEffectiveMinResource(label), - configuredMinResources); + numeratorForMinRatio = queueResourceQuotas + .getEffectiveMinResource(label); } } + Map<String, Float> effectiveMinRatioPerResource = getEffectiveMinRatioPerResource( + configuredMinResources, numeratorForMinRatio); + // loop and do this for all child queues for (CSQueue childQueue : getChildQueues()) { Resource minResource = childQueue.getQueueResourceQuotas() @@ -955,7 +959,8 @@ public class ParentQueue extends AbstractCSQueue { if (childQueue.getCapacityConfigType() .equals(CapacityConfigType.ABSOLUTE_RESOURCE)) { childQueue.getQueueResourceQuotas().setEffectiveMinResource(label, - Resources.multiply(minResource, effectiveMinRatio)); + getMinResourceNormalized(childQueue.getQueueName(), effectiveMinRatioPerResource, + minResource)); // Max resource of a queue should be a minimum of {configuredMaxRes, // parentMaxRes}. parentMaxRes could be configured value. 
But if not @@ -1003,6 +1008,53 @@ public class ParentQueue extends AbstractCSQueue { } } + /** Returns a new Resource, initialised from {@code minResource} via Resource.newInstance, in which every resource type whose name has an entry in {@code effectiveMinRatio} is multiplied by that ratio; resource types with no entry keep the value taken from {@code minResource}. {@code minResource} itself is only read, never modified. @param name queue name, used only in the debug log message @param effectiveMinRatio scale factor per resource name, as built by getEffectiveMinRatioPerResource in the caller @param minResource the child queue's minimum resource taken from its QueueResourceQuotas (presumably the configured minimum - TODO confirm) @return the scaled minimum resource */ private Resource getMinResourceNormalized(String name, Map<String, Float> effectiveMinRatio, + Resource minResource) { + Resource ret = Resource.newInstance(minResource); + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + for (int i = 0; i < maxLength; i++) { + ResourceInformation nResourceInformation = minResource + .getResourceInformation(i); + + Float ratio = effectiveMinRatio.get(nResourceInformation.getName()); + if (ratio != null) { + ret.setResourceValue(i, + (long) (nResourceInformation.getValue() * ratio.floatValue())); /* long * float is evaluated in float precision and the cast truncates toward zero (e.g. 22 * 1.25f = 27.5 -> 27). NOTE(review): values above 2^24 may lose precision in float arithmetic; consider double if such large values are expected. */ + if (LOG.isDebugEnabled()) { + LOG.debug("Updating min resource for Queue: " + name + " as " + + ret.getResourceInformation(i) + ", Actual resource: " + + nResourceInformation.getValue() + ", ratio: " + + ratio.floatValue()); + } + } + } + return ret; + } + + /** Builds a map from resource name to the ratio numeratorForMinRatio / configuredMinResources, computed separately for each of the ResourceUtils.getNumberOfKnownResourceTypes() resource types. Before dividing, the configured value is converted into the numerator's units with UnitsConversionUtil.convert(fromUnits, toUnits, value). Types whose converted configured value is 0 get no entry. The map is empty when numeratorForMinRatio is null. getMinResourceNormalized leaves any type without an entry unscaled. Ratios are not clamped: a type whose numerator exceeds its configured value gets a ratio above 1.0, which scales that type up. @param configuredMinResources the denominator; presumably the children's combined configured minimum (computed by the caller outside this view - TODO confirm) @param numeratorForMinRatio resource to scale towards, or null when the caller's Resources.lessThan check found no scaling is needed @return a new HashMap from resource name to ratio; never null */ private Map<String, Float> getEffectiveMinRatioPerResource( + Resource configuredMinResources, Resource numeratorForMinRatio) { + Map<String, Float> effectiveMinRatioPerResource = new HashMap<>(); + if (numeratorForMinRatio != null) { + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + for (int i = 0; i < maxLength; i++) { + ResourceInformation nResourceInformation = numeratorForMinRatio + .getResourceInformation(i); + ResourceInformation dResourceInformation = configuredMinResources + .getResourceInformation(i); + + long nValue = nResourceInformation.getValue(); + long dValue = UnitsConversionUtil.convert( + dResourceInformation.getUnits(), nResourceInformation.getUnits(), + dResourceInformation.getValue()); + if (dValue != 0) { /* a zero denominator would yield Infinity or NaN in float division rather than throw, so such types are skipped */ + effectiveMinRatioPerResource.put(nResourceInformation.getName(), + (float) nValue / dValue); + } + } + } + return effectiveMinRatioPerResource; + } + private void deriveCapacityFromAbsoluteConfigurations(String label, Resource clusterResource, ResourceCalculator rc, CSQueue childQueue) { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0d91dfe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 25a9774..fe66aba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -68,6 +68,11 @@ import org.mockito.stubbing.Answer; public class TestParentQueue { + /* Absolute minimum resources configured for root.b and root.a by setupSingleLevelQueuesWithAbsoluteResource. Together they are 20 * 1024 memory and 32 vcores, i.e. exactly the 2-node cluster built in testAbsoluteResourceWithChangeInClusterResource (2 x 10 * GB, 2 x 16 vcores, assuming GB == 1024). */ private static final Resource QUEUE_B_RESOURCE = Resource + .newInstance(14 * 1024, 22); + private static final Resource QUEUE_A_RESOURCE = Resource + .newInstance(6 * 1024, 10); + private static final Log LOG = LogFactory.getLog(TestParentQueue.class); RMContext rmContext; @@ -118,6 +123,23 @@ public class TestParentQueue { LOG.info("Setup top-level queues a and b"); } + /** Configures root with two child queues, a and b, and sets their minimum resources to the absolute values QUEUE_A_RESOURCE and QUEUE_B_RESOURCE via setMinimumResourceRequirement with label argument "" (presumably the default partition - TODO confirm). */ private void setupSingleLevelQueuesWithAbsoluteResource( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A, B}); + + final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A; + conf.setMinimumResourceRequirement("", Q_A, + QUEUE_A_RESOURCE); + + final String Q_B = CapacitySchedulerConfiguration.ROOT + "." 
+ B; + conf.setMinimumResourceRequirement("", Q_B, + QUEUE_B_RESOURCE); + + LOG.info("Setup top-level queues a and b with absolute resource"); + } + private FiCaSchedulerApp getMockApplication(int appId, String user) { FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); doReturn(user).when(application).getUser(); @@ -931,6 +953,78 @@ public class TestParentQueue { reset(c); } + /** Checks that the effective minimum resources of the absolute-resource queues a and b follow changes in cluster size, per resource type, while the configured minimums stay unchanged. Phase 1: 2 nodes x (10 GB, 16 vcores) equals the configured total, so effective == configured. Phase 2: 1 node x (10 GB, 16 vcores): memory and vcores both scale by 0.5. Phase 3: 1 node x (10 GB, 40 vcores): memory still scales by 0.5, but vcores scale by 40/32 = 1.25, so effective vcores end above the configured values. The _70PERC/_30PERC names refer to each queue's share of the 10 GB cluster memory (7 GB and 3 GB). */ @Test + public void testAbsoluteResourceWithChangeInClusterResource() + throws Exception { /* NOTE(review): assertEquals below is called as (actual, expected); JUnit's contract is assertEquals(expected, actual), so a failure message would report the two values swapped. */ + // Setup queue configs + setupSingleLevelQueuesWithAbsoluteResource(csConf); + + Map<String, CSQueue> queues = new HashMap<String, CSQueue>(); + CSQueue root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf, + null, CapacitySchedulerConfiguration.ROOT, queues, queues, + TestUtils.spyHook); + + // Setup some nodes + final int memoryPerNode = 10; + int coresPerNode = 16; + int numNodes = 2; + + Resource clusterResource = Resources.createResource( + numNodes * (memoryPerNode * GB), numNodes * coresPerNode); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + // Start testing + LeafQueue a = (LeafQueue) queues.get(A); + LeafQueue b = (LeafQueue) queues.get(B); + + assertEquals(a.getQueueResourceQuotas().getConfiguredMinResource(), + QUEUE_A_RESOURCE); + assertEquals(b.getQueueResourceQuotas().getConfiguredMinResource(), + QUEUE_B_RESOURCE); + assertEquals(a.getQueueResourceQuotas().getEffectiveMinResource(), + QUEUE_A_RESOURCE); + assertEquals(b.getQueueResourceQuotas().getEffectiveMinResource(), + QUEUE_B_RESOURCE); + + numNodes = 1; + clusterResource = Resources.createResource(numNodes * (memoryPerNode * GB), + numNodes * coresPerNode); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + Resource QUEUE_B_RESOURCE_HALF = Resource.newInstance(7 * 1024, 11); /* 14 * 1024 x 0.5, 22 x 16/32 */ + Resource QUEUE_A_RESOURCE_HALF 
= Resource.newInstance(3 * 1024, 5); /* 6 * 1024 x 0.5, 10 x 16/32 */ + assertEquals(a.getQueueResourceQuotas().getConfiguredMinResource(), + QUEUE_A_RESOURCE); + assertEquals(b.getQueueResourceQuotas().getConfiguredMinResource(), + QUEUE_B_RESOURCE); + assertEquals(a.getQueueResourceQuotas().getEffectiveMinResource(), + QUEUE_A_RESOURCE_HALF); + assertEquals(b.getQueueResourceQuotas().getEffectiveMinResource(), + QUEUE_B_RESOURCE_HALF); + + coresPerNode = 40; + clusterResource = Resources.createResource(numNodes * (memoryPerNode * GB), + numNodes * coresPerNode); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + Resource QUEUE_B_RESOURCE_70PERC = Resource.newInstance(7 * 1024, 27); /* 14 * 1024 x 0.5; 22 x 40/32 = 27.5, truncated to 27 */ + Resource QUEUE_A_RESOURCE_30PERC = Resource.newInstance(3 * 1024, 12); /* 6 * 1024 x 0.5; 10 x 40/32 = 12.5, truncated to 12 */ + assertEquals(a.getQueueResourceQuotas().getConfiguredMinResource(), + QUEUE_A_RESOURCE); + assertEquals(b.getQueueResourceQuotas().getConfiguredMinResource(), + QUEUE_B_RESOURCE); + assertEquals(a.getQueueResourceQuotas().getEffectiveMinResource(), + QUEUE_A_RESOURCE_30PERC); + assertEquals(b.getQueueResourceQuotas().getEffectiveMinResource(), + QUEUE_B_RESOURCE_70PERC); + } + @After public void tearDown() throws Exception { } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org