http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index b1f239c..5beed37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -18,7 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,10 +39,14 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.collect.ImmutableSet; + public class CapacitySchedulerConfiguration extends Configuration { private static final Log LOG = @@ -83,6 +95,12 @@ public class CapacitySchedulerConfiguration extends Configuration { public static final String STATE = "state"; @Private + public static final String ACCESSIBLE_NODE_LABELS = "accessible-node-labels"; + + @Private + public static final String DEFAULT_NODE_LABEL_EXPRESSION = + "default-node-label-expression"; + public static final String RESERVE_CONT_LOOK_ALL_NODES = PREFIX + "reservations-continue-look-all-nodes"; @@ -268,6 +286,10 @@ public class CapacitySchedulerConfiguration extends Configuration { return queueName; } + private String getNodeLabelPrefix(String queue, String label) { + return getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT; + } + public int getMaximumSystemApplications() { int maxApplications = getInt(MAXIMUM_SYSTEM_APPLICATIONS, DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS); @@ -343,6 +365,15 @@ public class CapacitySchedulerConfiguration extends Configuration { ", maxCapacity=" + maxCapacity); } + public void setCapacityByLabel(String queue, String label, float capacity) { + setFloat(getNodeLabelPrefix(queue, label) + CAPACITY, capacity); + } + + public void setMaximumCapacityByLabel(String queue, String label, + float capacity) { + setFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, capacity); + } + public int getUserLimit(String queue) { int userLimit = getInt(getQueuePrefix(queue) + USER_LIMIT, DEFAULT_USER_LIMIT); @@ -372,6 +403,121 @@ public class CapacitySchedulerConfiguration extends Configuration { QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING; } + public void 
setAccessibleNodeLabels(String queue, Set<String> labels) { + if (labels == null) { + return; + } + String str = StringUtils.join(",", labels); + set(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS, str); + } + + public Set<String> getAccessibleNodeLabels(String queue) { + String accessibleLabelStr = + get(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS); + + // When accessible-label is null, + if (accessibleLabelStr == null) { + // Only return null when queue is not ROOT + if (!queue.equals(ROOT)) { + return null; + } + } else { + // print a warning when accessibleNodeLabel specified in config and queue + // is ROOT + if (queue.equals(ROOT)) { + LOG.warn("Accessible node labels for root queue will be ignored," + + " it will be automatically set to \"*\"."); + } + } + + // always return ANY for queue root + if (queue.equals(ROOT)) { + return ImmutableSet.of(RMNodeLabelsManager.ANY); + } + + // In other cases, split the accessibleLabelStr by "," + Set<String> set = new HashSet<String>(); + for (String str : accessibleLabelStr.split(",")) { + if (!str.trim().isEmpty()) { + set.add(str.trim()); + } + } + + // if labels contains "*", only keep ANY behind + if (set.contains(RMNodeLabelsManager.ANY)) { + set.clear(); + set.add(RMNodeLabelsManager.ANY); + } + return Collections.unmodifiableSet(set); + } + + public Map<String, Float> getNodeLabelCapacities(String queue, + Set<String> labels, RMNodeLabelsManager mgr) { + Map<String, Float> nodeLabelCapacities = new HashMap<String, Float>(); + + if (labels == null) { + return nodeLabelCapacities; + } + + for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? 
mgr + .getClusterNodeLabels() : labels) { + // capacity of all labels in each queue should be 1 + if (org.apache.commons.lang.StringUtils.equals(ROOT, queue)) { + nodeLabelCapacities.put(label, 1.0f); + continue; + } + float capacity = + getFloat(getNodeLabelPrefix(queue, label) + CAPACITY, UNDEFINED); + if (capacity < MINIMUM_CAPACITY_VALUE + || capacity > MAXIMUM_CAPACITY_VALUE) { + throw new IllegalArgumentException("Illegal " + "capacity of " + + capacity + " for label=" + label + " in queue=" + queue); + } + if (LOG.isDebugEnabled()) { + LOG.debug("CSConf - getCapacityOfLabel: prefix=" + + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity); + } + + nodeLabelCapacities.put(label, capacity / 100f); + } + return nodeLabelCapacities; + } + + public Map<String, Float> getMaximumNodeLabelCapacities(String queue, + Set<String> labels, RMNodeLabelsManager mgr) { + Map<String, Float> maximumNodeLabelCapacities = new HashMap<String, Float>(); + if (labels == null) { + return maximumNodeLabelCapacities; + } + + for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr + .getClusterNodeLabels() : labels) { + float maxCapacity = + getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, + UNDEFINED); + maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ? 
+ MAXIMUM_CAPACITY_VALUE : maxCapacity; + if (maxCapacity < MINIMUM_CAPACITY_VALUE + || maxCapacity > MAXIMUM_CAPACITY_VALUE) { + throw new IllegalArgumentException("Illegal " + "capacity of " + + maxCapacity + " for label=" + label + " in queue=" + queue); + } + LOG.debug("CSConf - getCapacityOfLabel: prefix=" + + getNodeLabelPrefix(queue, label) + ", capacity=" + maxCapacity); + + maximumNodeLabelCapacities.put(label, maxCapacity / 100f); + } + return maximumNodeLabelCapacities; + } + + public String getDefaultNodeLabelExpression(String queue) { + return get(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION); + } + + public void setDefaultNodeLabelExpression(String queue, String exp) { + set(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION, exp); + } + /* * Returns whether we should continue to look at all heart beating nodes even * after the reservation limit was hit. The node heart beating in could
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index cab0318..ffeec63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -24,12 +24,14 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -52,36 +54,31 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; @Private @Unstable -public class LeafQueue implements CSQueue { +public class LeafQueue extends AbstractCSQueue { private static final Log LOG = LogFactory.getLog(LeafQueue.class); - private final String queueName; - private CSQueue parent; - private float capacity; - private float absoluteCapacity; - private float maximumCapacity; - private float absoluteMaxCapacity; private float absoluteUsedCapacity = 0.0f; private int userLimit; private float userLimitFactor; @@ -95,10 +92,6 @@ public class LeafQueue implements CSQueue { private int maxActiveApplicationsPerUser; private int nodeLocalityDelay; - - private Resource 
usedResources = Resources.createResource(0, 0); - private float usedCapacity = 0.0f; - private volatile int numContainers; Set<FiCaSchedulerApp> activeApplications; Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap = @@ -106,20 +99,9 @@ public class LeafQueue implements CSQueue { Set<FiCaSchedulerApp> pendingApplications; - private final Resource minimumAllocation; - private final Resource maximumAllocation; private final float minimumAllocationFactor; private Map<String, User> users = new HashMap<String, User>(); - - private final QueueMetrics metrics; - - private QueueInfo queueInfo; - - private QueueState state; - - private Map<QueueACL, AccessControlList> acls = - new HashMap<QueueACL, AccessControlList>(); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -127,29 +109,18 @@ public class LeafQueue implements CSQueue { private CapacitySchedulerContext scheduler; private final ActiveUsersManager activeUsersManager; - - private final ResourceCalculator resourceCalculator; - - private boolean reservationsContinueLooking; + + // cache last cluster resource to compute actual capacity + private Resource lastClusterResource = Resources.none(); private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo(); public LeafQueue(CapacitySchedulerContext cs, - String queueName, CSQueue parent, CSQueue old) { + String queueName, CSQueue parent, CSQueue old) throws IOException { + super(cs, queueName, parent, old); this.scheduler = cs; - this.queueName = queueName; - this.parent = parent; - - this.resourceCalculator = cs.getResourceCalculator(); - // must be after parent and queueName are initialized - this.metrics = old != null ? 
old.getMetrics() : - QueueMetrics.forQueue(getQueuePath(), parent, - cs.getConfiguration().getEnableUserMetrics(), - cs.getConf()); this.activeUsersManager = new ActiveUsersManager(metrics); - this.minimumAllocation = cs.getMinimumResourceCapability(); - this.maximumAllocation = cs.getMaximumResourceCapability(); this.minimumAllocationFactor = Resources.ratio(resourceCalculator, Resources.subtract(maximumAllocation, minimumAllocation), @@ -167,7 +138,8 @@ public class LeafQueue implements CSQueue { float userLimitFactor = cs.getConfiguration().getUserLimitFactor(getQueuePath()); - int maxApplications = cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath()); + int maxApplications = + cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath()); if (maxApplications < 0) { int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications(); maxApplications = (int)(maxSystemApps * absoluteCapacity); @@ -187,12 +159,10 @@ public class LeafQueue implements CSQueue { resourceCalculator, cs.getClusterResource(), this.minimumAllocation, maxAMResourcePerQueuePercent, absoluteCapacity); - int maxActiveApplicationsPerUser = - CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit, - userLimitFactor); + int maxActiveApplicationsPerUser = + CSQueueUtils.computeMaxActiveApplicationsPerUser( + maxActiveAppsUsingAbsCap, userLimit, userLimitFactor); - this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class); - this.queueInfo.setQueueName(queueName); this.queueInfo.setChildQueues(new ArrayList<QueueInfo>()); QueueState state = cs.getConfiguration().getState(getQueuePath()); @@ -200,14 +170,13 @@ public class LeafQueue implements CSQueue { Map<QueueACL, AccessControlList> acls = cs.getConfiguration().getAcls(getQueuePath()); - setupQueueConfigs( - cs.getClusterResource(), - capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, - userLimit, userLimitFactor, + setupQueueConfigs(cs.getClusterResource(), 
capacity, absoluteCapacity, + maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser, - maxActiveApplications, maxActiveApplicationsPerUser, state, acls, - cs.getConfiguration().getNodeLocalityDelay(), + maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs + .getConfiguration().getNodeLocalityDelay(), accessibleLabels, + defaultLabelExpression, this.capacitiyByNodeLabels, + this.maxCapacityByNodeLabels, cs.getConfiguration().getReservationContinueLook()); if(LOG.isDebugEnabled()) { @@ -221,7 +190,7 @@ public class LeafQueue implements CSQueue { new TreeSet<FiCaSchedulerApp>(applicationComparator); this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator); } - + // externalizing in method, to allow overriding protected float getCapacityFromConf() { return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100; @@ -236,19 +205,22 @@ public class LeafQueue implements CSQueue { int maxApplicationsPerUser, int maxActiveApplications, int maxActiveApplicationsPerUser, QueueState state, Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay, - boolean continueLooking) - { + Set<String> labels, String defaultLabelExpression, + Map<String, Float> capacitieByLabel, + Map<String, Float> maximumCapacitiesByLabel, + boolean revervationContinueLooking) throws IOException { + super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity, + maximumCapacity, absoluteMaxCapacity, state, acls, labels, + defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel, + revervationContinueLooking); // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); float absCapacity = getParent().getAbsoluteCapacity() * capacity; - CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity); + CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity, + absoluteMaxCapacity); - 
this.capacity = capacity; this.absoluteCapacity = absCapacity; - this.maximumCapacity = maximumCapacity; - this.absoluteMaxCapacity = absoluteMaxCapacity; - this.userLimit = userLimit; this.userLimitFactor = userLimitFactor; @@ -258,27 +230,35 @@ public class LeafQueue implements CSQueue { this.maxActiveApplications = maxActiveApplications; this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser; - - this.state = state; - - this.acls = acls; - this.queueInfo.setCapacity(this.capacity); - this.queueInfo.setMaximumCapacity(this.maximumCapacity); - this.queueInfo.setQueueState(this.state); + if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels, + this.defaultLabelExpression)) { + throw new IOException("Invalid default label expression of " + + " queue=" + + queueInfo.getQueueName() + + " doesn't have permission to access all labels " + + "in default label expression. labelExpression of resource request=" + + (this.defaultLabelExpression == null ? "" + : this.defaultLabelExpression) + + ". Queue labels=" + + (queueInfo.getAccessibleNodeLabels() == null ? 
"" : StringUtils.join(queueInfo + .getAccessibleNodeLabels().iterator(), ','))); + } this.nodeLocalityDelay = nodeLocalityDelay; - this.reservationsContinueLooking = continueLooking; StringBuilder aclsString = new StringBuilder(); for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) { aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); } - - // Update metrics - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, getParent(), clusterResource, - minimumAllocation); + + StringBuilder labelStrBuilder = new StringBuilder(); + if (labels != null) { + for (String s : labels) { + labelStrBuilder.append(s); + labelStrBuilder.append(","); + } + } LOG.info("Initializing " + queueName + "\n" + "capacity = " + capacity + @@ -333,50 +313,12 @@ public class LeafQueue implements CSQueue { " [= configuredState ]" + "\n" + "acls = " + aclsString + " [= configuredAcls ]" + "\n" + + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + + "labels=" + labelStrBuilder.toString() + "\n" + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + "reservationsContinueLooking = " + reservationsContinueLooking + "\n"); } - - @Override - public synchronized float getCapacity() { - return capacity; - } - - @Override - public synchronized float getAbsoluteCapacity() { - return absoluteCapacity; - } - - @Override - public synchronized float getMaximumCapacity() { - return maximumCapacity; - } - - @Override - public synchronized float getAbsoluteMaximumCapacity() { - return absoluteMaxCapacity; - } - - @Override - public synchronized float getAbsoluteUsedCapacity() { - return absoluteUsedCapacity; - } - - @Override - public synchronized CSQueue getParent() { - return parent; - } - - @Override - public synchronized void setParent(CSQueue newParentQueue) { - this.parent = (ParentQueue)newParentQueue; - } - - @Override - public String getQueueName() { - return queueName; - } @Override public String getQueuePath() { @@ -387,22 +329,6 @@ public class LeafQueue implements 
CSQueue { * Used only by tests. */ @Private - public Resource getMinimumAllocation() { - return minimumAllocation; - } - - /** - * Used only by tests. - */ - @Private - public Resource getMaximumAllocation() { - return maximumAllocation; - } - - /** - * Used only by tests. - */ - @Private public float getMinimumAllocationFactor() { return minimumAllocationFactor; } @@ -437,45 +363,9 @@ public class LeafQueue implements CSQueue { } @Override - public synchronized float getUsedCapacity() { - return usedCapacity; - } - - @Override - public synchronized Resource getUsedResources() { - return usedResources; - } - - @Override public List<CSQueue> getChildQueues() { return null; } - - @Override - public synchronized void setUsedCapacity(float usedCapacity) { - this.usedCapacity = usedCapacity; - } - - @Override - public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) { - this.absoluteUsedCapacity = absUsedCapacity; - } - - /** - * Set maximum capacity - used only for testing. - * @param maximumCapacity new max capacity - */ - synchronized void setMaxCapacity(float maximumCapacity) { - // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - float absMaxCapacity = - CSQueueUtils.computeAbsoluteMaximumCapacity( - maximumCapacity, getParent()); - CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity); - - this.maximumCapacity = maximumCapacity; - this.absoluteMaxCapacity = absMaxCapacity; - } /** * Set user limit - used only for testing. 
@@ -569,11 +459,6 @@ public class LeafQueue implements CSQueue { return nodeLocalityDelay; } - @Private - boolean getReservationContinueLooking() { - return reservationsContinueLooking; - } - public String toString() { return queueName + ": " + "capacity=" + capacity + ", " + @@ -584,6 +469,11 @@ public class LeafQueue implements CSQueue { "numApps=" + getNumApplications() + ", " + "numContainers=" + getNumContainers(); } + + @VisibleForTesting + public synchronized void setNodeLabelManager(RMNodeLabelsManager mgr) { + this.labelManager = mgr; + } @VisibleForTesting public synchronized User getUser(String userName) { @@ -633,6 +523,10 @@ public class LeafQueue implements CSQueue { newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(), newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls, newlyParsedLeafQueue.getNodeLocalityDelay(), + newlyParsedLeafQueue.accessibleLabels, + newlyParsedLeafQueue.defaultLabelExpression, + newlyParsedLeafQueue.capacitiyByNodeLabels, + newlyParsedLeafQueue.maxCapacityByNodeLabels, newlyParsedLeafQueue.reservationsContinueLooking); // queue metrics are updated, more resource may be available @@ -641,19 +535,6 @@ public class LeafQueue implements CSQueue { } @Override - public boolean hasAccess(QueueACL acl, UserGroupInformation user) { - // Check if the leaf-queue allows access - synchronized (this) { - if (acls.get(acl).isUserAllowed(user)) { - return true; - } - } - - // Check if parent-queue allows access - return getParent().hasAccess(acl, user); - } - - @Override public void submitApplicationAttempt(FiCaSchedulerApp application, String userName) { // Careful! Locking order is important! 
@@ -749,7 +630,8 @@ public class LeafQueue implements CSQueue { } } - private synchronized void addApplicationAttempt(FiCaSchedulerApp application, User user) { + private synchronized void addApplicationAttempt(FiCaSchedulerApp application, + User user) { // Accept user.submitApplication(); pendingApplications.add(application); @@ -785,7 +667,8 @@ public class LeafQueue implements CSQueue { getParent().finishApplicationAttempt(application, queue); } - public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, User user) { + public synchronized void removeApplicationAttempt( + FiCaSchedulerApp application, User user) { boolean wasActive = activeApplications.remove(application); if (!wasActive) { pendingApplications.remove(application); @@ -821,6 +704,21 @@ public class LeafQueue implements CSQueue { private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); + private static Set<String> getRequestLabelSetByExpression( + String labelExpression) { + Set<String> labels = new HashSet<String>(); + if (null == labelExpression) { + return labels; + } + for (String l : labelExpression.split("&&")) { + if (l.trim().isEmpty()) { + continue; + } + labels.add(l.trim()); + } + return labels; + } + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) { @@ -830,6 +728,12 @@ public class LeafQueue implements CSQueue { + " #applications=" + activeApplications.size()); } + // if our queue cannot access this node, just return + if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, + labelManager.getLabelsOnNode(node.getNodeID()))) { + return NULL_ASSIGNMENT; + } + // Check for reserved resources RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { @@ -879,6 +783,10 @@ public class LeafQueue implements CSQueue { continue; } } + + Set<String> requestedNodeLabels = + getRequestLabelSetByExpression(anyRequest + 
.getNodeLabelExpression()); // Compute user-limit & set headroom // Note: We compute both user-limit & headroom with the highest @@ -887,16 +795,17 @@ public class LeafQueue implements CSQueue { // before all higher priority ones are serviced. Resource userLimit = computeUserLimitAndSetHeadroom(application, clusterResource, - required); + required, requestedNodeLabels); // Check queue max-capacity limit - if (!assignToQueue(clusterResource, required, application, true)) { + if (!canAssignToThisQueue(clusterResource, required, + labelManager.getLabelsOnNode(node.getNodeID()), application, true)) { return NULL_ASSIGNMENT; } // Check user limit if (!assignToUser(clusterResource, application.getUser(), userLimit, - application, true)) { + application, true, requestedNodeLabels)) { break; } @@ -922,7 +831,8 @@ public class LeafQueue implements CSQueue { // Book-keeping // Note: Update headroom to account for current allocation too... - allocateResource(clusterResource, application, assigned); + allocateResource(clusterResource, application, assigned, + labelManager.getLabelsOnNode(node.getNodeID())); // Don't reset scheduling opportunities for non-local assignments // otherwise the app will be delayed for each non-local assignment. 
@@ -976,7 +886,7 @@ public class LeafQueue implements CSQueue { protected Resource getHeadroom(User user, Resource queueMaxCap, Resource clusterResource, FiCaSchedulerApp application, Resource required) { return getHeadroom(user, queueMaxCap, clusterResource, - computeUserLimit(application, clusterResource, required, user)); + computeUserLimit(application, clusterResource, required, user, null)); } private Resource getHeadroom(User user, Resource queueMaxCap, @@ -1000,33 +910,49 @@ public class LeafQueue implements CSQueue { */ Resource headroom = Resources.min(resourceCalculator, clusterResource, - Resources.subtract(userLimit, user.getConsumedResources()), + Resources.subtract(userLimit, user.getTotalConsumedResources()), Resources.subtract(queueMaxCap, usedResources) ); return headroom; } - - @Private - protected synchronized boolean assignToQueue(Resource clusterResource, - Resource required, FiCaSchedulerApp application, + synchronized boolean canAssignToThisQueue(Resource clusterResource, + Resource required, Set<String> nodeLabels, FiCaSchedulerApp application, boolean checkReservations) { - - Resource potentialTotalResource = Resources.add(usedResources, required); - // Check how of the cluster's absolute capacity we are currently using... 
- float potentialNewCapacity = Resources.divide(resourceCalculator, - clusterResource, potentialTotalResource, clusterResource); - if (potentialNewCapacity > absoluteMaxCapacity) { + // Get label of this queue can access, it's (nodeLabel AND queueLabel) + Set<String> labelCanAccess; + if (null == nodeLabels || nodeLabels.isEmpty()) { + labelCanAccess = new HashSet<String>(); + // Any queue can always access any node without label + labelCanAccess.add(RMNodeLabelsManager.NO_LABEL); + } else { + labelCanAccess = new HashSet<String>(Sets.intersection(accessibleLabels, nodeLabels)); + } + + boolean canAssign = true; + for (String label : labelCanAccess) { + if (!usedResourcesByNodeLabels.containsKey(label)) { + usedResourcesByNodeLabels.put(label, Resources.createResource(0)); + } + + Resource potentialTotalCapacity = + Resources.add(usedResourcesByNodeLabels.get(label), required); + + float potentialNewCapacity = + Resources.divide(resourceCalculator, clusterResource, + potentialTotalCapacity, + labelManager.getResourceByLabel(label, clusterResource)); // if enabled, check to see if could we potentially use this node instead // of a reserved node if the application has reserved containers - if (this.reservationsContinueLooking && checkReservations) { - + // TODO, now only consider reservation cases when the node has no label + if (this.reservationsContinueLooking && checkReservations + && label.equals(RMNodeLabelsManager.NO_LABEL)) { float potentialNewWithoutReservedCapacity = Resources.divide( resourceCalculator, clusterResource, - Resources.subtract(potentialTotalResource, - application.getCurrentReservation()), - clusterResource); + Resources.subtract(potentialTotalCapacity, + application.getCurrentReservation()), + labelManager.getResourceByLabel(label, clusterResource)); if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) { if (LOG.isDebugEnabled()) { @@ -1048,35 +974,43 @@ public class LeafQueue implements CSQueue { // we could potentially use this 
node instead of reserved node return true; } - } + + // Otherwise, if any of the label of this node beyond queue limit, we + // cannot allocate on this node. Consider a small epsilon here. + if (potentialNewCapacity > getAbsoluteMaximumCapacityByNodeLabel(label) + 1e-4) { + canAssign = false; + break; + } + if (LOG.isDebugEnabled()) { LOG.debug(getQueueName() - + " usedResources: " + usedResources + + "Check assign to queue, label=" + label + + " usedResources: " + usedResourcesByNodeLabels.get(label) + " clusterResources: " + clusterResource + " currentCapacity " + Resources.divide(resourceCalculator, clusterResource, - usedResources, clusterResource) + " required " + required + usedResourcesByNodeLabels.get(label), + labelManager.getResourceByLabel(label, clusterResource)) + " potentialNewCapacity: " + potentialNewCapacity + " ( " + " max-capacity: " + absoluteMaxCapacity + ")"); } - return false; } - return true; + + return canAssign; } - - @Lock({LeafQueue.class, FiCaSchedulerApp.class}) - Resource computeUserLimitAndSetHeadroom( - FiCaSchedulerApp application, Resource clusterResource, Resource required) { - + Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, + Resource clusterResource, Resource required, Set<String> requestedLabels) { String user = application.getUser(); - User queueUser = getUser(user); - Resource userLimit = // User limit - computeUserLimit(application, clusterResource, required, queueUser); + // Compute user limit respect requested labels, + // TODO, need consider headroom respect labels also + Resource userLimit = + computeUserLimit(application, clusterResource, required, + queueUser, requestedLabels); //Max avail capacity needs to take into account usage by ancestor-siblings //which are greater than their base capacity, so we are interested in "max avail" @@ -1096,13 +1030,14 @@ public class LeafQueue implements CSQueue { queueHeadroomInfo.setClusterResource(clusterResource); } - Resource headroom = 
getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit); + Resource headroom = + getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + " queueMaxCap=" + queueMaxCap + - " consumed=" + queueUser.getConsumedResources() + + " consumed=" + queueUser.getTotalConsumedResources() + " headroom=" + headroom); } @@ -1117,24 +1052,42 @@ public class LeafQueue implements CSQueue { } @Lock(NoLock.class) - private Resource computeUserLimit(FiCaSchedulerApp application, - Resource clusterResource, Resource required, User user) { + private Resource computeUserLimit(FiCaSchedulerApp application, + Resource clusterResource, Resource required, User user, + Set<String> requestedLabels) { // What is our current capacity? // * It is equal to the max(required, queue-capacity) if // we're running below capacity. The 'max' ensures that jobs in queues // with miniscule capacity (< 1 slot) make progress // * If we're running over capacity, then its // (usedResources + required) (which extra resources we are allocating) + Resource queueCapacity = Resource.newInstance(0, 0); + if (requestedLabels != null && !requestedLabels.isEmpty()) { + // if we have multiple labels to request, we will choose to use the first + // label + String firstLabel = requestedLabels.iterator().next(); + queueCapacity = + Resources + .max(resourceCalculator, clusterResource, queueCapacity, + Resources.multiplyAndNormalizeUp(resourceCalculator, + labelManager.getResourceByLabel(firstLabel, + clusterResource), + getAbsoluteCapacityByNodeLabel(firstLabel), + minimumAllocation)); + } else { + // else there's no label on request, just to use absolute capacity as + // capacity for nodes without label + queueCapacity = + Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager + .getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, clusterResource), + absoluteCapacity, 
minimumAllocation); + } // Allow progress for queues with miniscule capacity - final Resource queueCapacity = + queueCapacity = Resources.max( resourceCalculator, clusterResource, - Resources.multiplyAndNormalizeUp( - resourceCalculator, - clusterResource, - absoluteCapacity, - minimumAllocation), + queueCapacity, required); Resource currentCapacity = @@ -1175,7 +1128,7 @@ public class LeafQueue implements CSQueue { " userLimit=" + userLimit + " userLimitFactor=" + userLimitFactor + " required: " + required + - " consumed: " + user.getConsumedResources() + + " consumed: " + user.getTotalConsumedResources() + " limit: " + limit + " queueCapacity: " + queueCapacity + " qconsumed: " + usedResources + @@ -1191,28 +1144,33 @@ public class LeafQueue implements CSQueue { @Private protected synchronized boolean assignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, - boolean checkReservations) { - + boolean checkReservations, Set<String> requestLabels) { User user = getUser(userName); + + String label = CommonNodeLabelsManager.NO_LABEL; + if (requestLabels != null && !requestLabels.isEmpty()) { + label = requestLabels.iterator().next(); + } // Note: We aren't considering the current request since there is a fixed // overhead of the AM, but it's a > check, not a >= check, so... 
- if (Resources.greaterThan(resourceCalculator, clusterResource, - user.getConsumedResources(), limit)) { - + if (Resources + .greaterThan(resourceCalculator, clusterResource, + user.getConsumedResourceByLabel(label), + limit)) { // if enabled, check to see if could we potentially use this node instead // of a reserved node if the application has reserved containers if (this.reservationsContinueLooking && checkReservations) { if (Resources.lessThanOrEqual( resourceCalculator, clusterResource, - Resources.subtract(user.getConsumedResources(), + Resources.subtract(user.getTotalConsumedResources(), application.getCurrentReservation()), limit)) { if (LOG.isDebugEnabled()) { LOG.debug("User " + userName + " in queue " + getQueueName() + " will exceed limit based on reservations - " + " consumed: " - + user.getConsumedResources() + " reserved: " + + user.getTotalConsumedResources() + " reserved: " + application.getCurrentReservation() + " limit: " + limit); } return true; @@ -1221,14 +1179,15 @@ public class LeafQueue implements CSQueue { if (LOG.isDebugEnabled()) { LOG.debug("User " + userName + " in queue " + getQueueName() + " will exceed limit - " + " consumed: " - + user.getConsumedResources() + " limit: " + limit); + + user.getTotalConsumedResources() + " limit: " + limit); } return false; } return true; } - boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource required) { + boolean needContainers(FiCaSchedulerApp application, Priority priority, + Resource required) { int requiredContainers = application.getTotalRequiredResources(priority); int reservedContainers = application.getNumReservedContainers(priority); int starvation = 0; @@ -1258,10 +1217,9 @@ public class LeafQueue implements CSQueue { return (((starvation + requiredContainers) - reservedContainers) > 0); } - private CSAssignment assignContainersOnNode(Resource clusterResource, - FiCaSchedulerNode node, FiCaSchedulerApp application, - Priority priority, RMContainer 
reservedContainer, boolean needToUnreserve) { - + private CSAssignment assignContainersOnNode(Resource clusterResource, + FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, + RMContainer reservedContainer, boolean needToUnreserve) { Resource assigned = Resources.none(); // Data-local @@ -1366,10 +1324,11 @@ public class LeafQueue implements CSQueue { // we can't reserve if we got here based on the limit // checks assuming we could unreserve!!! Resource userLimit = computeUserLimitAndSetHeadroom(application, - clusterResource, capability); + clusterResource, capability, null); - // Check queue max-capacity limit - if (!assignToQueue(clusterResource, capability, application, false)) { + // Check queue max-capacity limit, + // TODO: Consider reservation on labels + if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) { if (LOG.isDebugEnabled()) { LOG.debug("was going to reserve but hit queue limit"); } @@ -1378,7 +1337,7 @@ public class LeafQueue implements CSQueue { // Check user limit if (!assignToUser(clusterResource, application.getUser(), userLimit, - application, false)) { + application, false, null)) { if (LOG.isDebugEnabled()) { LOG.debug("was going to reserve but hit user limit"); } @@ -1516,6 +1475,20 @@ public class LeafQueue implements CSQueue { + " request=" + request + " type=" + type + " needToUnreserve= " + needToUnreserve); } + + // check if the resource request can access the label + if (!SchedulerUtils.checkNodeLabelExpression( + labelManager.getLabelsOnNode(node.getNodeID()), + request.getNodeLabelExpression())) { + // this is a reserved container, but we cannot allocate it now according + // to label not match. This can be caused by node label changed + // We should un-reserve this container. 
+ if (rmContainer != null) { + unreserve(application, priority, node, rmContainer); + } + return Resources.none(); + } + Resource capability = request.getCapability(); Resource available = node.getAvailableResource(); Resource totalResource = node.getTotalResource(); @@ -1695,8 +1668,9 @@ public class LeafQueue implements CSQueue { // Book-keeping if (removed) { - releaseResource(clusterResource, - application, container.getResource()); + releaseResource(clusterResource, application, + container.getResource(), + labelManager.getLabelsOnNode(node.getNodeID())); LOG.info("completedContainer" + " container=" + container + " queue=" + this + @@ -1712,18 +1686,18 @@ public class LeafQueue implements CSQueue { } } - synchronized void allocateResource(Resource clusterResource, - SchedulerApplicationAttempt application, Resource resource) { - // Update queue metrics - Resources.addTo(usedResources, resource); - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, getParent(), clusterResource, minimumAllocation); - ++numContainers; - + synchronized void allocateResource(Resource clusterResource, + SchedulerApplicationAttempt application, Resource resource, + Set<String> nodeLabels) { + super.allocateResource(clusterResource, resource, nodeLabels); + // Update user metrics String userName = application.getUser(); User user = getUser(userName); - user.assignContainer(resource); + user.assignContainer(resource, nodeLabels); + // Note this is a bit unconventional since it gets the object and modifies + // it here, rather then using set routine + Resources.subtractFrom(application.getHeadroom(), resource); // headroom metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); if (LOG.isDebugEnabled()) { @@ -1731,33 +1705,30 @@ public class LeafQueue implements CSQueue { " user=" + userName + " used=" + usedResources + " numContainers=" + numContainers + " headroom = " + application.getHeadroom() + - " user-resources=" + user.getConsumedResources() + " 
user-resources=" + user.getTotalConsumedResources() ); } } synchronized void releaseResource(Resource clusterResource, - FiCaSchedulerApp application, Resource resource) { - // Update queue metrics - Resources.subtractFrom(usedResources, resource); - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, getParent(), clusterResource, - minimumAllocation); - --numContainers; - + FiCaSchedulerApp application, Resource resource, Set<String> nodeLabels) { + super.releaseResource(clusterResource, resource, nodeLabels); + // Update user metrics String userName = application.getUser(); User user = getUser(userName); - user.releaseContainer(resource); + user.releaseContainer(resource, nodeLabels); metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); LOG.info(getQueueName() + " used=" + usedResources + " numContainers=" + numContainers + - " user=" + userName + " user-resources=" + user.getConsumedResources()); + " user=" + userName + " user-resources=" + user.getTotalConsumedResources()); } @Override public synchronized void updateClusterResource(Resource clusterResource) { + lastClusterResource = clusterResource; + // Update queue properties maxActiveApplications = CSQueueUtils.computeMaxActiveApplications( @@ -1786,25 +1757,29 @@ public class LeafQueue implements CSQueue { for (FiCaSchedulerApp application : activeApplications) { synchronized (application) { computeUserLimitAndSetHeadroom(application, clusterResource, - Resources.none()); + Resources.none(), null); } } } - - @Override - public QueueMetrics getMetrics() { - return metrics; - } @VisibleForTesting public static class User { Resource consumed = Resources.createResource(0, 0); + Map<String, Resource> consumedByLabel = new HashMap<String, Resource>(); int pendingApplications = 0; int activeApplications = 0; - public Resource getConsumedResources() { + public Resource getTotalConsumedResources() { return consumed; } + + public Resource getConsumedResourceByLabel(String label) { + 
Resource r = consumedByLabel.get(label); + if (null != r) { + return r; + } + return Resources.none(); + } public int getPendingApplications() { return pendingApplications; @@ -1836,12 +1811,46 @@ public class LeafQueue implements CSQueue { } } - public synchronized void assignContainer(Resource resource) { + public synchronized void assignContainer(Resource resource, + Set<String> nodeLabels) { Resources.addTo(consumed, resource); + + if (nodeLabels == null || nodeLabels.isEmpty()) { + if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) { + consumedByLabel.put(RMNodeLabelsManager.NO_LABEL, + Resources.createResource(0)); + } + Resources.addTo(consumedByLabel.get(RMNodeLabelsManager.NO_LABEL), + resource); + } else { + for (String label : nodeLabels) { + if (!consumedByLabel.containsKey(label)) { + consumedByLabel.put(label, Resources.createResource(0)); + } + Resources.addTo(consumedByLabel.get(label), resource); + } + } } - public synchronized void releaseContainer(Resource resource) { + public synchronized void releaseContainer(Resource resource, Set<String> nodeLabels) { Resources.subtractFrom(consumed, resource); + + // Update usedResources by labels + if (nodeLabels == null || nodeLabels.isEmpty()) { + if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) { + consumedByLabel.put(RMNodeLabelsManager.NO_LABEL, + Resources.createResource(0)); + } + Resources.subtractFrom( + consumedByLabel.get(RMNodeLabelsManager.NO_LABEL), resource); + } else { + for (String label : nodeLabels) { + if (!consumedByLabel.containsKey(label)) { + consumedByLabel.put(label, Resources.createResource(0)); + } + Resources.subtractFrom(consumedByLabel.get(label), resource); + } + } } } @@ -1854,7 +1863,8 @@ public class LeafQueue implements CSQueue { // Careful! Locking order is important! 
synchronized (this) { allocateResource(clusterResource, attempt, rmContainer.getContainer() - .getResource()); + .getResource(), labelManager.getLabelsOnNode(rmContainer + .getContainer().getNodeId())); } getParent().recoverContainer(clusterResource, attempt, rmContainer); } @@ -1892,7 +1902,8 @@ public class LeafQueue implements CSQueue { FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { allocateResource(clusterResource, application, rmContainer.getContainer() - .getResource()); + .getResource(), labelManager.getLabelsOnNode(rmContainer + .getContainer().getNodeId())); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() @@ -1908,7 +1919,8 @@ public class LeafQueue implements CSQueue { FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { releaseResource(clusterResource, application, rmContainer.getContainer() - .getResource()); + .getResource(), labelManager.getLabelsOnNode(rmContainer.getContainer() + .getNodeId())); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() @@ -1919,6 +1931,24 @@ public class LeafQueue implements CSQueue { } } + @Override + public float getAbsActualCapacity() { + if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + lastClusterResource, Resources.none())) { + return absoluteCapacity; + } + + Resource resourceRespectLabels = + labelManager == null ? lastClusterResource : labelManager + .getQueueResource(queueName, accessibleLabels, lastClusterResource); + float absActualCapacity = + Resources.divide(resourceCalculator, lastClusterResource, + resourceRespectLabels, lastClusterResource); + + return absActualCapacity > absoluteCapacity ? 
absoluteCapacity + : absActualCapacity; + } + public void setCapacity(float capacity) { this.capacity = capacity; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 011c99c..6ffaf4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -23,12 +23,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -46,77 +48,42 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import 
org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.collect.Sets; + @Private @Evolving -public class ParentQueue implements CSQueue { +public class ParentQueue extends AbstractCSQueue { private static final Log LOG = LogFactory.getLog(ParentQueue.class); - private CSQueue parent; - private final String queueName; - - private float capacity; - private float maximumCapacity; - private float absoluteCapacity; - private float absoluteMaxCapacity; - private float absoluteUsedCapacity = 0.0f; - - private float usedCapacity = 0.0f; - - protected final Set<CSQueue> childQueues; - private final Comparator<CSQueue> queueComparator; - - private Resource usedResources = Resources.createResource(0, 0); - + protected final Set<CSQueue> childQueues; private final boolean rootQueue; - - private final Resource minimumAllocation; - - private volatile int numApplications; - private volatile int numContainers; - - private 
QueueState state; - - private final QueueMetrics metrics; - - private QueueInfo queueInfo; - - private Map<QueueACL, AccessControlList> acls = - new HashMap<QueueACL, AccessControlList>(); + final Comparator<CSQueue> queueComparator; + volatile int numApplications; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private final ResourceCalculator resourceCalculator; - - private boolean reservationsContinueLooking; - public ParentQueue(CapacitySchedulerContext cs, - String queueName, CSQueue parent, CSQueue old) { - minimumAllocation = cs.getMinimumResourceCapability(); + String queueName, CSQueue parent, CSQueue old) throws IOException { + super(cs, queueName, parent, old); - this.parent = parent; - this.queueName = queueName; - this.rootQueue = (parent == null); - this.resourceCalculator = cs.getResourceCalculator(); + this.queueComparator = cs.getQueueComparator(); - // must be called after parent and queueName is set - this.metrics = old != null ? 
old.getMetrics() : - QueueMetrics.forQueue(getQueuePath(), parent, - cs.getConfiguration().getEnableUserMetrics(), - cs.getConf()); + this.rootQueue = (parent == null); float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath()); @@ -141,17 +108,14 @@ public class ParentQueue implements CSQueue { Map<QueueACL, AccessControlList> acls = cs.getConfiguration().getAcls(getQueuePath()); - - this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class); - this.queueInfo.setQueueName(queueName); + this.queueInfo.setChildQueues(new ArrayList<QueueInfo>()); - setupQueueConfigs(cs.getClusterResource(), - capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, state, acls, + setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, + maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels, + defaultLabelExpression, capacitiyByNodeLabels, maxCapacityByNodeLabels, cs.getConfiguration().getReservationContinueLook()); - this.queueComparator = cs.getQueueComparator(); this.childQueues = new TreeSet<CSQueue>(queueComparator); LOG.info("Initialized parent-queue " + queueName + @@ -159,41 +123,29 @@ public class ParentQueue implements CSQueue { ", fullname=" + getQueuePath()); } - protected synchronized void setupQueueConfigs( - Resource clusterResource, - float capacity, float absoluteCapacity, - float maximumCapacity, float absoluteMaxCapacity, + synchronized void setupQueueConfigs(Resource clusterResource, float capacity, + float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity, QueueState state, Map<QueueACL, AccessControlList> acls, - boolean continueLooking - ) { - // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absoluteMaxCapacity); - - this.capacity = capacity; - this.absoluteCapacity = absoluteCapacity; - - this.maximumCapacity = maximumCapacity; - this.absoluteMaxCapacity = 
absoluteMaxCapacity; - - this.state = state; - - this.acls = acls; - - this.queueInfo.setCapacity(this.capacity); - this.queueInfo.setMaximumCapacity(this.maximumCapacity); - this.queueInfo.setQueueState(this.state); - - this.reservationsContinueLooking = continueLooking; - - StringBuilder aclsString = new StringBuilder(); + Set<String> accessibleLabels, String defaultLabelExpression, + Map<String, Float> nodeLabelCapacities, + Map<String, Float> maximumCapacitiesByLabel, + boolean reservationContinueLooking) throws IOException { + super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity, + maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels, + defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel, + reservationContinueLooking); + StringBuilder aclsString = new StringBuilder(); for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) { aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); } - // Update metrics - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, parent, clusterResource, minimumAllocation); + StringBuilder labelStrBuilder = new StringBuilder(); + if (accessibleLabels != null) { + for (String s : accessibleLabels) { + labelStrBuilder.append(s); + labelStrBuilder.append(","); + } + } LOG.info(queueName + ", capacity=" + capacity + @@ -201,13 +153,13 @@ public class ParentQueue implements CSQueue { ", maxCapacity=" + maximumCapacity + ", asboluteMaxCapacity=" + absoluteMaxCapacity + ", state=" + state + - ", acls=" + aclsString + + ", acls=" + aclsString + + ", labels=" + labelStrBuilder.toString() + "\n" + ", reservationsContinueLooking=" + reservationsContinueLooking); } private static float PRECISION = 0.0005f; // 0.05% precision void setChildQueues(Collection<CSQueue> childQueues) { - // Validate float childCapacities = 0; for (CSQueue queue : childQueues) { @@ -221,6 +173,21 @@ public class ParentQueue implements CSQueue { " capacity of " + childCapacities + " for 
children of queue " + queueName); } + // check label capacities + for (String nodeLabel : labelManager.getClusterNodeLabels()) { + float capacityByLabel = getCapacityByNodeLabel(nodeLabel); + // check children's labels + float sum = 0; + for (CSQueue queue : childQueues) { + sum += queue.getCapacityByNodeLabel(nodeLabel); + } + if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION) + || (capacityByLabel == 0) && (sum > 0)) { + throw new IllegalArgumentException("Illegal" + " capacity of " + + sum + " for children of queue " + queueName + + " for label=" + nodeLabel); + } + } this.childQueues.clear(); this.childQueues.addAll(childQueues); @@ -228,21 +195,6 @@ public class ParentQueue implements CSQueue { LOG.debug("setChildQueues: " + getChildQueuesToPrint()); } } - - @Override - public synchronized CSQueue getParent() { - return parent; - } - - @Override - public synchronized void setParent(CSQueue newParentQueue) { - this.parent = (ParentQueue)newParentQueue; - } - - @Override - public String getQueueName() { - return queueName; - } @Override public String getQueuePath() { @@ -251,65 +203,6 @@ public class ParentQueue implements CSQueue { } @Override - public synchronized float getCapacity() { - return capacity; - } - - @Override - public synchronized float getAbsoluteCapacity() { - return absoluteCapacity; - } - - @Override - public float getAbsoluteMaximumCapacity() { - return absoluteMaxCapacity; - } - - @Override - public synchronized float getAbsoluteUsedCapacity() { - return absoluteUsedCapacity; - } - - @Override - public float getMaximumCapacity() { - return maximumCapacity; - } - - @Override - public ActiveUsersManager getActiveUsersManager() { - // Should never be called since all applications are submitted to LeafQueues - return null; - } - - @Override - public synchronized float getUsedCapacity() { - return usedCapacity; - } - - @Override - public synchronized Resource getUsedResources() { - return usedResources; - } - - @Override - public 
synchronized List<CSQueue> getChildQueues() { - return new ArrayList<CSQueue>(childQueues); - } - - public synchronized int getNumContainers() { - return numContainers; - } - - public synchronized int getNumApplications() { - return numApplications; - } - - @Override - public synchronized QueueState getState() { - return state; - } - - @Override public synchronized QueueInfo getQueueInfo( boolean includeChildQueues, boolean recursive) { queueInfo.setCurrentCapacity(usedCapacity); @@ -391,6 +284,10 @@ public class ParentQueue implements CSQueue { newlyParsedParentQueue.absoluteMaxCapacity, newlyParsedParentQueue.state, newlyParsedParentQueue.acls, + newlyParsedParentQueue.accessibleLabels, + newlyParsedParentQueue.defaultLabelExpression, + newlyParsedParentQueue.capacitiyByNodeLabels, + newlyParsedParentQueue.maxCapacityByNodeLabels, newlyParsedParentQueue.reservationsContinueLooking); // Re-configure existing child queues and add new ones @@ -434,21 +331,6 @@ public class ParentQueue implements CSQueue { } return queuesMap; } - - @Override - public boolean hasAccess(QueueACL acl, UserGroupInformation user) { - synchronized (this) { - if (acls.get(acl).isUserAllowed(user)) { - return true; - } - } - - if (parent != null) { - return parent.hasAccess(acl, user); - } - - return false; - } @Override public void submitApplication(ApplicationId applicationId, String user, @@ -521,7 +403,7 @@ public class ParentQueue implements CSQueue { } } - public synchronized void removeApplication(ApplicationId applicationId, + private synchronized void removeApplication(ApplicationId applicationId, String user) { --numApplications; @@ -532,30 +414,6 @@ public class ParentQueue implements CSQueue { " leaf-queue of parent: " + getQueueName() + " #applications: " + getNumApplications()); } - - @Override - public synchronized void setUsedCapacity(float usedCapacity) { - this.usedCapacity = usedCapacity; - } - - @Override - public synchronized void setAbsoluteUsedCapacity(float 
absUsedCapacity) { - this.absoluteUsedCapacity = absUsedCapacity; - } - - /** - * Set maximum capacity - used only for testing. - * @param maximumCapacity new max capacity - */ - synchronized void setMaxCapacity(float maximumCapacity) { - // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); - CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity); - - this.maximumCapacity = maximumCapacity; - this.absoluteMaxCapacity = absMaxCapacity; - } @Override public synchronized CSAssignment assignContainers( @@ -563,6 +421,12 @@ public class ParentQueue implements CSQueue { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); + // if our queue cannot access this node, just return + if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, + labelManager.getLabelsOnNode(node.getNodeID()))) { + return assignment; + } + while (canAssign(clusterResource, node)) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to assign containers to child-queue of " @@ -570,8 +434,10 @@ public class ParentQueue implements CSQueue { } boolean localNeedToUnreserve = false; + Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID()); + // Are we over maximum-capacity for this queue? 
- if (!assignToQueue(clusterResource)) { + if (!canAssignToThisQueue(clusterResource, nodeLabels)) { // check to see if we could if we unreserve first localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource); if (!localNeedToUnreserve) { @@ -589,7 +455,8 @@ public class ParentQueue implements CSQueue { resourceCalculator, clusterResource, assignedToChild.getResource(), Resources.none())) { // Track resource utilization for the parent-queue - allocateResource(clusterResource, assignedToChild.getResource()); + super.allocateResource(clusterResource, assignedToChild.getResource(), + nodeLabels); // Track resource utilization in this pass of the scheduler Resources.addTo(assignment.getResource(), assignedToChild.getResource()); @@ -628,22 +495,41 @@ public class ParentQueue implements CSQueue { return assignment; } - private synchronized boolean assignToQueue(Resource clusterResource) { - // Check how of the cluster's absolute capacity we are currently using... - float currentCapacity = - Resources.divide( - resourceCalculator, clusterResource, - usedResources, clusterResource); + private synchronized boolean canAssignToThisQueue(Resource clusterResource, + Set<String> nodeLabels) { + Set<String> labelCanAccess = + new HashSet<String>( + accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? 
nodeLabels + : Sets.intersection(accessibleLabels, nodeLabels)); + if (nodeLabels.isEmpty()) { + // Any queue can always access any node without label + labelCanAccess.add(RMNodeLabelsManager.NO_LABEL); + } - if (currentCapacity >= absoluteMaxCapacity) { - LOG.info(getQueueName() + - " used=" + usedResources + - " current-capacity (" + currentCapacity + ") " + - " >= max-capacity (" + absoluteMaxCapacity + ")"); - return false; + boolean canAssign = true; + for (String label : labelCanAccess) { + if (!usedResourcesByNodeLabels.containsKey(label)) { + usedResourcesByNodeLabels.put(label, Resources.createResource(0)); + } + float currentAbsoluteLabelUsedCapacity = + Resources.divide(resourceCalculator, clusterResource, + usedResourcesByNodeLabels.get(label), + labelManager.getResourceByLabel(label, clusterResource)); + // if any accessible label has reached its maximum capacity, we cannot allocate on this node + if (currentAbsoluteLabelUsedCapacity >= + getAbsoluteMaximumCapacityByNodeLabel(label)) { + if (LOG.isDebugEnabled()) { + LOG.debug(getQueueName() + " used=" + usedResources + + " current-capacity (" + usedResourcesByNodeLabels.get(label) + ") " + + " >= max-capacity (" + + labelManager.getResourceByLabel(label, clusterResource) + ")"); + } + canAssign = false; + break; + } } - return true; - + + return canAssign; } @@ -685,7 +571,7 @@ public class ParentQueue implements CSQueue { node.getAvailableResource(), minimumAllocation); } - synchronized CSAssignment assignContainersToChildQueues(Resource cluster, + private synchronized CSAssignment assignContainersToChildQueues(Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); @@ -728,11 +614,16 @@ public class ParentQueue implements CSQueue { String getChildQueuesToPrint() { StringBuilder sb = new StringBuilder(); for (CSQueue q : childQueues) { - sb.append(q.getQueuePath() + "(" + q.getUsedCapacity() + "), "); + 
sb.append(q.getQueuePath() + + "usedCapacity=(" + q.getUsedCapacity() + "), " + + " label=(" + + StringUtils.join(q.getAccessibleNodeLabels().iterator(), ",") + + ")"); } return sb.toString(); } - void printChildQueues() { + + private void printChildQueues() { if (LOG.isDebugEnabled()) { LOG.debug("printChildQueues - queue: " + getQueuePath() + " child-queues: " + getChildQueuesToPrint()); @@ -749,8 +640,8 @@ public class ParentQueue implements CSQueue { // Careful! Locking order is important! // Book keeping synchronized (this) { - releaseResource(clusterResource, - rmContainer.getContainer().getResource()); + super.releaseResource(clusterResource, rmContainer.getContainer() + .getResource(), labelManager.getLabelsOnNode(node.getNodeID())); LOG.info("completedContainer" + " queue=" + getQueueName() + @@ -787,27 +678,6 @@ public class ParentQueue implements CSQueue { } } - @Private - boolean getReservationContinueLooking() { - return reservationsContinueLooking; - } - - synchronized void allocateResource(Resource clusterResource, - Resource resource) { - Resources.addTo(usedResources, resource); - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, parent, clusterResource, minimumAllocation); - ++numContainers; - } - - synchronized void releaseResource(Resource clusterResource, - Resource resource) { - Resources.subtractFrom(usedResources, resource); - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, parent, clusterResource, minimumAllocation); - --numContainers; - } - @Override public synchronized void updateClusterResource(Resource clusterResource) { // Update all children @@ -821,10 +691,9 @@ public class ParentQueue implements CSQueue { } @Override - public QueueMetrics getMetrics() { - return metrics; + public synchronized List<CSQueue> getChildQueues() { + return new ArrayList<CSQueue>(childQueues); } - @Override public void recoverContainer(Resource clusterResource, @@ -834,12 +703,20 @@ public class ParentQueue implements CSQueue 
{ } // Careful! Locking order is important! synchronized (this) { - allocateResource(clusterResource,rmContainer.getContainer().getResource()); + super.allocateResource(clusterResource, rmContainer.getContainer() + .getResource(), labelManager.getLabelsOnNode(rmContainer + .getContainer().getNodeId())); } if (parent != null) { parent.recoverContainer(clusterResource, attempt, rmContainer); } } + + @Override + public ActiveUsersManager getActiveUsersManager() { + // Should never be called since all applications are submitted to LeafQueues + return null; + } @Override public void collectSchedulerApplications( @@ -853,8 +730,9 @@ public class ParentQueue implements CSQueue { public void attachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { - allocateResource(clusterResource, rmContainer.getContainer() - .getResource()); + super.allocateResource(clusterResource, rmContainer.getContainer() + .getResource(), labelManager.getLabelsOnNode(rmContainer + .getContainer().getNodeId())); LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" @@ -870,7 +748,9 @@ public class ParentQueue implements CSQueue { public void detachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { - releaseResource(clusterResource, rmContainer.getContainer().getResource()); + super.releaseResource(clusterResource, + rmContainer.getContainer().getResource(), + labelManager.getLabelsOnNode(rmContainer.getContainer().getNodeId())); LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" @@ -882,7 +762,14 @@ public class ParentQueue implements CSQueue { } } - public Map<QueueACL, 
AccessControlList> getACLs() { - return acls; + @Override + public float getAbsActualCapacity() { + // for now, simply return actual capacity = guaranteed capacity for parent + // queue + return absoluteCapacity; + } + + public synchronized int getNumApplications() { + return numApplications; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index b87744d..0725959 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import org.apache.hadoop.yarn.api.records.Resource; @@ -47,7 +49,7 @@ public class PlanQueue extends ParentQueue { private boolean showReservationsAsQueues; public PlanQueue(CapacitySchedulerContext cs, String queueName, - CSQueue parent, CSQueue old) { + CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); this.schedulerContext = cs; @@ -104,6 +106,10 @@ public class PlanQueue 
extends ParentQueue { newlyParsedParentQueue.getMaximumCapacity(), newlyParsedParentQueue.getAbsoluteMaximumCapacity(), newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs(), + newlyParsedParentQueue.accessibleLabels, + newlyParsedParentQueue.defaultLabelExpression, + newlyParsedParentQueue.capacitiyByNodeLabels, + newlyParsedParentQueue.maxCapacityByNodeLabels, newlyParsedParentQueue.getReservationContinueLooking()); updateQuotas(newlyParsedParentQueue.userLimit, http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index 8e61821..c4424b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -42,7 +42,7 @@ public class ReservationQueue extends LeafQueue { private int maxSystemApps; public ReservationQueue(CapacitySchedulerContext cs, String queueName, - PlanQueue parent) { + PlanQueue parent) throws IOException { super(cs, queueName, parent, null); maxSystemApps = cs.getConfiguration().getMaximumSystemApplications(); // the following parameters are common to all reservation 
in the plan http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index d4e043d..e1050da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -270,4 +271,16 @@ public abstract class FSQueue implements Queue, Schedulable { return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]", getName(), getDemand(), getResourceUsage(), fairShare, getWeights()); } + + @Override + public Set<String> getAccessibleNodeLabels() { + // TODO, add implementation for FS + return null; + } + + @Override + public String getDefaultNodeLabelExpression() { + // TODO, add implementation for FS + return null; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index ea21c2b..532edc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.commons.logging.Log; @@ -187,6 +188,18 @@ public class FifoScheduler extends updateAppHeadRoom(schedulerAttempt); updateAvailableResourcesMetrics(); } + + @Override + public Set<String> getAccessibleNodeLabels() { + // TODO add implementation for FIFO scheduler + return null; + } + + @Override + public String getDefaultNodeLabelExpression() { + // TODO add implementation for FIFO scheduler + return null; + } }; public FifoScheduler() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index ce5dd96..76ede39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -147,6 +147,7 @@ public class Application { return used; } + @SuppressWarnings("deprecation") public synchronized void submit() throws IOException, YarnException { ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); context.setApplicationId(this.applicationId);