YARN-7332. Compute effectiveCapacity per each resource vector. (Sunil G via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c0b03ffb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c0b03ffb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c0b03ffb

Branch: refs/heads/YARN-5881
Commit: c0b03ffb9803b6d8378cadcba456d6e0fef3a884
Parents: c4ba54b
Author: Wangda Tan <wan...@apache.org>
Authored: Fri Oct 27 10:16:33 2017 -0700
Committer: Sunil G <sun...@apache.org>
Committed: Fri Nov 17 19:59:32 2017 +0530

----------------------------------------------------------------------
 .../scheduler/capacity/ParentQueue.java         | 66 ++++++++++++--
 .../scheduler/capacity/TestParentQueue.java     | 94 ++++++++++++++++++++
 2 files changed, 153 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0b03ffb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 5ab1494..940637e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AccessType;
@@ -68,7 +69,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 @Private
@@ -928,24 +931,25 @@ public class ParentQueue extends AbstractCSQueue {
     // Factor to scale down effective resource: When cluster has sufficient
     // resources, effective_min_resources will be same as configured
     // min_resources.
-    float effectiveMinRatio = 1;
+    Resource numeratorForMinRatio = null;
     ResourceCalculator rc = this.csContext.getResourceCalculator();
     if (getQueueName().equals("root")) {
       if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(rc,
           clusterResource, resourceByLabel, configuredMinResources)) {
-        effectiveMinRatio = Resources.divide(rc, clusterResource,
-            resourceByLabel, configuredMinResources);
+        numeratorForMinRatio = resourceByLabel;
       }
     } else {
       if (Resources.lessThan(rc, clusterResource,
           queueResourceQuotas.getEffectiveMinResource(label),
           configuredMinResources)) {
-        effectiveMinRatio = Resources.divide(rc, clusterResource,
-            queueResourceQuotas.getEffectiveMinResource(label),
-            configuredMinResources);
+        numeratorForMinRatio = queueResourceQuotas
+            .getEffectiveMinResource(label);
       }
     }
 
+    Map<String, Float> effectiveMinRatioPerResource = getEffectiveMinRatioPerResource(
+        configuredMinResources, numeratorForMinRatio);
+
     // loop and do this for all child queues
     for (CSQueue childQueue : getChildQueues()) {
       Resource minResource = childQueue.getQueueResourceQuotas()
@@ -955,7 +959,8 @@ public class ParentQueue extends AbstractCSQueue {
       if (childQueue.getCapacityConfigType()
           .equals(CapacityConfigType.ABSOLUTE_RESOURCE)) {
         childQueue.getQueueResourceQuotas().setEffectiveMinResource(label,
-            Resources.multiply(minResource, effectiveMinRatio));
+            getMinResourceNormalized(childQueue.getQueueName(), effectiveMinRatioPerResource,
+                minResource));
 
         // Max resource of a queue should be a minimum of {configuredMaxRes,
         // parentMaxRes}. parentMaxRes could be configured value. But if not
@@ -1003,6 +1008,53 @@ public class ParentQueue extends AbstractCSQueue {
     }
   }
 
+  private Resource getMinResourceNormalized(String name, Map<String, Float> effectiveMinRatio,
+      Resource minResource) {
+    Resource ret = Resource.newInstance(minResource);
+    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    for (int i = 0; i < maxLength; i++) {
+      ResourceInformation nResourceInformation = minResource
+          .getResourceInformation(i);
+
+      Float ratio = effectiveMinRatio.get(nResourceInformation.getName());
+      if (ratio != null) {
+        ret.setResourceValue(i,
+            (long) (nResourceInformation.getValue() * ratio.floatValue()));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Updating min resource for Queue: " + name + " as "
+              + ret.getResourceInformation(i) + ", Actual resource: "
+              + nResourceInformation.getValue() + ", ratio: "
+              + ratio.floatValue());
+        }
+      }
+    }
+    return ret;
+  }
+
+  private Map<String, Float> getEffectiveMinRatioPerResource(
+      Resource configuredMinResources, Resource numeratorForMinRatio) {
+    Map<String, Float> effectiveMinRatioPerResource = new HashMap<>();
+    if (numeratorForMinRatio != null) {
+      int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+      for (int i = 0; i < maxLength; i++) {
+        ResourceInformation nResourceInformation = numeratorForMinRatio
+            .getResourceInformation(i);
+        ResourceInformation dResourceInformation = configuredMinResources
+            .getResourceInformation(i);
+
+        long nValue = nResourceInformation.getValue();
+        long dValue = UnitsConversionUtil.convert(
+            dResourceInformation.getUnits(), nResourceInformation.getUnits(),
+            dResourceInformation.getValue());
+        if (dValue != 0) {
+          effectiveMinRatioPerResource.put(nResourceInformation.getName(),
+              (float) nValue / dValue);
+        }
+      }
+    }
+    return effectiveMinRatioPerResource;
+  }
+
   private void deriveCapacityFromAbsoluteConfigurations(String label,
       Resource clusterResource, ResourceCalculator rc, CSQueue childQueue) {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0b03ffb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 25a9774..fe66aba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -68,6 +68,11 @@ import org.mockito.stubbing.Answer;
 
 public class TestParentQueue {
 
+  private static final Resource QUEUE_B_RESOURCE = Resource
+      .newInstance(14 * 1024, 22);
+  private static final Resource QUEUE_A_RESOURCE = Resource
+      .newInstance(6 * 1024, 10);
+
   private static final Log LOG = LogFactory.getLog(TestParentQueue.class);
   
   RMContext rmContext;
@@ -118,6 +123,23 @@ public class TestParentQueue {
     LOG.info("Setup top-level queues a and b");
   }
 
+  private void setupSingleLevelQueuesWithAbsoluteResource(
+      CapacitySchedulerConfiguration conf) {
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A, B});
+
+    final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
+    conf.setMinimumResourceRequirement("", Q_A,
+        QUEUE_A_RESOURCE);
+
+    final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
+    conf.setMinimumResourceRequirement("", Q_B,
+        QUEUE_B_RESOURCE);
+
+    LOG.info("Setup top-level queues a and b with absolute resource");
+  }
+
   private FiCaSchedulerApp getMockApplication(int appId, String user) {
     FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
     doReturn(user).when(application).getUser();
@@ -931,6 +953,78 @@ public class TestParentQueue {
     reset(c);
   }
 
+  @Test
+  public void testAbsoluteResourceWithChangeInClusterResource()
+      throws Exception {
+    // Setup queue configs
+    setupSingleLevelQueuesWithAbsoluteResource(csConf);
+
+    Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+    CSQueue root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf,
+        null, CapacitySchedulerConfiguration.ROOT, queues, queues,
+        TestUtils.spyHook);
+
+    // Setup some nodes
+    final int memoryPerNode = 10;
+    int coresPerNode = 16;
+    int numNodes = 2;
+
+    Resource clusterResource = Resources.createResource(
+        numNodes * (memoryPerNode * GB), numNodes * coresPerNode);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    root.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    // Start testing
+    LeafQueue a = (LeafQueue) queues.get(A);
+    LeafQueue b = (LeafQueue) queues.get(B);
+
+    assertEquals(a.getQueueResourceQuotas().getConfiguredMinResource(),
+        QUEUE_A_RESOURCE);
+    assertEquals(b.getQueueResourceQuotas().getConfiguredMinResource(),
+        QUEUE_B_RESOURCE);
+    assertEquals(a.getQueueResourceQuotas().getEffectiveMinResource(),
+        QUEUE_A_RESOURCE);
+    assertEquals(b.getQueueResourceQuotas().getEffectiveMinResource(),
+        QUEUE_B_RESOURCE);
+
+    numNodes = 1;
+    clusterResource = Resources.createResource(numNodes * (memoryPerNode * GB),
+        numNodes * coresPerNode);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    root.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    Resource QUEUE_B_RESOURCE_HALF = Resource.newInstance(7 * 1024, 11);
+    Resource QUEUE_A_RESOURCE_HALF = Resource.newInstance(3 * 1024, 5);
+    assertEquals(a.getQueueResourceQuotas().getConfiguredMinResource(),
+        QUEUE_A_RESOURCE);
+    assertEquals(b.getQueueResourceQuotas().getConfiguredMinResource(),
+        QUEUE_B_RESOURCE);
+    assertEquals(a.getQueueResourceQuotas().getEffectiveMinResource(),
+        QUEUE_A_RESOURCE_HALF);
+    assertEquals(b.getQueueResourceQuotas().getEffectiveMinResource(),
+        QUEUE_B_RESOURCE_HALF);
+
+    coresPerNode = 40;
+    clusterResource = Resources.createResource(numNodes * (memoryPerNode * GB),
+        numNodes * coresPerNode);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    root.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    Resource QUEUE_B_RESOURCE_70PERC = Resource.newInstance(7 * 1024, 27);
+    Resource QUEUE_A_RESOURCE_30PERC = Resource.newInstance(3 * 1024, 12);
+    assertEquals(a.getQueueResourceQuotas().getConfiguredMinResource(),
+        QUEUE_A_RESOURCE);
+    assertEquals(b.getQueueResourceQuotas().getConfiguredMinResource(),
+        QUEUE_B_RESOURCE);
+    assertEquals(a.getQueueResourceQuotas().getEffectiveMinResource(),
+        QUEUE_A_RESOURCE_30PERC);
+    assertEquals(b.getQueueResourceQuotas().getEffectiveMinResource(),
+        QUEUE_B_RESOURCE_70PERC);
+  }
+
   @After
   public void tearDown() throws Exception {
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to