YARN-3243. CapacityScheduler should pass headroom from parent to children to make sure ParentQueue obey its capacity limits. Contributed by Wangda Tan. (cherry picked from commit 487374b7fe0c92fc7eb1406c568952722b5d5b15)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1c601e49 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1c601e49 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1c601e49 Branch: refs/heads/branch-2 Commit: 1c601e492f4cd80e012aa78b796383ee9de161fd Parents: 895588b Author: Jian He <[email protected]> Authored: Tue Mar 17 10:22:15 2015 -0700 Committer: Jian He <[email protected]> Committed: Tue Mar 17 10:25:07 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/capacity/AbstractCSQueue.java | 112 ++++++- .../scheduler/capacity/CSQueue.java | 4 +- .../scheduler/capacity/CapacityScheduler.java | 33 ++- .../scheduler/capacity/LeafQueue.java | 292 +++++++------------ .../scheduler/capacity/ParentQueue.java | 140 +++------ .../scheduler/common/fica/FiCaSchedulerApp.java | 16 +- .../capacity/TestApplicationLimits.java | 8 +- .../capacity/TestCapacityScheduler.java | 59 ++++ .../scheduler/capacity/TestChildQueueOrder.java | 25 +- .../scheduler/capacity/TestLeafQueue.java | 142 ++++----- .../scheduler/capacity/TestParentQueue.java | 97 +++--- .../scheduler/capacity/TestReservations.java | 147 +++++----- 13 files changed, 561 insertions(+), 517 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b2f25cd..e15fdf2 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -8,6 +8,9 @@ Release 2.8.0 - UNRELEASED IMPROVEMENTS + YARN-3243. CapacityScheduler should pass headroom from parent to children + to make sure ParentQueue obey its capacity limits. (Wangda Tan via jianhe) + OPTIMIZATIONS YARN-3339. 
TestDockerContainerExecutor should pull a single image and not http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index d800709..4e53060 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; @@ -34,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import 
org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; @@ -49,6 +53,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { + private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); CSQueue parent; final String queueName; @@ -406,21 +411,102 @@ public abstract class AbstractCSQueue implements CSQueue { parentQ.getPreemptionDisabled()); } - protected Resource getCurrentResourceLimit(Resource clusterResource, - ResourceLimits currentResourceLimits) { + private Resource getCurrentLimitResource(String nodeLabel, + Resource clusterResource, ResourceLimits currentResourceLimits) { /* - * Queue's max available resource = min(my.max, my.limit) - * my.limit is set by my parent, considered used resource of my siblings + * Current limit resource: For labeled resource: limit = queue-max-resource + * (TODO, this part need update when we support labeled-limit) For + * non-labeled resource: limit = min(queue-max-resource, + * limit-set-by-parent) */ Resource queueMaxResource = - Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource, - queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation); - Resource queueCurrentResourceLimit = - Resources.min(resourceCalculator, clusterResource, queueMaxResource, - currentResourceLimits.getLimit()); - queueCurrentResourceLimit = - Resources.roundDown(resourceCalculator, queueCurrentResourceLimit, - minimumAllocation); - return queueCurrentResourceLimit; + Resources.multiplyAndNormalizeDown(resourceCalculator, + labelManager.getResourceByLabel(nodeLabel, clusterResource), + queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation); + if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) { + return 
Resources.min(resourceCalculator, clusterResource, + queueMaxResource, currentResourceLimits.getLimit()); + } + return queueMaxResource; + } + + synchronized boolean canAssignToThisQueue(Resource clusterResource, + Set<String> nodeLabels, ResourceLimits currentResourceLimits, + Resource nowRequired, Resource resourceCouldBeUnreserved) { + // Get label of this queue can access, it's (nodeLabel AND queueLabel) + Set<String> labelCanAccess; + if (null == nodeLabels || nodeLabels.isEmpty()) { + labelCanAccess = new HashSet<String>(); + // Any queue can always access any node without label + labelCanAccess.add(RMNodeLabelsManager.NO_LABEL); + } else { + labelCanAccess = new HashSet<String>( + accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels + : Sets.intersection(accessibleLabels, nodeLabels)); + } + + for (String label : labelCanAccess) { + // New total resource = used + required + Resource newTotalResource = + Resources.add(queueUsage.getUsed(label), nowRequired); + + Resource currentLimitResource = + getCurrentLimitResource(label, clusterResource, currentResourceLimits); + + // if reservation continous looking enabled, check to see if could we + // potentially use this node instead of a reserved node if the application + // has reserved containers. 
+ // TODO, now only consider reservation cases when the node has no label + if (this.reservationsContinueLooking + && label.equals(RMNodeLabelsManager.NO_LABEL) + && Resources.greaterThan(resourceCalculator, clusterResource, + resourceCouldBeUnreserved, Resources.none())) { + // resource-without-reserved = used - reserved + Resource newTotalWithoutReservedResource = + Resources.subtract(newTotalResource, resourceCouldBeUnreserved); + + // when total-used-without-reserved-resource < currentLimit, we still + // have chance to allocate on this node by unreserving some containers + if (Resources.lessThan(resourceCalculator, clusterResource, + newTotalWithoutReservedResource, currentLimitResource)) { + if (LOG.isDebugEnabled()) { + LOG.debug("try to use reserved: " + getQueueName() + + " usedResources: " + queueUsage.getUsed() + + ", clusterResources: " + clusterResource + + ", reservedResources: " + resourceCouldBeUnreserved + + ", capacity-without-reserved: " + + newTotalWithoutReservedResource + ", maxLimitCapacity: " + + currentLimitResource); + } + return true; + } + } + + // Otherwise, if any of the label of this node beyond queue limit, we + // cannot allocate on this node. Consider a small epsilon here. 
+ if (Resources.greaterThan(resourceCalculator, clusterResource, + newTotalResource, currentLimitResource)) { + return false; + } + + if (LOG.isDebugEnabled()) { + LOG.debug(getQueueName() + + "Check assign to queue, label=" + label + + " usedResources: " + queueUsage.getUsed(label) + + " clusterResources: " + clusterResource + + " currentUsedCapacity " + + Resources.divide(resourceCalculator, clusterResource, + queueUsage.getUsed(label), + labelManager.getResourceByLabel(label, clusterResource)) + + " max-capacity: " + + queueCapacities.getAbsoluteMaximumCapacity(label) + + ")"); + } + return true; + } + + // Actually, this will not happen, since labelCanAccess will be always + // non-empty + return false; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 0a60acc..1a9448a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -189,13 +189,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { * Assign containers to applications in the queue or it's children (if any). 
* @param clusterResource the resource of the cluster. * @param node node on which resources are available - * @param needToUnreserve assign container only if it can unreserve one first * @param resourceLimits how much overall resource of this queue can use. * @return the assignment */ public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, boolean needToUnreserve, - ResourceLimits resourceLimits); + FiCaSchedulerNode node, ResourceLimits resourceLimits); /** * A container assigned to the queue has completed. http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 756e537..c86c0ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1061,9 +1061,14 @@ public class CapacityScheduler extends node.getNodeID()); LeafQueue queue = ((LeafQueue)reservedApplication.getQueue()); - CSAssignment assignment = queue.assignContainers(clusterResource, node, - false, new ResourceLimits( - clusterResource)); + CSAssignment assignment = + queue.assignContainers( + 
clusterResource, + node, + // TODO, now we only consider limits for parent for non-labeled + // resources, should consider labeled resources as well. + new ResourceLimits(labelManager.getResourceByLabel( + RMNodeLabelsManager.NO_LABEL, clusterResource))); RMContainer excessReservation = assignment.getExcessReservation(); if (excessReservation != null) { @@ -1087,8 +1092,13 @@ public class CapacityScheduler extends LOG.debug("Trying to schedule on node: " + node.getNodeName() + ", available: " + node.getAvailableResource()); } - root.assignContainers(clusterResource, node, false, new ResourceLimits( - clusterResource)); + root.assignContainers( + clusterResource, + node, + // TODO, now we only consider limits for parent for non-labeled + // resources, should consider labeled resources as well. + new ResourceLimits(labelManager.getResourceByLabel( + RMNodeLabelsManager.NO_LABEL, clusterResource))); } } else { LOG.info("Skipping scheduling since node " + node.getNodeID() + @@ -1209,6 +1219,13 @@ public class CapacityScheduler extends usePortForNodeName, nodeManager.getNodeLabels()); this.nodes.put(nodeManager.getNodeID(), schedulerNode); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + + // update this node to node label manager + if (labelManager != null) { + labelManager.activateNode(nodeManager.getNodeID(), + nodeManager.getTotalCapability()); + } + root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); int numNodes = numNodeManagers.incrementAndGet(); @@ -1220,12 +1237,6 @@ public class CapacityScheduler extends if (scheduleAsynchronously && numNodes == 1) { asyncSchedulerThread.beginSchedule(); } - - // update this node to node label manager - if (labelManager != null) { - labelManager.activateNode(nodeManager.getNodeID(), - nodeManager.getTotalCapability()); - } } private synchronized void removeNode(RMNode nodeInfo) { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index a607a62..dd6a894 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.utils.Lock.NoLock; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; @Private @Unstable @@ -157,7 +156,7 @@ public class LeafQueue extends AbstractCSQueue { // and all queues may not be realized yet, we'll use (optimistic) // absoluteMaxCapacity (it will be replaced with the more accurate // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) - computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource); + setQueueResourceLimitsInfo(clusterResource); CapacitySchedulerConfiguration conf = csContext.getConfiguration(); userLimit = conf.getUserLimit(getQueuePath()); @@ -739,9 +738,8 @@ public class LeafQueue extends AbstractCSQueue { @Override public synchronized CSAssignment assignContainers(Resource clusterResource, - 
FiCaSchedulerNode node, boolean needToUnreserve, - ResourceLimits currentResourceLimits) { - this.currentResourceLimits = currentResourceLimits; + FiCaSchedulerNode node, ResourceLimits currentResourceLimits) { + updateCurrentResourceLimits(currentResourceLimits, clusterResource); if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() @@ -796,7 +794,7 @@ public class LeafQueue extends AbstractCSQueue { continue; } if (!this.reservationsContinueLooking) { - if (!needContainers(application, priority, required)) { + if (!shouldAllocOrReserveNewContainer(application, priority, required)) { if (LOG.isDebugEnabled()) { LOG.debug("doesn't need containers based on reservation algo!"); } @@ -818,8 +816,8 @@ public class LeafQueue extends AbstractCSQueue { required, requestedNodeLabels); // Check queue max-capacity limit - if (!canAssignToThisQueue(clusterResource, required, - node.getLabels(), application, true)) { + if (!super.canAssignToThisQueue(clusterResource, node.getLabels(), + this.currentResourceLimits, required, application.getCurrentReservation())) { return NULL_ASSIGNMENT; } @@ -835,7 +833,7 @@ public class LeafQueue extends AbstractCSQueue { // Try to schedule CSAssignment assignment = assignContainersOnNode(clusterResource, node, application, priority, - null, needToUnreserve); + null); // Did the application skip this node? if (assignment.getSkipped()) { @@ -896,7 +894,7 @@ public class LeafQueue extends AbstractCSQueue { // Try to assign if we have sufficient resources assignContainersOnNode(clusterResource, node, application, priority, - rmContainer, false); + rmContainer); // Doesn't matter... 
since it's already charged for at time of reservation // "re-reservation" is *free* @@ -938,102 +936,14 @@ public class LeafQueue extends AbstractCSQueue { Resources.roundDown(resourceCalculator, headroom, minimumAllocation); return headroom; } - - synchronized boolean canAssignToThisQueue(Resource clusterResource, - Resource required, Set<String> nodeLabels, FiCaSchedulerApp application, - boolean checkReservations) { - // Get label of this queue can access, it's (nodeLabel AND queueLabel) - Set<String> labelCanAccess; - if (null == nodeLabels || nodeLabels.isEmpty()) { - labelCanAccess = new HashSet<String>(); - // Any queue can always access any node without label - labelCanAccess.add(RMNodeLabelsManager.NO_LABEL); - } else { - labelCanAccess = new HashSet<String>(Sets.intersection(accessibleLabels, nodeLabels)); - } - - boolean canAssign = true; - for (String label : labelCanAccess) { - Resource potentialTotalCapacity = - Resources.add(queueUsage.getUsed(label), required); - - float potentialNewCapacity = - Resources.divide(resourceCalculator, clusterResource, - potentialTotalCapacity, - labelManager.getResourceByLabel(label, clusterResource)); - // if enabled, check to see if could we potentially use this node instead - // of a reserved node if the application has reserved containers - // TODO, now only consider reservation cases when the node has no label - if (this.reservationsContinueLooking && checkReservations - && label.equals(RMNodeLabelsManager.NO_LABEL)) { - float potentialNewWithoutReservedCapacity = Resources.divide( - resourceCalculator, - clusterResource, - Resources.subtract(potentialTotalCapacity, - application.getCurrentReservation()), - labelManager.getResourceByLabel(label, clusterResource)); - - if (potentialNewWithoutReservedCapacity <= queueCapacities - .getAbsoluteMaximumCapacity()) { - if (LOG.isDebugEnabled()) { - LOG.debug("try to use reserved: " - + getQueueName() - + " usedResources: " - + queueUsage.getUsed() - + " clusterResources: 
" - + clusterResource - + " reservedResources: " - + application.getCurrentReservation() - + " currentCapacity " - + Resources.divide(resourceCalculator, clusterResource, - queueUsage.getUsed(), clusterResource) + " required " + required - + " potentialNewWithoutReservedCapacity: " - + potentialNewWithoutReservedCapacity + " ( " - + " max-capacity: " - + queueCapacities.getAbsoluteMaximumCapacity() + ")"); - } - // we could potentially use this node instead of reserved node - return true; - } - } - - // Otherwise, if any of the label of this node beyond queue limit, we - // cannot allocate on this node. Consider a small epsilon here. - if (potentialNewCapacity > queueCapacities - .getAbsoluteMaximumCapacity(label) + 1e-4) { - canAssign = false; - break; - } - - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() - + "Check assign to queue, label=" + label - + " usedResources: " + queueUsage.getUsed(label) - + " clusterResources: " + clusterResource - + " currentCapacity " - + Resources.divide(resourceCalculator, clusterResource, - queueUsage.getUsed(label), - labelManager.getResourceByLabel(label, clusterResource)) - + " potentialNewCapacity: " + potentialNewCapacity + " ( " - + " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity() - + ")"); - } - } - - return canAssign; - } - private Resource computeQueueCurrentLimitAndSetHeadroomInfo( + private void setQueueResourceLimitsInfo( Resource clusterResource) { - Resource queueCurrentResourceLimit = - getCurrentResourceLimit(clusterResource, currentResourceLimits); - synchronized (queueResourceLimitsInfo) { - queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit); + queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits + .getLimit()); queueResourceLimitsInfo.setClusterResource(clusterResource); } - - return queueCurrentResourceLimit; } @Lock({LeafQueue.class, FiCaSchedulerApp.class}) @@ -1048,16 +958,16 @@ public class LeafQueue extends AbstractCSQueue { 
computeUserLimit(application, clusterResource, required, queueUser, requestedLabels); - Resource currentResourceLimit = - computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource); + setQueueResourceLimitsInfo(clusterResource); Resource headroom = - getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit); + getHeadroom(queueUser, currentResourceLimits.getLimit(), + clusterResource, userLimit); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + - " queueMaxAvailRes=" + currentResourceLimit + + " queueMaxAvailRes=" + currentResourceLimits.getLimit() + " consumed=" + queueUser.getUsed() + " headroom=" + headroom); } @@ -1207,8 +1117,8 @@ public class LeafQueue extends AbstractCSQueue { return true; } - boolean needContainers(FiCaSchedulerApp application, Priority priority, - Resource required) { + boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application, + Priority priority, Resource required) { int requiredContainers = application.getTotalRequiredResources(priority); int reservedContainers = application.getNumReservedContainers(priority); int starvation = 0; @@ -1240,7 +1150,7 @@ public class LeafQueue extends AbstractCSQueue { private CSAssignment assignContainersOnNode(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, boolean needToUnreserve) { + RMContainer reservedContainer) { Resource assigned = Resources.none(); NodeType requestType = null; @@ -1252,7 +1162,7 @@ public class LeafQueue extends AbstractCSQueue { requestType = NodeType.NODE_LOCAL; assigned = assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, - node, application, priority, reservedContainer, needToUnreserve, + node, application, priority, reservedContainer, allocatedContainer); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1280,7 +1190,7 @@ public class 
LeafQueue extends AbstractCSQueue { assigned = assignRackLocalContainers(clusterResource, rackLocalResourceRequest, - node, application, priority, reservedContainer, needToUnreserve, + node, application, priority, reservedContainer, allocatedContainer); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1308,7 +1218,7 @@ public class LeafQueue extends AbstractCSQueue { assigned = assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, - node, application, priority, reservedContainer, needToUnreserve, + node, application, priority, reservedContainer, allocatedContainer); // update locality statistics @@ -1320,13 +1230,24 @@ public class LeafQueue extends AbstractCSQueue { return SKIP_ASSIGNMENT; } + + private Resource getMinimumResourceNeedUnreserved(Resource askedResource) { + // First we need to get minimum resource we need unreserve + // minimum-resource-need-unreserve = used + asked - limit + Resource minimumUnreservedResource = + Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource), + currentResourceLimits.getLimit()); + return minimumUnreservedResource; + } @Private protected boolean findNodeToUnreserve(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - Resource capability) { + Resource askedResource, Resource minimumUnreservedResource) { // need to unreserve some other container first - NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability); + NodeId idToUnreserve = + application.getNodeIdToUnreserve(priority, minimumUnreservedResource, + resourceCalculator, clusterResource); if (idToUnreserve == null) { if (LOG.isDebugEnabled()) { LOG.debug("checked to see if could unreserve for app but nothing " @@ -1343,7 +1264,7 @@ public class LeafQueue extends AbstractCSQueue { LOG.debug("unreserving for app: " + application.getApplicationId() + " on nodeId: " + idToUnreserve + " in order to replace reserved application 
and place it on node: " - + node.getNodeID() + " needing: " + capability); + + node.getNodeID() + " needing: " + askedResource); } // headroom @@ -1364,15 +1285,7 @@ public class LeafQueue extends AbstractCSQueue { @Private protected boolean checkLimitsToReserve(Resource clusterResource, - FiCaSchedulerApp application, Resource capability, - boolean needToUnreserve) { - if (needToUnreserve) { - if (LOG.isDebugEnabled()) { - LOG.debug("we needed to unreserve to be able to allocate"); - } - return false; - } - + FiCaSchedulerApp application, Resource capability) { // we can't reserve if we got here based on the limit // checks assuming we could unreserve!!! Resource userLimit = computeUserLimitAndSetHeadroom(application, @@ -1380,7 +1293,8 @@ public class LeafQueue extends AbstractCSQueue { // Check queue max-capacity limit, // TODO: Consider reservation on labels - if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) { + if (!canAssignToThisQueue(clusterResource, null, + this.currentResourceLimits, capability, Resources.none())) { if (LOG.isDebugEnabled()) { LOG.debug("was going to reserve but hit queue limit"); } @@ -1402,43 +1316,40 @@ public class LeafQueue extends AbstractCSQueue { private Resource assignNodeLocalContainers(Resource clusterResource, ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, boolean needToUnreserve, - MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer) { if (canAssign(application, priority, node, NodeType.NODE_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, - needToUnreserve, allocatedContainer); + allocatedContainer); } return Resources.none(); } - private Resource assignRackLocalContainers( - Resource clusterResource, ResourceRequest 
rackLocalResourceRequest, - FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, boolean needToUnreserve, - MutableObject allocatedContainer) { + private Resource assignRackLocalContainers(Resource clusterResource, + ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, + RMContainer reservedContainer, MutableObject allocatedContainer) { if (canAssign(application, priority, node, NodeType.RACK_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, - needToUnreserve, allocatedContainer); + allocatedContainer); } return Resources.none(); } - private Resource assignOffSwitchContainers( - Resource clusterResource, ResourceRequest offSwitchResourceRequest, - FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, boolean needToUnreserve, - MutableObject allocatedContainer) { + private Resource assignOffSwitchContainers(Resource clusterResource, + ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, + RMContainer reservedContainer, MutableObject allocatedContainer) { if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, - needToUnreserve, allocatedContainer); + allocatedContainer); } return Resources.none(); @@ -1522,13 +1433,12 @@ public class LeafQueue extends AbstractCSQueue { private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, RMContainer rmContainer, - boolean needToUnreserve, MutableObject createdContainer) { + MutableObject createdContainer) { if 
(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " application=" + application.getApplicationId() + " priority=" + priority.getPriority() - + " request=" + request + " type=" + type - + " needToUnreserve= " + needToUnreserve); + + " request=" + request + " type=" + type); } // check if the resource request can access the label @@ -1548,12 +1458,14 @@ public class LeafQueue extends AbstractCSQueue { Resource available = node.getAvailableResource(); Resource totalResource = node.getTotalResource(); - if (!Resources.fitsIn(capability, totalResource)) { + if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource, + capability, totalResource)) { LOG.warn("Node : " + node.getNodeID() + " does not have sufficient resource for request : " + request + " node total capability : " + node.getTotalResource()); return Resources.none(); } + assert Resources.greaterThan( resourceCalculator, clusterResource, available, Resources.none()); @@ -1566,18 +1478,9 @@ public class LeafQueue extends AbstractCSQueue { LOG.warn("Couldn't get container for allocation!"); return Resources.none(); } - - // default to true since if reservation continue look feature isn't on - // needContainers is checked earlier and we wouldn't have gotten this far - boolean canAllocContainer = true; - if (this.reservationsContinueLooking) { - // based on reservations can we allocate/reserve more or do we need - // to unreserve one first - canAllocContainer = needContainers(application, priority, capability); - if (LOG.isDebugEnabled()) { - LOG.debug("can alloc container is: " + canAllocContainer); - } - } + + boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( + application, priority, capability); // Can we allocate a container on this node? int availableContainers = @@ -1588,25 +1491,25 @@ public class LeafQueue extends AbstractCSQueue { // Did we previously reserve containers at this 'priority'? 
if (rmContainer != null) { unreserve(application, priority, node, rmContainer); - } else if (this.reservationsContinueLooking - && (!canAllocContainer || needToUnreserve)) { - // need to unreserve some other container first - boolean res = findNodeToUnreserve(clusterResource, node, application, - priority, capability); - if (!res) { - return Resources.none(); - } - } else { - // we got here by possibly ignoring queue capacity limits. If the - // parameter needToUnreserve is true it means we ignored one of those - // limits in the chance we could unreserve. If we are here we aren't - // trying to unreserve so we can't allocate anymore due to that parent - // limit. - if (needToUnreserve) { - if (LOG.isDebugEnabled()) { - LOG.debug("we needed to unreserve to be able to allocate, skipping"); + } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) { + // when reservationsContinueLooking is set, we may need to unreserve + // some containers to meet this queue and its parents' resource limits + // TODO, need change here when we want to support continuous reservation + // looking for labeled partitions. + Resource minimumUnreservedResource = + getMinimumResourceNeedUnreserved(capability); + if (!shouldAllocOrReserveNewContainer + || Resources.greaterThan(resourceCalculator, clusterResource, + minimumUnreservedResource, Resources.none())) { + boolean containerUnreserved = + findNodeToUnreserve(clusterResource, node, application, priority, + capability, minimumUnreservedResource); + // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved + // container (That means we *have to* unreserve some resource to + // continue)). 
If we failed to unreserve some resource, we can't continue. + if (!containerUnreserved) { + return Resources.none(); } - return Resources.none(); } } @@ -1632,17 +1535,16 @@ public class LeafQueue extends AbstractCSQueue { } else { // if we are allowed to allocate but this node doesn't have space, reserve it or // if this was an already a reserved container, reserve it again - if ((canAllocContainer) || (rmContainer != null)) { - - if (reservationsContinueLooking) { - // we got here by possibly ignoring parent queue capacity limits. If - // the parameter needToUnreserve is true it means we ignored one of - // those limits in the chance we could unreserve. If we are here - // we aren't trying to unreserve so we can't allocate - // anymore due to that parent limit - boolean res = checkLimitsToReserve(clusterResource, application, capability, - needToUnreserve); - if (!res) { + if (shouldAllocOrReserveNewContainer || rmContainer != null) { + + if (reservationsContinueLooking && rmContainer == null) { + // we could possibly be ignoring parent queue capacity limits when + // reservationsContinueLooking is set. + // If we're trying to reserve a container here, no container will be + // unreserved for reserving the new one. Check limits again before + // reserving the new container + if (!checkLimitsToReserve(clusterResource, + application, capability)) { return Resources.none(); } } @@ -1784,18 +1686,36 @@ public class LeafQueue extends AbstractCSQueue { Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource, queueCapacities.getAbsoluteCapacity(), minimumAllocation); } + + private void updateCurrentResourceLimits( + ResourceLimits currentResourceLimits, Resource clusterResource) { + // TODO: need to consider non-empty node labels when resource limits support + // node labels + // Even if ParentQueue will set limits respect child's max queue capacity, + // but when allocating reserved container, CapacityScheduler doesn't do + // this. So need cap limits by queue's max capacity here.
+ this.currentResourceLimits = currentResourceLimits; + Resource queueMaxResource = + Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource), + queueCapacities + .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL), + minimumAllocation); + this.currentResourceLimits.setLimit(Resources.min(resourceCalculator, + clusterResource, queueMaxResource, currentResourceLimits.getLimit())); + } @Override public synchronized void updateClusterResource(Resource clusterResource, ResourceLimits currentResourceLimits) { - this.currentResourceLimits = currentResourceLimits; + updateCurrentResourceLimits(currentResourceLimits, clusterResource); lastClusterResource = clusterResource; updateAbsoluteCapacityResource(clusterResource); // Update headroom info based on new cluster resource value // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity // during allocation - computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource); + setQueueResourceLimitsInfo(clusterResource); // Update metrics CSQueueUtils.updateQueueStatistics( http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 7feaa15..5ed6bb8 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -63,8 +61,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.collect.Sets; - @Private @Evolving public class ParentQueue extends AbstractCSQueue { @@ -380,8 +376,7 @@ public class ParentQueue extends AbstractCSQueue { @Override public synchronized CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, boolean needToUnreserve, - ResourceLimits resourceLimits) { + FiCaSchedulerNode node, ResourceLimits resourceLimits) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); Set<String> nodeLabels = node.getLabels(); @@ -397,21 +392,18 @@ public class ParentQueue extends 
AbstractCSQueue { + getQueueName()); } - boolean localNeedToUnreserve = false; - // Are we over maximum-capacity for this queue? - if (!canAssignToThisQueue(clusterResource, nodeLabels)) { - // check to see if we could if we unreserve first - localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource); - if (!localNeedToUnreserve) { - break; - } + // This will also consider parent's limits and also continuous reservation + // looking + if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits, + minimumAllocation, Resources.createResource(getMetrics() + .getReservedMB(), getMetrics().getReservedVirtualCores()))) { + break; } // Schedule CSAssignment assignedToChild = - assignContainersToChildQueues(clusterResource, node, - localNeedToUnreserve | needToUnreserve, resourceLimits); + assignContainersToChildQueues(clusterResource, node, resourceLimits); assignment.setType(assignedToChild.getType()); // Done if no child-queue assigned anything @@ -459,74 +451,6 @@ public class ParentQueue extends AbstractCSQueue { return assignment; } - private synchronized boolean canAssignToThisQueue(Resource clusterResource, - Set<String> nodeLabels) { - Set<String> labelCanAccess = - new HashSet<String>( - accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? 
nodeLabels - : Sets.intersection(accessibleLabels, nodeLabels)); - if (nodeLabels.isEmpty()) { - // Any queue can always access any node without label - labelCanAccess.add(RMNodeLabelsManager.NO_LABEL); - } - - boolean canAssign = true; - for (String label : labelCanAccess) { - float currentAbsoluteLabelUsedCapacity = - Resources.divide(resourceCalculator, clusterResource, - queueUsage.getUsed(label), - labelManager.getResourceByLabel(label, clusterResource)); - // if any of the label doesn't beyond limit, we can allocate on this node - if (currentAbsoluteLabelUsedCapacity >= - queueCapacities.getAbsoluteMaximumCapacity(label)) { - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() + " used=" + queueUsage.getUsed() - + " current-capacity (" + queueUsage.getUsed(label) + ") " - + " >= max-capacity (" - + labelManager.getResourceByLabel(label, clusterResource) + ")"); - } - canAssign = false; - break; - } - } - - return canAssign; - } - - - private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) { - if (this.reservationsContinueLooking) { - // check to see if we could potentially use this node instead of a reserved - // node - - Resource reservedResources = Resources.createResource(getMetrics() - .getReservedMB(), getMetrics().getReservedVirtualCores()); - float capacityWithoutReservedCapacity = Resources.divide( - resourceCalculator, clusterResource, - Resources.subtract(queueUsage.getUsed(), reservedResources), - clusterResource); - - if (capacityWithoutReservedCapacity <= queueCapacities - .getAbsoluteMaximumCapacity()) { - if (LOG.isDebugEnabled()) { - LOG.debug("parent: try to use reserved: " + getQueueName() - + " usedResources: " + queueUsage.getUsed().getMemory() - + " clusterResources: " + clusterResource.getMemory() - + " reservedResources: " + reservedResources.getMemory() - + " currentCapacity " + ((float) queueUsage.getUsed().getMemory()) - / clusterResource.getMemory() - + " potentialNewWithoutReservedCapacity: " - + 
capacityWithoutReservedCapacity + " ( " + " max-capacity: " - + queueCapacities.getAbsoluteMaximumCapacity() + ")"); - } - // we could potentially use this node instead of reserved node - return true; - } - } - return false; - } - - private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { return (node.getReservedContainer() == null) && Resources.greaterThanOrEqual(resourceCalculator, clusterResource, @@ -534,28 +458,38 @@ public class ParentQueue extends AbstractCSQueue { } private ResourceLimits getResourceLimitsOfChild(CSQueue child, - Resource clusterResource, ResourceLimits myLimits) { - /* - * Set head-room of a given child, limit = - * min(minimum-of-limit-of-this-queue-and-ancestors, this.max) - this.used - * + child.used. To avoid any of this queue's and its ancestors' limit - * being violated - */ - Resource myCurrentLimit = - getCurrentResourceLimit(clusterResource, myLimits); - // My available resource = my-current-limit - my-used-resource - Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit, - getUsedResources()); - // Child's limit = my-available-resource + resource-already-used-by-child + Resource clusterResource, ResourceLimits parentLimits) { + // Set resource-limit of a given child, child.limit = + // min(my.limit - my.used + child.used, child.max) + + // Parent available resource = parent-limit - parent-used-resource + Resource parentMaxAvailableResource = + Resources.subtract(parentLimits.getLimit(), getUsedResources()); + + // Child's limit = parent-available-resource + child-used Resource childLimit = - Resources.add(myMaxAvailableResource, child.getUsedResources()); - + Resources.add(parentMaxAvailableResource, child.getUsedResources()); + + // Get child's max resource + Resource childConfiguredMaxResource = + Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource), + child.getAbsoluteMaximumCapacity(), minimumAllocation); + + 
// Child's limit should be capped by child configured max resource + childLimit = + Resources.min(resourceCalculator, clusterResource, childLimit, + childConfiguredMaxResource); + + // Normalize before return + childLimit = + Resources.roundDown(resourceCalculator, childLimit, minimumAllocation); + return new ResourceLimits(childLimit); } private synchronized CSAssignment assignContainersToChildQueues( - Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve, - ResourceLimits limits) { + Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); @@ -573,9 +507,7 @@ public class ParentQueue extends AbstractCSQueue { ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, cluster, limits); - assignment = - childQueue.assignContainers(cluster, node, needToUnreserve, - childLimits); + assignment = childQueue.assignContainers(cluster, node, childLimits); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + " stats: " + childQueue + " --> " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 9f97b13..6cc2777 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -274,7 +274,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } synchronized public NodeId getNodeIdToUnreserve(Priority priority, - Resource capability) { + Resource resourceNeedUnreserve, ResourceCalculator rc, + Resource clusterResource) { // first go around make this algorithm simple and just grab first // reservation that has enough resources @@ -283,16 +284,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { if ((reservedContainers != null) && (!reservedContainers.isEmpty())) { for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) { + NodeId nodeId = entry.getKey(); + Resource containerResource = entry.getValue().getContainer().getResource(); + // make sure we unreserve one with at least the same amount of // resources, otherwise could affect capacity limits - if (Resources.fitsIn(capability, entry.getValue().getContainer() - .getResource())) { + if (Resources.lessThanOrEqual(rc, clusterResource, + resourceNeedUnreserve, containerResource)) { if (LOG.isDebugEnabled()) { LOG.debug("unreserving node with reservation size: " - + entry.getValue().getContainer().getResource() - + " in order to allocate container with size: " + capability); + + containerResource + + " in order to allocate container with size: " + resourceNeedUnreserve); } - return entry.getKey(); + return nodeId; } } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 8cad057..1ca5c97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -611,7 +611,7 @@ public class TestApplicationLimits { app_0_0.updateResourceRequests(app_0_0_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + queue.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); Resource expectedHeadroom = Resources.createResource(10*16*GB, 1); assertEquals(expectedHeadroom, app_0_0.getHeadroom()); @@ -631,7 +631,7 @@ public class TestApplicationLimits { app_0_1.updateResourceRequests(app_0_1_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + queue.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); // Schedule to compute assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no 
change @@ -651,7 +651,7 @@ public class TestApplicationLimits { app_1_0.updateResourceRequests(app_1_0_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + queue.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); // Schedule to compute expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); @@ -660,7 +660,7 @@ public class TestApplicationLimits { // Now reduce cluster size and check for the smaller headroom clusterResource = Resources.createResource(90*16*GB); - queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + queue.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 83ab104..7a265dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.ComparisonFailure; import org.junit.Test; import org.mockito.Mockito; @@ -2483,6 +2484,64 @@ public class TestCapacityScheduler { Assert.assertEquals(30 * GB, am1.doHeartbeat().getAvailableResources().getMemory()); } + + @Test + public void testParentQueueMaxCapsAreRespected() throws Exception { + /* + * Queue tree: + * Root + * / \ + * A B + * / \ + * A1 A2 + */ + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + csConf.setCapacity(A, 50); + csConf.setMaximumCapacity(A, 50); + csConf.setCapacity(B, 50); + + // Define 2nd-level queues + csConf.setQueues(A, new String[] {"a1", "a2"}); + csConf.setCapacity(A1, 50); + csConf.setUserLimitFactor(A1, 100.0f); + csConf.setCapacity(A2, 50); + csConf.setUserLimitFactor(A2, 100.0f); + csConf.setCapacity(B1, B1_CAPACITY); + csConf.setUserLimitFactor(B1, 100.0f); + + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1); + + // Try to launch app2 in a2, asked 2GB, should success + RMApp app2 = 
rm1.submitApp(2 * GB, "app", "user", null, "a2"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + try { + // Try to allocate a container, a's usage=11G/max=12 + // a1's usage=9G/max=12 + // a2's usage=2G/max=12 + // In this case, if a2 asked 2G, should fail. + waitContainerAllocated(am2, 2 * GB, 1, 2, rm1, nm1); + } catch (AssertionError failure) { + // Expected, return; + return; + } + Assert.fail("Shouldn't successfully allocate containers for am2, " + + "queue-a's max capacity will be violated if container allocated"); + } private void setMaxAllocMb(Configuration conf, int maxAllocMb) { conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 7edb17d..71dc523 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; -import 
static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -145,7 +144,7 @@ public class TestChildQueueOrder { if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)). when(queue) - .assignContainers(eq(clusterResource), eq(node), anyBoolean(), + .assignContainers(eq(clusterResource), eq(node), any(ResourceLimits.class)); // Mock the node's resource availability @@ -157,7 +156,7 @@ public class TestChildQueueOrder { return new CSAssignment(allocatedResource, type); } }). - when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(), + when(queue).assignContainers(eq(clusterResource), eq(node), any(ResourceLimits.class)); doNothing().when(node).releaseContainer(any(Container.class)); } @@ -274,7 +273,7 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); for(int i=0; i < 2; i++) { @@ -282,7 +281,7 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); } for(int i=0; i < 3; i++) @@ -291,7 +290,7 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 1*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, 
node_0, new ResourceLimits( clusterResource)); } for(int i=0; i < 4; i++) @@ -300,7 +299,7 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); } verifyQueueMetrics(a, 1*GB, clusterResource); @@ -334,7 +333,7 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); } verifyQueueMetrics(a, 3*GB, clusterResource); @@ -362,7 +361,7 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 3*GB, clusterResource); @@ -389,7 +388,7 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -404,13 +403,13 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 
1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); InOrder allocationOrder = inOrder(d,b); allocationOrder.verify(d).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class)); + any(FiCaSchedulerNode.class), any(ResourceLimits.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class)); + any(FiCaSchedulerNode.class), any(ResourceLimits.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource);
