susheel-gupta commented on code in PR #5380:
URL: https://github.com/apache/hadoop/pull/5380#discussion_r1146496279


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java:
##########
@@ -0,0 +1,1682 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.Sets;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AccessType;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getACLsForFlexibleAutoCreatedParentQueue;
+
+public abstract class AbstractParentQueue extends AbstractCSQueue {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractParentQueue.class);
+
+  protected final List<CSQueue> childQueues;
+  private final boolean rootQueue;
+  private volatile int numApplications;
+
+  private final RecordFactory recordFactory =
+    RecordFactoryProvider.getRecordFactory(null);
+
+  private QueueOrderingPolicy queueOrderingPolicy;
+
+  private long lastSkipQueueDebugLoggingTimestamp = -1;
+
+  private int runnableApps;
+
+  private final boolean allowZeroCapacitySum;
+
+  private AutoCreatedQueueTemplate autoCreatedQueueTemplate;
+
+  // A ratio of the queue's effective minimum resource and the summary of the 
configured
+  // minimum resource of its children grouped by labels and calculated for 
each resource names
+  // distinctively.
+  private final Map<String, Map<String, Float>> effectiveMinResourceRatio =
+      new ConcurrentHashMap<>();
+
+
+  public AbstractParentQueue(CapacitySchedulerQueueContext queueContext,
+      String queueName, CSQueue parent, CSQueue old)
+    throws IOException {
+    this(queueContext, queueName, parent, old, false);
+  }
+
+  public AbstractParentQueue(CapacitySchedulerQueueContext queueContext,
+    String queueName, CSQueue parent, CSQueue old, boolean isDynamic) throws
+      IOException {
+
+      super(queueContext, queueName, parent, old);
+      setDynamicQueue(isDynamic);
+      this.rootQueue = (parent == null);
+
+      float rawCapacity = queueContext.getConfiguration()
+          .getNonLabeledQueueCapacity(this.queuePath);
+
+      if (rootQueue &&
+          (rawCapacity != 
CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
+        throw new IllegalArgumentException("Illegal " +
+            "capacity of " + rawCapacity + " for queue " + queueName +
+            ". Must be " + 
CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
+      }
+
+      this.childQueues = new ArrayList<>();
+      this.allowZeroCapacitySum =
+          queueContext.getConfiguration()
+              .getAllowZeroCapacitySum(getQueuePath());
+
+  }
+
+  // returns what is configured queue ordering policy
+  private String getQueueOrderingPolicyConfigName() {
+    return queueOrderingPolicy == null ?
+        null :
+        queueOrderingPolicy.getConfigName();
+  }
+
+  protected void setupQueueConfigs(Resource clusterResource)
+      throws IOException {
+    writeLock.lock();
+    try {
+      CapacitySchedulerConfiguration configuration = 
queueContext.getConfiguration();
+      autoCreatedQueueTemplate = new AutoCreatedQueueTemplate(
+          configuration, this.queuePath);
+      super.setupQueueConfigs(clusterResource);
+      StringBuilder aclsString = new StringBuilder();
+      for (Map.Entry<AccessType, AccessControlList> e : getACLs().entrySet()) {
+        aclsString.append(e.getKey()).append(":")
+            .append(e.getValue().getAclString());
+      }
+
+      StringBuilder labelStrBuilder = new StringBuilder();
+      if (getAccessibleNodeLabels() != null) {
+        for (String nodeLabel : getAccessibleNodeLabels()) {
+          labelStrBuilder.append(nodeLabel).append(",");
+        }
+      }
+
+      // Initialize queue ordering policy
+      queueOrderingPolicy = configuration.getQueueOrderingPolicy(
+          getQueuePath(), parent == null ?
+              null :
+              ((AbstractParentQueue) 
parent).getQueueOrderingPolicyConfigName());
+      queueOrderingPolicy.setQueues(childQueues);
+
+      LOG.info(getQueueName() + ", " + getCapacityOrWeightString()
+          + ", absoluteCapacity=" + getAbsoluteCapacity()
+          + ", maxCapacity=" + getMaximumCapacity()
+          + ", absoluteMaxCapacity=" + getAbsoluteMaximumCapacity()
+          + ", state=" + getState() + ", acls="
+          + aclsString + ", labels=" + labelStrBuilder + "\n"
+          + ", reservationsContinueLooking=" + isReservationsContinueLooking()
+          + ", orderingPolicy=" + getQueueOrderingPolicyConfigName()
+          + ", priority=" + getPriority()
+          + ", allowZeroCapacitySum=" + allowZeroCapacitySum);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  protected void setDynamicQueueACLProperties() {
+    super.setDynamicQueueACLProperties();
+
+    if (parent instanceof AbstractParentQueue) {
+      acls.putAll(getACLsForFlexibleAutoCreatedParentQueue(
+          ((AbstractParentQueue) parent).getAutoCreatedQueueTemplate()));
+    }
+  }
+
+  private static float PRECISION = 0.0005f; // 0.05% precision
+
+    // Check weight configuration, throw exception when configuration is 
invalid
+  // return true when all children use weight mode.
+  private QueueCapacityType getCapacityConfigurationTypeForQueues(
+      Collection<CSQueue> queues) throws IOException {
+    // Do we have ANY queue set capacity in any labels?
+    boolean percentageIsSet = false;
+
+    // Do we have ANY queue set weight in any labels?
+    boolean weightIsSet = false;
+
+    // Do we have ANY queue set absolute in any labels?
+    boolean absoluteMinResSet = false;
+
+    StringBuilder diagMsg = new StringBuilder();
+
+    for (CSQueue queue : queues) {
+      for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
+        float capacityByLabel = 
queue.getQueueCapacities().getCapacity(nodeLabel);
+        if (capacityByLabel > 0) {
+          percentageIsSet = true;
+        }
+        float weightByLabel = queue.getQueueCapacities().getWeight(nodeLabel);
+        // By default weight is set to -1, so >= 0 is enough.
+        if (weightByLabel >= 0) {
+          weightIsSet = true;
+          diagMsg.append(
+              "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel
+                  + " uses weight mode}. ");
+        }
+        if (!queue.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel)
+            .equals(Resources.none())) {
+          absoluteMinResSet = true;
+          // There's a special handling: when absolute resource is configured,
+          // capacity will be calculated (and set) for UI/metrics purposes, so
+          // when asboluteMinResource is set, unset percentage
+          percentageIsSet = false;
+          diagMsg.append(
+              "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel
+                  + " uses absolute mode}. ");
+        }
+        if (percentageIsSet) {
+          diagMsg.append(
+              "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel
+                  + " uses percentage mode}. ");
+        }
+      }
+    }
+    // If we have mixed capacity, weight or absolute resource (any of the two)
+    // We will throw exception
+    // Root queue is an exception here, because by default root queue returns
+    // 100 as capacity no matter what. We should look into this case in the
+    // future. To avoid impact too many code paths, we don;t check root queue's
+    // config.
+    if (queues.iterator().hasNext() &&
+        !queues.iterator().next().getQueuePath().equals(
+        CapacitySchedulerConfiguration.ROOT) &&
+        (percentageIsSet ? 1 : 0) + (weightIsSet ? 1 : 0) + (absoluteMinResSet 
?
+            1 :
+            0) > 1) {
+      throw new IOException("Parent queue '" + getQueuePath()
+          + "' have children queue used mixed of "
+          + " weight mode, percentage and absolute mode, it is not allowed, 
please "
+          + "double check, details:" + diagMsg.toString());
+    }
+
+    if (weightIsSet || queues.isEmpty()) {
+      return QueueCapacityType.WEIGHT;
+    } else if (absoluteMinResSet) {
+      return QueueCapacityType.ABSOLUTE_RESOURCE;
+    } else {
+      return QueueCapacityType.PERCENT;
+    }
+  }
+
+  private enum QueueCapacityType {
+    WEIGHT, ABSOLUTE_RESOURCE, PERCENT;
+  }
+
+  /**
+   * Set child queue and verify capacities
+   * 
+--------------+---------------------------+-------------------------------------+------------------------+
+   * |              | parent-weight             | parent-pct                   
       | parent-abs             |
+   * 
+--------------+---------------------------+-------------------------------------+------------------------+
+   * | child-weight | No specific check         | No specific check            
       | X                      |
+   * 
+--------------+---------------------------+-------------------------------------+------------------------+
+   * | child-pct    | Sum(children.capacity) =  | When:                        
       | X                      |
+   * |              |   0 OR 100                |   parent.capacity>0          
       |                        |
+   * |              |                           |     
sum(children.capacity)=100 OR 0 |                        |
+   * |              |                           |   parent.capacity=0          
       |                        |
+   * |              |                           |     sum(children.capacity)=0 
       |                        |
+   * 
+--------------+---------------------------+-------------------------------------+------------------------+
+   * | child-abs    | X                         | X                            
       | Sum(children.minRes)<= |
+   * |              |                           |                              
       | parent.minRes          |
+   * 
+--------------+---------------------------+-------------------------------------+------------------------+
+   * @param childQueues
+   */
+  void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
+    writeLock.lock();
+    try {
+      boolean isLegacyQueueMode = 
queueContext.getConfiguration().isLegacyQueueMode();
+      if (isLegacyQueueMode) {
+        QueueCapacityType childrenCapacityType =
+            getCapacityConfigurationTypeForQueues(childQueues);
+        QueueCapacityType parentCapacityType =
+            getCapacityConfigurationTypeForQueues(ImmutableList.of(this));
+
+        if (childrenCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE
+            || parentCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE) {
+          // We don't allow any mixed absolute + {weight, percentage} between
+          // children and parent
+          if (childrenCapacityType != parentCapacityType && 
!this.getQueuePath()
+              .equals(CapacitySchedulerConfiguration.ROOT)) {
+            throw new IOException("Parent=" + this.getQueuePath()
+                + ": When absolute minResource is used, we must make sure both 
"
+                + "parent and child all use absolute minResource");
+          }
+
+          // Ensure that for each parent queue: parent.min-resource >=
+          // Σ(child.min-resource).
+          for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
+            Resource minRes = Resources.createResource(0, 0);
+            for (CSQueue queue : childQueues) {
+              // Accumulate all min/max resource configured for all child 
queues.
+              Resources.addTo(minRes, queue.getQueueResourceQuotas()
+                  .getConfiguredMinResource(nodeLabel));
+            }
+            Resource resourceByLabel = 
labelManager.getResourceByLabel(nodeLabel,
+                queueContext.getClusterResource());
+            Resource parentMinResource =
+                
usageTracker.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel);
+            if (!parentMinResource.equals(Resources.none()) && 
Resources.lessThan(
+                resourceCalculator, resourceByLabel, parentMinResource, 
minRes)) {
+              throw new IOException(
+                  "Parent Queues" + " capacity: " + parentMinResource
+                      + " is less than" + " to its children:" + minRes
+                      + " for queue:" + getQueueName());
+            }
+          }
+        }
+
+        // When child uses percent
+        if (childrenCapacityType == QueueCapacityType.PERCENT) {
+          float childrenPctSum = 0;
+          // check label capacities
+          for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
+            // check children's labels
+            childrenPctSum = 0;
+            for (CSQueue queue : childQueues) {
+              childrenPctSum += 
queue.getQueueCapacities().getCapacity(nodeLabel);
+            }
+
+            if (Math.abs(1 - childrenPctSum) > PRECISION) {
+              // When children's percent sum != 100%
+              if (Math.abs(childrenPctSum) > PRECISION) {
+                // It is wrong when percent sum != {0, 1}
+                throw new IOException(
+                    "Illegal" + " capacity sum of " + childrenPctSum
+                        + " for children of queue " + getQueueName() + " for 
label="
+                        + nodeLabel + ". It should be either 0 or 1.0");
+              } else {
+                // We also allow children's percent sum = 0 under the following
+                // conditions
+                // - Parent uses weight mode
+                // - Parent uses percent mode, and parent has
+                //   (capacity=0 OR allowZero)
+                if (parentCapacityType == QueueCapacityType.PERCENT) {
+                  if ((Math.abs(queueCapacities.getCapacity(nodeLabel))
+                      > PRECISION) && (!allowZeroCapacitySum)) {
+                    throw new IOException(
+                        "Illegal" + " capacity sum of " + childrenPctSum
+                            + " for children of queue " + getQueueName()
+                            + " for label=" + nodeLabel
+                            + ". It is set to 0, but parent percent != 0, and "
+                            + "doesn't allow children capacity to set to 0");
+                  }
+                }
+              }
+            } else {
+              // Even if child pct sum == 1.0, we will make sure parent has
+              // positive percent.
+              if (parentCapacityType == QueueCapacityType.PERCENT && Math.abs(
+                  queueCapacities.getCapacity(nodeLabel)) <= 0f
+                  && !allowZeroCapacitySum) {
+                throw new IOException(
+                    "Illegal" + " capacity sum of " + childrenPctSum
+                        + " for children of queue " + getQueueName() + " for 
label="
+                        + nodeLabel + ". queue=" + getQueueName()
+                        + " has zero capacity, but child"
+                        + "queues have positive capacities");
+              }
+            }
+          }
+        }
+      }
+
+      this.childQueues.clear();
+      this.childQueues.addAll(childQueues);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("setChildQueues: " + getChildQueuesToPrint());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+
+  @Override
+  public QueueInfo getQueueInfo(
+      boolean includeChildQueues, boolean recursive) {
+    readLock.lock();
+    try {
+      QueueInfo queueInfo = getQueueInfo();
+
+      List<QueueInfo> childQueuesInfo = new ArrayList<>();
+      if (includeChildQueues) {
+        for (CSQueue child : childQueues) {
+          // Get queue information recursively?
+          childQueuesInfo.add(child.getQueueInfo(recursive, recursive));
+        }
+      }
+      queueInfo.setChildQueues(childQueuesInfo);
+
+      return queueInfo;
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
+
+  private QueueUserACLInfo getUserAclInfo(
+      UserGroupInformation user) {
+    readLock.lock();
+    try {
+      QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
+          QueueUserACLInfo.class);
+      List<QueueACL> operations = new ArrayList<QueueACL>();
+      for (QueueACL operation : QueueACL.values()) {
+        if (hasAccess(operation, user)) {
+          operations.add(operation);
+        }
+      }
+
+      userAclInfo.setQueueName(getQueuePath());
+      userAclInfo.setUserAcls(operations);
+      return userAclInfo;
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
+  @Override
+  public List<QueueUserACLInfo> getQueueUserAclInfo(
+      UserGroupInformation user) {
+    readLock.lock();
+    try {
+      List<QueueUserACLInfo> userAcls = new ArrayList<>();
+
+      // Add parent queue acls
+      userAcls.add(getUserAclInfo(user));
+
+      // Add children queue acls
+      for (CSQueue child : childQueues) {
+        userAcls.addAll(child.getQueueUserAclInfo(user));
+      }
+
+      return userAcls;
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
+  public String toString() {
+    return getQueueName() + ": " +
+        "numChildQueue= " + childQueues.size() + ", " +
+        getCapacityOrWeightString() + ", " +
+        "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " +
+        "usedResources=" + usageTracker.getQueueUsage().getUsed() + ", " +
+        "usedCapacity=" + getUsedCapacity() + ", " +
+        "numApps=" + getNumApplications() + ", " +
+        "numContainers=" + getNumContainers();
+  }
+
+  private CSQueue createNewQueue(String childQueuePath, boolean isLeaf)
+      throws SchedulerDynamicEditException {
+    try {
+      AbstractCSQueue childQueue;
+      String queueShortName = childQueuePath.substring(
+          childQueuePath.lastIndexOf(".") + 1);
+
+      if (isLeaf) {
+        childQueue = new LeafQueue(queueContext,
+            queueShortName, this, null, true);
+      } else {
+        childQueue = new ParentQueue(queueContext, queueShortName, this, null, 
true);
+      }
+      childQueue.setDynamicQueue(true);
+      // It should be sufficient now, we don't need to set more, because 
weights
+      // related setup will be handled in updateClusterResources
+
+      return childQueue;
+    } catch (IOException e) {
+      throw new SchedulerDynamicEditException(e.toString());
+    }
+  }
+
+
+  public AbstractParentQueue addDynamicParentQueue(String queuePath)

Review Comment:
   @p-szucs I agree with the comment you made, it would be better keep the 
functionality more accurate and clear.
   What do you suggest @bteke ?.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to