shuzirra commented on a change in pull request #3470:
URL: https://github.com/apache/hadoop/pull/3470#discussion_r794087433



##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
##########
@@ -25,6 +25,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+

Review comment:
       Usually I don't ask for change just because of an extra line, but in 
this case this is the only reason this file is marked as changed.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
##########
@@ -818,6 +818,28 @@ public static Resource createResourceWithSameValue(long 
value) {
     return res;
   }
 
+  public static Resource multiply(Resource resource, float multiplier) {

Review comment:
       I'd really suggest to change the float to double here. Floating point 
precision is always tricky, so in cases where precision matters, the bigger the 
better. Float has 24 significant bits, while double has 53. 
(https://en.wikipedia.org/wiki/IEEE_754)
   This means float loses precision after about 8 digits. Which can be easily 
exceeded in the case of large clusters.
   
   See the code below:
           long a = 44444444;
           float f = 1.5f;
           double d = 1.5d;
           System.out.println((long)Math.floor(a * f));
           System.out.println((long)Math.floor(a * d));
   
   Output: 
    66666664
    66666666
   

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
##########
@@ -818,6 +818,28 @@ public static Resource createResourceWithSameValue(long 
value) {
     return res;
   }
 
+  public static Resource multiply(Resource resource, float multiplier) {
+    Resource newResource = Resource.newInstance(0, 0);
+
+    for (ResourceInformation resourceInformation : resource.getResources()) {
+      newResource.setResourceValue(resourceInformation.getName(),
+          (long) Math.floor(resourceInformation.getValue() * multiplier));
+    }
+
+    return newResource;
+  }
+
+  public static Resource multiplyRound(Resource resource, float multiplier) {

Review comment:
       Precision comment above applies here as well.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RootCalculationDriver.java
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A special case that contains the resource calculation of the root queue.
+ */
+public final class RootCalculationDriver extends ResourceCalculationDriver {
+  private final AbstractQueueCapacityCalculator rootCalculator;
+
+  public RootCalculationDriver(CSQueue rootQueue, QueueCapacityUpdateContext 
updateContext,
+                               AbstractQueueCapacityCalculator rootCalculator,
+                               Collection<String> definedResources) {
+    super(rootQueue, updateContext, Collections.emptyMap(), definedResources);
+    this.rootCalculator = rootCalculator;
+  }
+
+  @Override
+  public void calculateResources() {
+    for (String label : parent.getConfiguredNodeLabels()) {

Review comment:
       Super confusing that in a RootCalculationDriver we are iterating thorugh 
the PARENT's node labels, could you please add a comment, that in the special 
case of root queue, the parent is actually the root queue itself?

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueCapacityHandler.java
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
+import static 
org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
+import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT;
+
+/**
+ * Controls how capacity and resource values are set and calculated for a 
queue.
+ * Effective minimum and maximum resource values are set for each label and 
resource separately.
+ */
+public class CapacitySchedulerQueueCapacityHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CapacitySchedulerQueueCapacityHandler.class);
+
+  private final Map<ResourceUnitCapacityType, AbstractQueueCapacityCalculator>
+      calculators;
+  private final AbstractQueueCapacityCalculator rootCalculator =
+      new RootQueueCapacityCalculator();
+  private final RMNodeLabelsManager labelsManager;
+  private final Collection<String> definedResources = new LinkedHashSet<>();
+
+  public CapacitySchedulerQueueCapacityHandler(RMNodeLabelsManager 
labelsManager) {
+    this.calculators = new HashMap<>();
+    this.labelsManager = labelsManager;
+
+    this.calculators.put(ResourceUnitCapacityType.ABSOLUTE,
+        new AbsoluteResourceCapacityCalculator());
+    this.calculators.put(ResourceUnitCapacityType.PERCENTAGE,
+        new PercentageQueueCapacityCalculator());
+    this.calculators.put(ResourceUnitCapacityType.WEIGHT,
+        new WeightQueueCapacityCalculator());
+
+    loadResourceNames();
+  }
+
+  /**
+   * Updates the resource and metrics values for a queue, its siblings and 
descendants.
+   * These values are calculated at runtime.
+   *

Review comment:
       It might be a big change, but I put it here for discussion: We are 
talking about a tree, in a tree it's ususally not a good idea to make a 
traversal with methods which affect siblings AND descendants, because it can 
easily end up in multiple updates for the same node. Also it's confusing that a 
method affect the siblings AND the descendants in the same time. 
   It is usually a cleaner approach to call the calculation on a node, which 
does the calculation for the node and it's children (and recursively calls 
itself), in this case we wouldn't have to handle the root as a special case, 
because no update would require access to parent, only the the node and it's 
children.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceCalculationDriver.java
##########
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+
+/**
+ * Drives the main logic of resource calculation for all children under a 
parent queue. Acts as a
+ * bookkeeper of disposable update information that is used by all children 
under a common parent.
+ */
+public class ResourceCalculationDriver {
+  protected static final Set<ResourceUnitCapacityType> CALCULATOR_PRECEDENCE =
+      ImmutableSet.of(
+          ResourceUnitCapacityType.ABSOLUTE,
+          ResourceUnitCapacityType.PERCENTAGE,
+          ResourceUnitCapacityType.WEIGHT);
+
+  protected final QueueResourceRoundingStrategy roundingStrategy =
+      new DefaultQueueResourceRoundingStrategy(CALCULATOR_PRECEDENCE);
+  protected final CSQueue parent;
+  protected final QueueCapacityUpdateContext updateContext;
+  protected final Map<ResourceUnitCapacityType, 
AbstractQueueCapacityCalculator> calculators;
+  protected final Collection<String> definedResources;
+
+  protected final Map<String, ResourceVector> overallRemainingResource = new 
HashMap<>();
+  protected final Map<String, ResourceVector> batchRemainingResource = new 
HashMap<>();
+  // Used by ABSOLUTE capacity types
+  protected final Map<String, ResourceVector> normalizedResourceRatio = new 
HashMap<>();
+  // Used by WEIGHT capacity typet js
+  protected final Map<String, Map<String, Float>> sumWeightsPerLabel = new 
HashMap<>();
+
+  protected String currentResourceName;
+  protected AbstractQueueCapacityCalculator currentCalculator;
+  protected CSQueue currentChild;
+  protected Map<String, Float> usedResourceByCurrentCalculator = new 
HashMap<>();
+
+  public ResourceCalculationDriver(
+      CSQueue parent, QueueCapacityUpdateContext updateContext,
+      Map<ResourceUnitCapacityType, AbstractQueueCapacityCalculator> 
calculators,
+      Collection<String> definedResources) {
+    this.parent = parent;
+    this.updateContext = updateContext;
+    this.calculators = calculators;
+    this.definedResources = definedResources;
+  }
+
+  /**
+   * Returns the parent that is driving the calculation.
+   *
+   * @return a common parent queue
+   */
+  public CSQueue getParent() {
+    return parent;
+  }
+
+  /**
+   * Returns the context that is used throughout the whole update phase.
+   *
+   * @return update context
+   */
+  public QueueCapacityUpdateContext getUpdateContext() {
+    return updateContext;
+  }
+
+  /**
+   * Returns the name of the resource that is currently processed.
+   *
+   * @return resource name
+   */
+  public String getCurrentResourceName() {
+    return currentResourceName;
+  }
+
+  /**
+   * Returns the child that is currently processed.
+   *
+   * @return child queue
+   */
+  public CSQueue getCurrentChild() {
+    return currentChild;
+  }
+
+  /**
+   * Sets the currently evaluated child to a specific queue.
+   *
+   * @param currentChild a child queue
+   */
+  public void setCurrentChild(CSQueue currentChild) {
+    if (currentChild.getParent() != parent) {
+      throw new IllegalArgumentException("Child queue " + 
currentChild.getQueuePath() + " is not " +
+          "a child of " + parent.getQueuePath());
+    }

Review comment:
       We shouldn't change these independently, the change to child should 
invoke a change to parent, and parent shouldn't be set independently. Since the 
calculation requires an ACTUAL queue, just having a parent is not sufficient, 
so it shouldn't be set independently.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/DefaultQueueResourceRoundingStrategy.java
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.SortedSet;
+
+import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType.WEIGHT;
+
+/**
+ * The default rounding strategy for resource calculation. Uses floor for all 
types except WEIGHT,
+ * which is always the last type to consider, therefore it is safe to round up.
+ */
+public class DefaultQueueResourceRoundingStrategy implements 
QueueResourceRoundingStrategy {
+  private final ResourceUnitCapacityType lastCapacityType;
+
+  public DefaultQueueResourceRoundingStrategy(
+      Set<ResourceUnitCapacityType> capacityTypePrecedence) {

Review comment:
       Isn't the capacityTypePrecedence is a static collection? Can it change? 
If it cannot then we don't need to calculate lastCapacityType every time, so we 
can just simply cache it or store in a static field (which is effectively a 
cache :) )

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
##########
@@ -818,6 +818,28 @@ public static Resource createResourceWithSameValue(long 
value) {
     return res;
   }
 
+  public static Resource multiply(Resource resource, float multiplier) {

Review comment:
       We have two multiply methods, one uses round the other uses floor, I 
think it would be better if this was reflected in the method's name as well, 
just to make easier to read even if we only use one of the methods. (If we see 
both next to each other the difference is obvious, but if we only use multiply, 
I wouln't even assume it rounds)

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
##########
@@ -1919,6 +1922,48 @@ private void updateCurrentResourceLimits(
         currentResourceLimits.getLimit()));
   }
 
+  @Override
+  public void refreshAfterResourceCalculation(Resource clusterResource, 
ResourceLimits resourceLimits) {

Review comment:
       This method is almost identical to the updateClusterResource, I think 
they could and should be merged.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/DefaultQueueResourceRoundingStrategy.java
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.SortedSet;
+
+import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType.WEIGHT;
+
+/**
+ * The default rounding strategy for resource calculation. Uses floor for all 
types except WEIGHT,
+ * which is always the last type to consider, therefore it is safe to round up.
+ */
+public class DefaultQueueResourceRoundingStrategy implements 
QueueResourceRoundingStrategy {
+  private final ResourceUnitCapacityType lastCapacityType;
+
+  public DefaultQueueResourceRoundingStrategy(
+      Set<ResourceUnitCapacityType> capacityTypePrecedence) {
+    lastCapacityType = capacityTypePrecedence.stream().reduce((c1, c2) -> 
c2).orElseThrow(()
+        -> new IllegalArgumentException("Capacity type precedence collection 
is empty"));
+  }
+
+  @Override
+  public float getRoundedResource(float resourceValue, 
QueueCapacityVectorEntry capacityVectorEntry) {
+    if (capacityVectorEntry.getVectorResourceType().equals(lastCapacityType)) {
+      return Math.round(resourceValue);
+    } else {
+      return (float) Math.floor(resourceValue);

Review comment:
       Why do we need to use floor for non-final capacity types, and round for 
the rest? It's a bit inconsistent, but there might be a reason I just don't see.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NullRMNodeLabelsManager.java
##########
@@ -98,4 +100,24 @@ protected void serviceInit(Configuration conf) throws 
Exception {
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
     super.serviceInit(conf);
   }
+
+  public void setResourceForLabel(String label, Resource resource) {
+    if (label.equals(NO_LABEL)) {
+      noNodeLabel = new FakeLabel(resource);
+      return;

Review comment:
       Also don't we need to store the NO_LABEL's resources as well? Why we 
return here. If we need to store it, we dont need this whole if branch. 

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NullRMNodeLabelsManager.java
##########
@@ -98,4 +100,24 @@ protected void serviceInit(Configuration conf) throws 
Exception {
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
     super.serviceInit(conf);
   }
+
+  public void setResourceForLabel(String label, Resource resource) {
+    if (label.equals(NO_LABEL)) {

Review comment:
       I'm giving up, where does the NO_LABEL come from?! It's not defined 
here, nor in the ancestors, there is no static import either. The only place I 
can find it is in org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager, 
whoever it is not imported. Shouldn't we import it? Or what do I miss?

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NullRMNodeLabelsManager.java
##########
@@ -98,4 +100,24 @@ protected void serviceInit(Configuration conf) throws 
Exception {
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
     super.serviceInit(conf);
   }
+
+  public void setResourceForLabel(String label, Resource resource) {
+    if (label.equals(NO_LABEL)) {
+      noNodeLabel = new FakeLabel(resource);

Review comment:
       Also cannot find this declaration, nor it being used.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java
##########
@@ -79,15 +80,24 @@ public static ResourceVector of(Resource resource) {
   }
 
   /**
-   * Subtract values for each resource defined in the given resource vector.
+   * Decrements values for each resource defined in the given resource vector.
    * @param otherResourceVector rhs resource vector of the subtraction
    */
-  public void subtract(ResourceVector otherResourceVector) {
+  public void decrement(ResourceVector otherResourceVector) {
     for (Map.Entry<String, Float> resource : otherResourceVector) {
       setValue(resource.getKey(), getValue(resource.getKey()) - 
resource.getValue());
     }
   }
 
+  /**
+   * Decrements the given resource by the specified value.
+   * @param resourceName name of the resource
+   * @param value value to be subtracted from the resource's current value
+   */
+  public void decrement(String resourceName, float value) {

Review comment:
       I know it's not the scope of this change, but while we are at it, we 
really should consider bumping the precision to double. For large clusters 
float precision might not be sufficient. 

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueCapacityHandler.java
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
+import static 
org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
+import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT;
+
+/**
+ * Controls how capacity and resource values are set and calculated for a 
queue.
+ * Effective minimum and maximum resource values are set for each label and 
resource separately.
+ */
+public class CapacitySchedulerQueueCapacityHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CapacitySchedulerQueueCapacityHandler.class);
+
+  private final Map<ResourceUnitCapacityType, AbstractQueueCapacityCalculator>
+      calculators;
+  private final AbstractQueueCapacityCalculator rootCalculator =
+      new RootQueueCapacityCalculator();
+  private final RMNodeLabelsManager labelsManager;
+  private final Collection<String> definedResources = new LinkedHashSet<>();
+
+  public CapacitySchedulerQueueCapacityHandler(RMNodeLabelsManager 
labelsManager) {
+    this.calculators = new HashMap<>();
+    this.labelsManager = labelsManager;
+
+    this.calculators.put(ResourceUnitCapacityType.ABSOLUTE,
+        new AbsoluteResourceCapacityCalculator());
+    this.calculators.put(ResourceUnitCapacityType.PERCENTAGE,
+        new PercentageQueueCapacityCalculator());
+    this.calculators.put(ResourceUnitCapacityType.WEIGHT,
+        new WeightQueueCapacityCalculator());
+
+    loadResourceNames();
+  }
+
+  /**
+   * Updates the resource and metrics values for a queue, its siblings and 
descendants.
+   * These values are calculated at runtime.
+   *
+   * @param clusterResource resource of the cluster
+   * @param queue           queue to update
+   * @return update context that contains information about the update phase
+   */
+  public QueueCapacityUpdateContext update(Resource clusterResource, CSQueue 
queue) {
+    ResourceLimits resourceLimits = new ResourceLimits(clusterResource);
+    QueueCapacityUpdateContext updateContext =
+        new QueueCapacityUpdateContext(clusterResource, labelsManager);
+
+    if (queue.getQueuePath().equals(ROOT)) {
+      updateRoot(queue, updateContext, resourceLimits);
+      updateChildren(queue, updateContext, resourceLimits);
+    } else {
+      updateChildren(queue.getParent(), updateContext, resourceLimits);
+    }
+
+    return updateContext;
+  }
+
+  private void updateRoot(
+      CSQueue queue, QueueCapacityUpdateContext updateContext, ResourceLimits 
resourceLimits) {

Review comment:
       For clarity I'd suggest to rename the 'queue' argument to 'root' or 
'rootQueue', because this is only valid if the updateRoot is invoked with root 
queue as an argument.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RootCalculationDriver.java
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A special case that contains the resource calculation of the root queue.
+ */
+public final class RootCalculationDriver extends ResourceCalculationDriver {
+  private final AbstractQueueCapacityCalculator rootCalculator;
+
+  public RootCalculationDriver(CSQueue rootQueue, QueueCapacityUpdateContext 
updateContext,
+                               AbstractQueueCapacityCalculator rootCalculator,
+                               Collection<String> definedResources) {
+    super(rootQueue, updateContext, Collections.emptyMap(), definedResources);
+    this.rootCalculator = rootCalculator;
+  }
+
+  @Override
+  public void calculateResources() {
+    for (String label : parent.getConfiguredNodeLabels()) {
+      for (QueueCapacityVector.QueueCapacityVectorEntry capacityVectorEntry : 
parent.getConfiguredCapacityVector(label)) {
+        currentResourceName = capacityVectorEntry.getResourceName();
+        
parent.getOrCreateAbsoluteMinCapacityVector(label).setValue(currentResourceName,
 1);
+        
parent.getOrCreateAbsoluteMaxCapacityVector(label).setValue(currentResourceName,
 1);
+
+        float minimumResource = rootCalculator.calculateMinimumResource(this, 
label);
+        float maximumResource = rootCalculator.calculateMaximumResource(this, 
label);
+        long roundedMinResource = (long) Math.floor(minimumResource);
+        long roundedMaxResource = (long) Math.floor(maximumResource);

Review comment:
       If we have a dedicated rounding strategy, shouldn't we use that for ALL 
rounding purposes?

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueCapacityHandler.java
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
+import static 
org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
+import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT;
+
+/**
+ * Controls how capacity and resource values are set and calculated for a 
queue.
+ * Effective minimum and maximum resource values are set for each label and 
resource separately.
+ */
+public class CapacitySchedulerQueueCapacityHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CapacitySchedulerQueueCapacityHandler.class);
+
+  private final Map<ResourceUnitCapacityType, AbstractQueueCapacityCalculator>
+      calculators;
+  private final AbstractQueueCapacityCalculator rootCalculator =
+      new RootQueueCapacityCalculator();
+  private final RMNodeLabelsManager labelsManager;
+  private final Collection<String> definedResources = new LinkedHashSet<>();
+
+  public CapacitySchedulerQueueCapacityHandler(RMNodeLabelsManager 
labelsManager) {
+    this.calculators = new HashMap<>();
+    this.labelsManager = labelsManager;
+
+    this.calculators.put(ResourceUnitCapacityType.ABSOLUTE,
+        new AbsoluteResourceCapacityCalculator());
+    this.calculators.put(ResourceUnitCapacityType.PERCENTAGE,
+        new PercentageQueueCapacityCalculator());
+    this.calculators.put(ResourceUnitCapacityType.WEIGHT,
+        new WeightQueueCapacityCalculator());
+
+    loadResourceNames();
+  }
+
+  /**
+   * Updates the resource and metrics values for a queue, its siblings and 
descendants.
+   * These values are calculated at runtime.
+   *
+   * @param clusterResource resource of the cluster
+   * @param queue           queue to update
+   * @return update context that contains information about the update phase
+   */
+  public QueueCapacityUpdateContext update(Resource clusterResource, CSQueue 
queue) {
+    ResourceLimits resourceLimits = new ResourceLimits(clusterResource);
+    QueueCapacityUpdateContext updateContext =
+        new QueueCapacityUpdateContext(clusterResource, labelsManager);
+
+    if (queue.getQueuePath().equals(ROOT)) {
+      updateRoot(queue, updateContext, resourceLimits);
+      updateChildren(queue, updateContext, resourceLimits);
+    } else {
+      updateChildren(queue.getParent(), updateContext, resourceLimits);
+    }
+
+    return updateContext;
+  }
+
+  private void updateRoot(
+      CSQueue queue, QueueCapacityUpdateContext updateContext, ResourceLimits 
resourceLimits) {
+    RootCalculationDriver rootCalculationDriver = new 
RootCalculationDriver(queue, updateContext,
+        rootCalculator, definedResources);
+    rootCalculationDriver.calculateResources();
+    
queue.refreshAfterResourceCalculation(updateContext.getUpdatedClusterResource(),
 resourceLimits);
+  }
+
+  private void updateChildren(
+      CSQueue parent, QueueCapacityUpdateContext updateContext,
+      ResourceLimits resourceLimits) {
+    if (parent == null || CollectionUtils.isEmpty(parent.getChildQueues())) {
+      return;
+    }
+
+    ResourceCalculationDriver resourceCalculationDriver = new 
ResourceCalculationDriver(
+        parent, updateContext, calculators, definedResources);
+    resourceCalculationDriver.calculateResources();
+
+    updateChildrenAfterCalculation(resourceCalculationDriver, resourceLimits);
+  }
+
+  private void updateChildrenAfterCalculation(
+      ResourceCalculationDriver resourceCalculationDriver, ResourceLimits 
resourceLimits) {
+    for (CSQueue childQueue : 
resourceCalculationDriver.getParent().getChildQueues()) {
+      resourceCalculationDriver.setCurrentChild(childQueue);
+      resourceCalculationDriver.updateChildCapacities();

Review comment:
       This would be much safer as a 
resourceCalculationDriver.updateChildCapacities(childQueue); call

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractQueueCapacityCalculator.java
##########
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueUpdateWarning.QueueUpdateWarningType;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+
+/**
+ * A strategy class to encapsulate queue capacity setup and resource 
calculation
+ * logic.
+ */
+public abstract class AbstractQueueCapacityCalculator {
+  private static final String MB_UNIT = "Mi";
+
+  /**
+   * Sets the metrics and statistics after effective resource values 
calculation.
+   *
+   * @param resourceCalculationDriver driver that contains the current 
resource unit and child to
+   *                                  process
+   * @param label         node label
+   */
+  public abstract void updateCapacitiesAfterCalculation(
+      ResourceCalculationDriver resourceCalculationDriver, String label);
+
+
+  /**
+   * Returns the capacity type the calculator could handle.
+   *
+   * @return capacity type
+   */
+  public abstract ResourceUnitCapacityType getCapacityType();
+
+  /**
+   * Calculates the minimum effective resource.
+   *
+   * @param resourceCalculationDriver driver that contains the current 
resource unit and child to
+   *                                  process
+   * @param label         node label
+   * @return minimum effective resource
+   */
+  public abstract float calculateMinimumResource(ResourceCalculationDriver 
resourceCalculationDriver,
+                                                 String label);
+
+  /**
+   * Calculates the maximum effective resource.
+   *
+   * @param resourceCalculationDriver driver that contains the current 
resource unit and child to
+   *                                  process
+   * @param label         node label
+   * @return minimum effective resource
+   */
+  public abstract float calculateMaximumResource(ResourceCalculationDriver 
resourceCalculationDriver,
+                                                 String label);
+
+  /**
+   * Executes all logic that must be called prior to the effective resource 
value calculations.
+   *
+   * @param resourceCalculationDriver driver that contains the parent queue on 
which the prerequisite
+   *                                  calculation should be made
+   */
+  public void calculateResourcePrerequisites(ResourceCalculationDriver 
resourceCalculationDriver) {
+    for (String label : 
resourceCalculationDriver.getParent().getConfiguredNodeLabels()) {
+      // We need to set normalized resource ratio only once per parent
+      if (resourceCalculationDriver.getNormalizedResourceRatios().isEmpty()) {
+        setNormalizedResourceRatio(resourceCalculationDriver, label);
+      }
+    }
+  }
+
+  /**
+   * Returns all resource names that are defined for the capacity type that is
+   * handled by the calculator.
+   *
+   * @param queue queue for which the capacity vector is defined
+   * @param label node label
+   * @return resource names
+   */
+  protected Set<String> getResourceNames(CSQueue queue, String label) {
+    return getResourceNames(queue, label, getCapacityType());
+  }
+
+  /**
+   * Returns all resource names that are defined for a capacity type.
+   *
+   * @param queue        queue for which the capacity vector is defined
+   * @param label        node label
+   * @param capacityType capacity type for which the resource names are defined
+   * @return resource names
+   */
+  protected Set<String> getResourceNames(CSQueue queue, String label,
+                                         ResourceUnitCapacityType 
capacityType) {
+    return queue.getConfiguredCapacityVector(label)
+        .getResourceNamesByCapacityType(capacityType);
+  }
+
+  /**
+   * Sets capacity and absolute capacity values based on minimum and maximum 
effective resources.
+   *
+   * @param clusterResource cluster resource for the corresponding label
+   * @param queue child queue for which the capacities are set
+   * @param label node label
+   */
+  public static void setQueueCapacities(
+      Resource clusterResource, CSQueue queue, String label) {
+    if (!(queue instanceof AbstractCSQueue)) {
+      return;
+    }
+
+    AbstractCSQueue csQueue = (AbstractCSQueue) queue;
+    ResourceCalculator resourceCalculator = csQueue.resourceCalculator;
+
+    CSQueue parent = queue.getParent();
+    if (parent == null) {
+      return;
+    }
+    // Update capacity with a float calculated from the parent's minResources
+    // and the recently changed queue minResources.
+    // capacity = effectiveMinResource / {parent's effectiveMinResource}
+    float result = resourceCalculator.divide(clusterResource,
+        queue.getQueueResourceQuotas().getEffectiveMinResource(label),
+        parent.getQueueResourceQuotas().getEffectiveMinResource(label));
+    queue.getQueueCapacities().setCapacity(label,

Review comment:
       I'm not thirlled by the idea of a calculator actually making changes of 
the queues. It is more like a driver's job. Calculator should calulate but 
don't write the result back to the queues.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueCapacityHandler.java
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
+import static 
org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
+import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT;
+
+/**
+ * Controls how capacity and resource values are set and calculated for a 
queue.
+ * Effective minimum and maximum resource values are set for each label and 
resource separately.
+ */
+public class CapacitySchedulerQueueCapacityHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CapacitySchedulerQueueCapacityHandler.class);
+
+  private final Map<ResourceUnitCapacityType, AbstractQueueCapacityCalculator>
+      calculators;
+  private final AbstractQueueCapacityCalculator rootCalculator =
+      new RootQueueCapacityCalculator();
+  private final RMNodeLabelsManager labelsManager;
+  private final Collection<String> definedResources = new LinkedHashSet<>();
+
+  public CapacitySchedulerQueueCapacityHandler(RMNodeLabelsManager 
labelsManager) {
+    this.calculators = new HashMap<>();
+    this.labelsManager = labelsManager;
+
+    this.calculators.put(ResourceUnitCapacityType.ABSOLUTE,
+        new AbsoluteResourceCapacityCalculator());
+    this.calculators.put(ResourceUnitCapacityType.PERCENTAGE,
+        new PercentageQueueCapacityCalculator());
+    this.calculators.put(ResourceUnitCapacityType.WEIGHT,
+        new WeightQueueCapacityCalculator());
+
+    loadResourceNames();
+  }
+
+  /**
+   * Updates the resource and metrics values for a queue, its siblings and 
descendants.
+   * These values are calculated at runtime.
+   *
+   * @param clusterResource resource of the cluster
+   * @param queue           queue to update
+   * @return update context that contains information about the update phase
+   */
+  public QueueCapacityUpdateContext update(Resource clusterResource, CSQueue 
queue) {
+    ResourceLimits resourceLimits = new ResourceLimits(clusterResource);
+    QueueCapacityUpdateContext updateContext =
+        new QueueCapacityUpdateContext(clusterResource, labelsManager);
+
+    if (queue.getQueuePath().equals(ROOT)) {
+      updateRoot(queue, updateContext, resourceLimits);
+      updateChildren(queue, updateContext, resourceLimits);
+    } else {
+      updateChildren(queue.getParent(), updateContext, resourceLimits);

Review comment:
       Do we even call this branch anywhere?

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueCapacityHandler.java
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
+import static 
org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
+import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT;
+
+/**
+ * Controls how capacity and resource values are set and calculated for a 
queue.
+ * Effective minimum and maximum resource values are set for each label and 
resource separately.
+ */
+public class CapacitySchedulerQueueCapacityHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CapacitySchedulerQueueCapacityHandler.class);
+
+  private final Map<ResourceUnitCapacityType, AbstractQueueCapacityCalculator>
+      calculators;
+  private final AbstractQueueCapacityCalculator rootCalculator =
+      new RootQueueCapacityCalculator();
+  private final RMNodeLabelsManager labelsManager;
+  private final Collection<String> definedResources = new LinkedHashSet<>();
+
+  public CapacitySchedulerQueueCapacityHandler(RMNodeLabelsManager 
labelsManager) {
+    this.calculators = new HashMap<>();
+    this.labelsManager = labelsManager;
+
+    this.calculators.put(ResourceUnitCapacityType.ABSOLUTE,
+        new AbsoluteResourceCapacityCalculator());
+    this.calculators.put(ResourceUnitCapacityType.PERCENTAGE,
+        new PercentageQueueCapacityCalculator());
+    this.calculators.put(ResourceUnitCapacityType.WEIGHT,
+        new WeightQueueCapacityCalculator());
+
+    loadResourceNames();
+  }
+
+  /**
+   * Updates the resource and metrics values for a queue, its siblings and 
descendants.
+   * These values are calculated at runtime.
+   *
+   * @param clusterResource resource of the cluster
+   * @param queue           queue to update
+   * @return update context that contains information about the update phase
+   */
+  public QueueCapacityUpdateContext update(Resource clusterResource, CSQueue 
queue) {
+    ResourceLimits resourceLimits = new ResourceLimits(clusterResource);
+    QueueCapacityUpdateContext updateContext =
+        new QueueCapacityUpdateContext(clusterResource, labelsManager);
+
+    if (queue.getQueuePath().equals(ROOT)) {
+      updateRoot(queue, updateContext, resourceLimits);
+      updateChildren(queue, updateContext, resourceLimits);
+    } else {
+      updateChildren(queue.getParent(), updateContext, resourceLimits);
+    }
+
+    return updateContext;
+  }
+
+  private void updateRoot(
+      CSQueue queue, QueueCapacityUpdateContext updateContext, ResourceLimits 
resourceLimits) {
+    RootCalculationDriver rootCalculationDriver = new 
RootCalculationDriver(queue, updateContext,
+        rootCalculator, definedResources);
+    rootCalculationDriver.calculateResources();
+    
queue.refreshAfterResourceCalculation(updateContext.getUpdatedClusterResource(),
 resourceLimits);
+  }
+
+  private void updateChildren(
+      CSQueue parent, QueueCapacityUpdateContext updateContext,
+      ResourceLimits resourceLimits) {
+    if (parent == null || CollectionUtils.isEmpty(parent.getChildQueues())) {
+      return;
+    }
+
+    ResourceCalculationDriver resourceCalculationDriver = new 
ResourceCalculationDriver(
+        parent, updateContext, calculators, definedResources);
+    resourceCalculationDriver.calculateResources();
+
+    updateChildrenAfterCalculation(resourceCalculationDriver, resourceLimits);
+  }
+
+  private void updateChildrenAfterCalculation(
+      ResourceCalculationDriver resourceCalculationDriver, ResourceLimits 
resourceLimits) {
+    for (CSQueue childQueue : 
resourceCalculationDriver.getParent().getChildQueues()) {
+      resourceCalculationDriver.setCurrentChild(childQueue);

Review comment:
       I don't really like the idea of setting the current child in a driver. 
This way the state of the recursion is tracked at two palces. Here, in this 
class, and in the driver as well, it's easier to get things messed up, I feel 
like this childQueue should be an argument for the methods, and driver 
shouldn't store this as an internal state.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceCalculationDriver.java
##########
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+
+/**
+ * Drives the main logic of resource calculation for all children under a 
parent queue. Acts as a
+ * bookkeeper of disposable update information that is used by all children 
under a common parent.
+ */
+public class ResourceCalculationDriver {
+  protected static final Set<ResourceUnitCapacityType> CALCULATOR_PRECEDENCE =
+      ImmutableSet.of(
+          ResourceUnitCapacityType.ABSOLUTE,
+          ResourceUnitCapacityType.PERCENTAGE,
+          ResourceUnitCapacityType.WEIGHT);
+
+  protected final QueueResourceRoundingStrategy roundingStrategy =
+      new DefaultQueueResourceRoundingStrategy(CALCULATOR_PRECEDENCE);
+  protected final CSQueue parent;
+  protected final QueueCapacityUpdateContext updateContext;
+  protected final Map<ResourceUnitCapacityType, 
AbstractQueueCapacityCalculator> calculators;
+  protected final Collection<String> definedResources;
+
+  protected final Map<String, ResourceVector> overallRemainingResource = new 
HashMap<>();
+  protected final Map<String, ResourceVector> batchRemainingResource = new 
HashMap<>();
+  // Used by ABSOLUTE capacity types
+  protected final Map<String, ResourceVector> normalizedResourceRatio = new 
HashMap<>();
+  // Used by WEIGHT capacity typet js
+  protected final Map<String, Map<String, Float>> sumWeightsPerLabel = new 
HashMap<>();
+
+  protected String currentResourceName;
+  protected AbstractQueueCapacityCalculator currentCalculator;
+  protected CSQueue currentChild;
+  protected Map<String, Float> usedResourceByCurrentCalculator = new 
HashMap<>();
+
+  public ResourceCalculationDriver(
+      CSQueue parent, QueueCapacityUpdateContext updateContext,
+      Map<ResourceUnitCapacityType, AbstractQueueCapacityCalculator> 
calculators,
+      Collection<String> definedResources) {
+    this.parent = parent;
+    this.updateContext = updateContext;
+    this.calculators = calculators;
+    this.definedResources = definedResources;
+  }
+
+  /**
+   * Returns the parent that is driving the calculation.
+   *
+   * @return a common parent queue
+   */
+  public CSQueue getParent() {
+    return parent;
+  }
+
+  /**
+   * Returns the context that is used throughout the whole update phase.
+   *
+   * @return update context
+   */
+  public QueueCapacityUpdateContext getUpdateContext() {
+    return updateContext;
+  }
+
+  /**
+   * Returns the name of the resource that is currently processed.
+   *
+   * @return resource name
+   */
+  public String getCurrentResourceName() {
+    return currentResourceName;
+  }
+
+  /**
+   * Returns the child that is currently processed.
+   *
+   * @return child queue
+   */
+  public CSQueue getCurrentChild() {
+    return currentChild;
+  }
+
+  /**
+   * Sets the currently evaluated child to a specific queue.
+   *
+   * @param currentChild a child queue
+   */
+  public void setCurrentChild(CSQueue currentChild) {
+    if (currentChild.getParent() != parent) {
+      throw new IllegalArgumentException("Child queue " + 
currentChild.getQueuePath() + " is not " +
+          "a child of " + parent.getQueuePath());
+    }
+
+    this.currentChild = currentChild;
+  }
+
+  /**
+   * A shorthand to return the minimum capacity vector entry for the currently 
evaluated child and
+   * resource name.
+   *
+   * @param label node label
+   * @return capacity vector entry
+   */
+  public QueueCapacityVectorEntry getCurrentMinimumCapacityEntry(String label) 
{
+    return 
currentChild.getConfiguredCapacityVector(label).getResource(currentResourceName);
+  }
+
+  /**
+   * A shorthand to return the maximum capacity vector entry for the currently 
evaluated child and
+   * resource name.
+   *
+   * @param label node label
+   * @return capacity vector entry
+   */
+  public QueueCapacityVectorEntry getCurrentMaximumCapacityEntry(String label) 
{
+    return 
currentChild.getConfiguredMaxCapacityVector(label).getResource(currentResourceName);
+  }
+
+  /**
+   * Increments the aggregated weight.
+   *
+   * @param label        node label
+   * @param resourceName resource unit name
+   * @param value        weight value
+   */
+  public void incrementWeight(String label, String resourceName, float value) {
+    sumWeightsPerLabel.putIfAbsent(label, new HashMap<>());
+    sumWeightsPerLabel.get(label).put(resourceName,
+        sumWeightsPerLabel.get(label).getOrDefault(resourceName, 0f) + value);

Review comment:
       If this is called frequently we might consider using a mutable float 
here to reduce GC strain.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceCalculationDriver.java
##########
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+
+/**
+ * Drives the main logic of resource calculation for all children under a 
parent queue. Acts as a
+ * bookkeeper of disposable update information that is used by all children 
under a common parent.
+ */
+public class ResourceCalculationDriver {
+  protected static final Set<ResourceUnitCapacityType> CALCULATOR_PRECEDENCE =
+      ImmutableSet.of(
+          ResourceUnitCapacityType.ABSOLUTE,
+          ResourceUnitCapacityType.PERCENTAGE,
+          ResourceUnitCapacityType.WEIGHT);
+
+  protected final QueueResourceRoundingStrategy roundingStrategy =
+      new DefaultQueueResourceRoundingStrategy(CALCULATOR_PRECEDENCE);
+  protected final CSQueue parent;
+  protected final QueueCapacityUpdateContext updateContext;
+  protected final Map<ResourceUnitCapacityType, 
AbstractQueueCapacityCalculator> calculators;
+  protected final Collection<String> definedResources;
+
+  protected final Map<String, ResourceVector> overallRemainingResource = new 
HashMap<>();
+  protected final Map<String, ResourceVector> batchRemainingResource = new 
HashMap<>();
+  // Used by ABSOLUTE capacity types
+  protected final Map<String, ResourceVector> normalizedResourceRatio = new 
HashMap<>();
+  // Used by WEIGHT capacity typet js
+  protected final Map<String, Map<String, Float>> sumWeightsPerLabel = new 
HashMap<>();
+
+  protected String currentResourceName;
+  protected AbstractQueueCapacityCalculator currentCalculator;
+  protected CSQueue currentChild;

Review comment:
       I don't think these should be fields and internal states, it is much 
easier to get out of sync with the callers in the case of complex recursion. I 
think these should be arguments for the apropriate methods, if you are 
concerened about the number of arguments these can be wrapped into an object. 
But this way you split the traversal responsibility between the components. The 
component which makes the actual traversal and recursion has a state about 
where we are in the tree (resource name, child etc), and it is set onto this 
driver, which will behave differently based on these settings, so this object 
also tracks it. The problem if any of these fields is not cleared after a step 
in the recurison, the actual traversal and the state of this oject will be out 
of sync, and it will be a nightmare to debug it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to