YARN-5864. Capacity Scheduler - Queue Priorities. (wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ce832059 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ce832059 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ce832059 Branch: refs/heads/YARN-3926 Commit: ce832059db077fa95922198b066a737ed4f609fe Parents: 69fa816 Author: Wangda Tan <[email protected]> Authored: Mon Jan 23 10:52:14 2017 -0800 Committer: Wangda Tan <[email protected]> Committed: Mon Jan 23 10:52:14 2017 -0800 ---------------------------------------------------------------------- .../dev-support/findbugs-exclude.xml | 4 - .../AbstractPreemptableResourceCalculator.java | 14 +- .../capacity/PreemptionCandidatesSelector.java | 3 +- .../ProportionalCapacityPreemptionPolicy.java | 37 ++ ...QueuePriorityContainerCandidateSelector.java | 510 +++++++++++++++++ .../monitor/capacity/TempQueuePerPartition.java | 32 +- .../monitor/capacity/TempSchedulerNode.java | 120 ++++ .../rmcontainer/RMContainerImpl.java | 7 +- .../scheduler/SchedulerNode.java | 2 +- .../scheduler/capacity/AbstractCSQueue.java | 10 + .../scheduler/capacity/CSQueue.java | 7 + .../scheduler/capacity/CapacityScheduler.java | 75 ++- .../CapacitySchedulerConfiguration.java | 236 +++++++- .../capacity/CapacitySchedulerContext.java | 4 - .../capacity/CapacitySchedulerQueueManager.java | 3 - .../scheduler/capacity/LeafQueue.java | 4 +- .../scheduler/capacity/ParentQueue.java | 70 ++- .../capacity/PartitionedQueueComparator.java | 72 --- .../PriorityUtilizationQueueOrderingPolicy.java | 186 ++++++ .../capacity/policy/QueueOrderingPolicy.java | 52 ++ .../scheduler/common/fica/FiCaSchedulerApp.java | 96 +++- ...alCapacityPreemptionPolicyMockFramework.java | 64 +++ .../TestPreemptionForQueueWithPriorities.java | 361 ++++++++++++ ...estProportionalCapacityPreemptionPolicy.java | 31 +- ...pacityPreemptionPolicyForNodePartitions.java | 11 +- ...alCapacityPreemptionPolicyMockFramework.java | 13 +- 
.../CapacitySchedulerPreemptionTestBase.java | 22 +- .../capacity/TestApplicationLimits.java | 9 - .../TestApplicationLimitsByPartition.java | 3 - ...TestCapacitySchedulerSurgicalPreemption.java | 572 ++++++++++++++++++- .../scheduler/capacity/TestChildQueueOrder.java | 3 - .../capacity/TestContainerAllocation.java | 111 ++++ .../scheduler/capacity/TestLeafQueue.java | 13 +- .../scheduler/capacity/TestParentQueue.java | 3 - .../scheduler/capacity/TestReservations.java | 2 - ...tPriorityUtilizationQueueOrderingPolicy.java | 222 +++++++ .../policy/TestFairOrderingPolicy.java | 11 +- 37 files changed, 2764 insertions(+), 231 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index ab36a4e..c090749 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -183,10 +183,6 @@ <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.RecoveryComparator" /> <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> </Match> - <Match> - <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PartitionedQueueComparator" /> - <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> - </Match> <!-- Ignore some irrelevant class name warning --> <Match> <Class name="org.apache.hadoop.yarn.api.records.SerializedException" /> http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java index 8255a30..a80f317 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -49,13 +50,11 @@ public class AbstractPreemptableResourceCalculator { @Override public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) { - if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) { - return -1; - } - if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) { - return 1; - } - return 0; + double assigned1 = getIdealPctOfGuaranteed(tq1); + double assigned2 = getIdealPctOfGuaranteed(tq2); + + return PriorityUtilizationQueueOrderingPolicy.compare(assigned1, + assigned2, tq1.relativePriority, tq2.relativePriority); } // Calculates idealAssigned / guaranteed @@ -156,6 +155,7 @@ public class 
AbstractPreemptableResourceCalculator { // way, the most underserved queue(s) are always given resources first. Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues( orderedByNeed, tqComparator); + for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i .hasNext();) { TempQueuePerPartition sub = i.next(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java index b48a287..4d8afaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java @@ -48,7 +48,8 @@ public abstract class PreemptionCandidatesSelector { * @param selectedCandidates already selected candidates from previous policies * @param clusterResource total resource * @param totalPreemptedResourceAllowed how many resources allowed to be - * preempted in this round + * preempted in this round. Should be + * updated(in-place set) after the call * @return merged selected candidates. 
*/ public abstract Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates( http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 324e845..3bf6994 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -193,6 +194,14 @@ public class ProportionalCapacityPreemptionPolicy rc = scheduler.getResourceCalculator(); nlm = scheduler.getRMContext().getNodeLabelManager(); + // Do we need white queue-priority preemption policy? 
+ boolean isQueuePriorityPreemptionEnabled = + csConfig.getPUOrderingPolicyUnderUtilizedPreemptionEnabled(); + if (isQueuePriorityPreemptionEnabled) { + candidatesSelectionPolicies.add( + new QueuePriorityContainerCandidateSelector(this)); + } + // Do we need to specially consider reserved containers? boolean selectCandidatesForResevedContainers = csConfig.getBoolean( CapacitySchedulerConfiguration. @@ -352,6 +361,8 @@ public class ProportionalCapacityPreemptionPolicy .clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)), partitionToLookAt); } + + // Update effective priority of queues } this.leafQueueNames = ImmutableSet.copyOf(getLeafQueueNames( @@ -368,13 +379,28 @@ public class ProportionalCapacityPreemptionPolicy new HashMap<>(); for (PreemptionCandidatesSelector selector : candidatesSelectionPolicies) { + long startTime = 0; if (LOG.isDebugEnabled()) { LOG.debug(MessageFormat .format("Trying to use {0} to select preemption candidates", selector.getClass().getName())); + startTime = clock.getTime(); } toPreempt = selector.selectCandidates(toPreempt, clusterResources, totalPreemptionAllowed); + + if (LOG.isDebugEnabled()) { + LOG.debug(MessageFormat + .format("{0} uses {1} millisecond to run", + selector.getClass().getName(), clock.getTime() - startTime)); + int totalSelected = 0; + for (Set<RMContainer> set : toPreempt.values()) { + totalSelected += set.size(); + } + LOG.debug(MessageFormat + .format("So far, total {0} containers selected to be preempted", + totalSelected)); + } } if (LOG.isDebugEnabled()) { @@ -470,11 +496,22 @@ public class ProportionalCapacityPreemptionPolicy reserved, curQueue); if (curQueue instanceof ParentQueue) { + String configuredOrderingPolicy = + ((ParentQueue) curQueue).getQueueOrderingPolicy().getConfigName(); + // Recursively add children for (CSQueue c : curQueue.getChildQueues()) { TempQueuePerPartition subq = cloneQueues(c, partitionResource, partitionToLookAt); + + // If we respect priority + if 
(StringUtils.equals(
+            CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
+            configuredOrderingPolicy)) {
+          subq.relativePriority = c.getPriority().getPriority();
+        }
         ret.addChild(subq);
+        subq.parent = ret;
       }
     }
   } finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
new file mode 100644
index 0000000..438bce9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
@@ -0,0 +1,582 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Preemption candidate selector that honours queue priorities.
+ *
+ * <p>For each reserved container whose queue has a higher priority than at
+ * least one other leaf queue, it tries to select running containers of
+ * lower-priority queues on the reserved node so that the reservation can be
+ * allocated. When that is not possible and moving reservations is enabled,
+ * it tries to move the reservation to a node where enough resource could be
+ * preempted.
+ */
+public class QueuePriorityContainerCandidateSelector
+    extends PreemptionCandidatesSelector {
+  private static final Log LOG =
+      LogFactory.getLog(QueuePriorityContainerCandidateSelector.class);
+
+  // Configured timeout before doing reserved container preemption
+  private long minTimeout;
+
+  // Allow move reservation around for better placement?
+  private boolean allowMoveReservation;
+
+  // All the reserved containers of the system which could possibly preempt
+  // from queues with lower priorities
+  private List<RMContainer> reservedContainers;
+
+  // From -> To
+  // A digraph to represent if one queue has higher priority than another.
+  // For example, a->b means queue=a has higher priority than queue=b
+  private Table<String, String, Boolean> priorityDigraph =
+      HashBasedTable.create();
+
+  private Resource clusterResource;
+  private Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates;
+  private Resource totalPreemptionAllowed;
+
+  // A cached scheduler node map, will be refreshed each round.
+  private Map<NodeId, TempSchedulerNode> tempSchedulerNodeMap = new HashMap<>();
+
+  // Nodes touched (changed in any way) in this round.
+  // Once a node is touched, we will not try to move reservations to the node
+  private Set<NodeId> touchedNodes;
+
+  // Resource which is marked to be preempted from other queues.
+  // <Queue, Partition, Resource-marked-to-be-preempted-from-other-queue>
+  private Table<String, String, Resource> toPreemptedFromOtherQueues =
+      HashBasedTable.create();
+
+  // Containers of a queue that may preempt the other container's queue sort
+  // first; otherwise the older container sorts first.
+  // NOTE(review): the priority relation is only a partial order, so mixing it
+  // with creation time may not be transitive for some queue hierarchies;
+  // Collections.sort can then throw IllegalArgumentException on larger
+  // inputs. Consider deriving a total order.
+  private final Comparator<RMContainer>
+      CONTAINER_CREATION_TIME_COMPARATOR = new Comparator<RMContainer>() {
+    @Override
+    public int compare(RMContainer o1, RMContainer o2) {
+      if (preemptionAllowed(o1.getQueueName(), o2.getQueueName())) {
+        return -1;
+      } else if (preemptionAllowed(o2.getQueueName(), o1.getQueueName())) {
+        return 1;
+      }
+
+      // If two queues cannot preempt each other, compare creation time.
+      return Long.compare(o1.getCreationTime(), o2.getCreationTime());
+    }
+  };
+
+  QueuePriorityContainerCandidateSelector(
+      CapacitySchedulerPreemptionContext preemptionContext) {
+    super(preemptionContext);
+
+    // Initialize parameters
+    CapacitySchedulerConfiguration csc =
+        preemptionContext.getScheduler().getConfiguration();
+
+    minTimeout = csc.getPUOrderingPolicyUnderUtilizedPreemptionDelay();
+    allowMoveReservation =
+        csc.getPUOrderingPolicyUnderUtilizedPreemptionMoveReservation();
+  }
+
+  /**
+   * Collect a queue and its ancestors, from the queue itself up to the
+   * topmost ancestor.
+   */
+  private List<TempQueuePerPartition> getPathToRoot(TempQueuePerPartition tq) {
+    List<TempQueuePerPartition> list = new ArrayList<>();
+    while (tq != null) {
+      list.add(tq);
+      tq = tq.parent;
+    }
+    return list;
+  }
+
+  /**
+   * Populate {@link #priorityDigraph} from the default-partition queue
+   * snapshots. For every pair of leaf queues, the ancestors directly below
+   * their lowest common ancestor are compared by relative priority, and an
+   * edge is added from the higher-priority leaf queue to the lower-priority
+   * one. Equal priorities produce no edge.
+   */
+  private void initializePriorityDigraph() {
+    LOG.info("Initializing priority preemption directed graph:");
+
+    // Make sure we iterate all leaf queue combinations
+    for (String q1 : preemptionContext.getLeafQueueNames()) {
+      for (String q2 : preemptionContext.getLeafQueueNames()) {
+        // Make sure we only calculate each combination once instead of all
+        // permutations
+        if (q1.compareTo(q2) >= 0) {
+          continue;
+        }
+
+        TempQueuePerPartition tq1 = preemptionContext.getQueueByPartition(q1,
+            RMNodeLabelsManager.NO_LABEL);
+        TempQueuePerPartition tq2 = preemptionContext.getQueueByPartition(q2,
+            RMNodeLabelsManager.NO_LABEL);
+        if (null == tq1 || null == tq2) {
+          // Queue unknown for the default partition; nothing to compare.
+          continue;
+        }
+
+        List<TempQueuePerPartition> path1 = getPathToRoot(tq1);
+        List<TempQueuePerPartition> path2 = getPathToRoot(tq2);
+
+        // Get direct ancestor below LCA (Lowest common ancestor): walk both
+        // paths backwards from the top while the queue names agree.
+        int i = path1.size() - 1;
+        int j = path2.size() - 1;
+        while (i >= 0 && j >= 0
+            && path1.get(i).queueName.equals(path2.get(j).queueName)) {
+          i--;
+          j--;
+        }
+        if (i < 0 || j < 0) {
+          // One path contains the other; there is no sibling to compare.
+          continue;
+        }
+
+        // compare priority of path1[i] and path2[j]
+        int p1 = path1.get(i).relativePriority;
+        int p2 = path2.get(j).relativePriority;
+        if (p1 < p2) {
+          priorityDigraph.put(q2, q1, true);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("- Added priority ordering edge: " + q2 + " >> " + q1);
+          }
+        } else if (p2 < p1) {
+          priorityDigraph.put(q1, q2, true);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("- Added priority ordering edge: " + q1 + " >> " + q2);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Whether demandingQueue may preempt resource from toBePreemptedQueue,
+   * i.e. whether the priority digraph has the edge
+   * demandingQueue -> toBePreemptedQueue.
+   *
+   * @param demandingQueue queue asking for resource
+   * @param toBePreemptedQueue queue resource would be taken from
+   * @return true if preemption is allowed
+   */
+  private boolean preemptionAllowed(String demandingQueue,
+      String toBePreemptedQueue) {
+    return priorityDigraph.contains(demandingQueue,
+        toBePreemptedQueue);
+  }
+
+  /**
+   * Check whether enough resource can be freed on a node for the asked
+   * resource. Containers already selected (by this or earlier selectors) are
+   * counted first; then running, non-AM containers of queues the demanding
+   * queue may preempt are picked, bounded by the total preemption allowance.
+   *
+   * @param requiredResource resource asked by the reserved container
+   * @param demandingQueue queue that owns the reserved container
+   * @param schedulerNode node to check
+   * @param lookingForNewReservationPlacement true when checking whether a
+   *          reservation could be moved to this node; nodes that already
+   *          hold a reservation are then skipped
+   * @param newlySelectedContainers if non-null, receives the containers that
+   *          would be preempted; only meaningful when true is returned
+   * @return true if enough resource can be made available on the node
+   */
+  private boolean canPreemptEnoughResourceForAsked(Resource requiredResource,
+      String demandingQueue, FiCaSchedulerNode schedulerNode,
+      boolean lookingForNewReservationPlacement,
+      List<RMContainer> newlySelectedContainers) {
+    // Do not check touched nodes again.
+    if (touchedNodes.contains(schedulerNode.getNodeID())) {
+      return false;
+    }
+
+    TempSchedulerNode node = tempSchedulerNodeMap.get(schedulerNode.getNodeID());
+    if (null == node) {
+      node = TempSchedulerNode.fromSchedulerNode(schedulerNode);
+      tempSchedulerNodeMap.put(schedulerNode.getNodeID(), node);
+    }
+
+    if (null != schedulerNode.getReservedContainer()
+        && lookingForNewReservationPlacement) {
+      // Node reserved by the others, skip this node
+      // We will not try to move the reservation to node which reserved already.
+      return false;
+    }
+
+    // Need to preempt = asked - (node.total - node.allocated)
+    Resource lacking = Resources.subtract(requiredResource, Resources
+        .subtract(node.getTotalResource(), node.getAllocatedResource()));
+
+    // On each host, simply check if we could preempt containers from
+    // lower-prioritized queues or not
+    List<RMContainer> runningContainers = node.getRunningContainers();
+    Collections.sort(runningContainers, CONTAINER_CREATION_TIME_COMPARATOR);
+
+    // First of all, consider already selected containers
+    for (RMContainer runningContainer : runningContainers) {
+      if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(
+          runningContainer, selectedCandidates)) {
+        Resources.subtractFrom(lacking,
+            runningContainer.getAllocatedResource());
+      }
+    }
+
+    // If we already can allocate the reserved container after preemption,
+    // skip following steps
+    if (Resources.fitsIn(rc, clusterResource, lacking,
+        Resources.none())) {
+      return true;
+    }
+
+    // Work on a copy: callers deduct from totalPreemptionAllowed themselves
+    // once they commit to the selection.
+    Resource allowed = Resources.clone(totalPreemptionAllowed);
+
+    for (RMContainer runningContainer : runningContainers) {
+      if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(
+          runningContainer, selectedCandidates)) {
+        // ignore selected containers
+        continue;
+      }
+
+      // Only preempt resource from queue with lower priority
+      if (!preemptionAllowed(demandingQueue,
+          runningContainer.getQueueName())) {
+        continue;
+      }
+
+      // Don't preempt AM container
+      if (runningContainer.isAMContainer()) {
+        continue;
+      }
+
+      // Not allow to preempt more than limit
+      if (Resources.greaterThanOrEqual(rc, clusterResource, allowed,
+          runningContainer.getAllocatedResource())) {
+        Resources.subtractFrom(allowed,
+            runningContainer.getAllocatedResource());
+        Resources.subtractFrom(lacking,
+            runningContainer.getAllocatedResource());
+
+        if (null != newlySelectedContainers) {
+          newlySelectedContainers.add(runningContainer);
+        }
+      }
+
+      // Lacking <= 0 means we can allocate the reserved container
+      if (Resources.fitsIn(rc, clusterResource, lacking, Resources.none())) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Check whether a reserved container may be moved to newNode. Reservations
+   * for container update requests and requests that cannot be relaxed to
+   * ANY locality are not moved, and newNode's partition must match the
+   * container's node label expression.
+   */
+  private boolean preChecksForMovingReservedContainerToNode(
+      RMContainer reservedContainer, FiCaSchedulerNode newNode) {
+    // Don't move reservations made for container update requests
+    // (like increase / promote)
+    if (reservedContainer.getReservedSchedulerKey().getContainerToUpdate()
+        != null) {
+      return false;
+    }
+
+    // For normal requests
+    FiCaSchedulerApp app =
+        preemptionContext.getScheduler().getApplicationAttempt(
+            reservedContainer.getApplicationAttemptId());
+    if (null == app) {
+      // No such application attempt (any more); nothing to move
+      return false;
+    }
+
+    // Don't do this if it has hard-locality preferences
+    if (!app.getAppSchedulingInfo().canDelayTo(
+        reservedContainer.getAllocatedSchedulerKey(), ResourceRequest.ANY)) {
+      // This is a hard locality request
+      return false;
+    }
+
+    // Check if newNode's partition matches requested partition
+    if (!StringUtils.equals(reservedContainer.getNodeLabelExpression(),
+        newNode.getPartition())) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Try to move a reservation that cannot be satisfied on its current node
+   * to another node where enough resource could be preempted. Stops after
+   * the first successful move.
+   */
+  private void tryToMakeBetterReservationPlacement(
+      RMContainer reservedContainer,
+      List<FiCaSchedulerNode> allSchedulerNodes) {
+    for (FiCaSchedulerNode targetNode : allSchedulerNodes) {
+      // Precheck if we can move the rmContainer to the new targetNode
+      if (!preChecksForMovingReservedContainerToNode(reservedContainer,
+          targetNode)) {
+        continue;
+      }
+
+      if (canPreemptEnoughResourceForAsked(
+          reservedContainer.getReservedResource(),
+          reservedContainer.getQueueName(), targetNode, true, null)) {
+        NodeId fromNode = reservedContainer.getNodeId();
+
+        // We can place container to this targetNode, so just go ahead and
+        // notify scheduler
+        if (preemptionContext.getScheduler().moveReservedContainer(
+            reservedContainer, targetNode)) {
+          LOG.info("Successfully moved reserved container=" + reservedContainer
+              .getContainerId() + " from targetNode=" + fromNode
+              + " to targetNode=" + targetNode.getNodeID());
+          touchedNodes.add(targetNode.getNodeID());
+          // The reservation now lives on targetNode; don't let it hop to
+          // further nodes in the same round.
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Is the demanding queue already satisfied in the given partition? A
+   * satisfied queue is not allowed to preempt resource from other queues.
+   * Satisfied means:
+   * used - reserved + marked-to-preempt-from-other-queues >= guaranteed.
+   *
+   * @param demandingQueue queue that owns the reserved container
+   * @param partition partition to check
+   * @return true if satisfied; false if not, or if the queue is unknown for
+   *         the partition
+   */
+  private boolean isQueueSatisfied(String demandingQueue,
+      String partition) {
+    TempQueuePerPartition tq = preemptionContext.getQueueByPartition(
+        demandingQueue, partition);
+    if (null == tq) {
+      return false;
+    }
+
+    Resource guaranteed = tq.getGuaranteed();
+    Resource usedDeductReserved = Resources.subtract(tq.getUsed(),
+        tq.getReserved());
+    Resource markedToPreemptFromOtherQueue = toPreemptedFromOtherQueues.get(
+        demandingQueue, partition);
+    if (null == markedToPreemptFromOtherQueue) {
+      markedToPreemptFromOtherQueue = Resources.none();
+    }
+
+    // return Used - reserved + to-preempt-from-other-queue >= guaranteed
+    return Resources.greaterThanOrEqual(rc, clusterResource,
+        Resources.add(usedDeductReserved, markedToPreemptFromOtherQueue),
+        guaranteed);
+  }
+
+  /**
+   * Add allocated to the resource marked as preempted from other queues on
+   * behalf of the given queue and partition.
+   */
+  private void incToPreempt(String queue, String partition,
+      Resource allocated) {
+    Resource total = toPreemptedFromOtherQueues.get(queue, partition);
+    if (null == total) {
+      total = Resources.createResource(0);
+      toPreemptedFromOtherQueues.put(queue, partition, total);
+    }
+
+    Resources.addTo(total, allocated);
+  }
+
+  @Override
+  public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource clusterResource,
+      Resource totalPreemptedResourceAllowed) {
+    // Initialize digraph from queues
+    // TODO (wangda): only do this when queue refreshed.
+    priorityDigraph.clear();
+    initializePriorityDigraph();
+
+    // When all queues are set to same priority, or priority is not respected,
+    // direct return.
+    if (priorityDigraph.isEmpty()) {
+      return selectedCandidates;
+    }
+
+    // Save parameters to be shared by other methods
+    this.selectedCandidates = selectedCandidates;
+    this.clusterResource = clusterResource;
+    this.totalPreemptionAllowed = totalPreemptedResourceAllowed;
+
+    toPreemptedFromOtherQueues.clear();
+
+    reservedContainers = new ArrayList<>();
+
+    // Clear temp-scheduler-node-map every time when doing selection of
+    // containers.
+    tempSchedulerNodeMap.clear();
+    touchedNodes = new HashSet<>();
+
+    // Add all reserved containers for analysis
+    List<FiCaSchedulerNode> allSchedulerNodes =
+        preemptionContext.getScheduler().getAllNodes();
+    for (FiCaSchedulerNode node : allSchedulerNodes) {
+      RMContainer reservedContainer = node.getReservedContainer();
+      if (null != reservedContainer) {
+        // Add to reservedContainers list if the queue that the reserved
+        // container belongs to has higher priority than at least one queue
+        if (priorityDigraph.containsRow(
+            reservedContainer.getQueueName())) {
+          reservedContainers.add(reservedContainer);
+        }
+      }
+    }
+
+    // Sort reserved container by creation time
+    Collections.sort(reservedContainers, CONTAINER_CREATION_TIME_COMPARATOR);
+
+    long currentTime = System.currentTimeMillis();
+
+    // From the beginning of the list
+    for (RMContainer reservedContainer : reservedContainers) {
+      // Only try to preempt reserved container after reserved container created
+      // and cannot be allocated after minTimeout
+      if (currentTime - reservedContainer.getCreationTime() < minTimeout) {
+        continue;
+      }
+
+      FiCaSchedulerNode node = preemptionContext.getScheduler().getNode(
+          reservedContainer.getReservedNode());
+      if (null == node) {
+        // Something is wrong, ignore
+        continue;
+      }
+
+      List<RMContainer> newlySelectedToBePreemptContainers = new ArrayList<>();
+
+      // Check if we can preempt for this queue
+      // We will skip if the demanding queue is already satisfied.
+      String demandingQueueName = reservedContainer.getQueueName();
+      boolean demandingQueueSatisfied = isQueueSatisfied(demandingQueueName,
+          node.getPartition());
+
+      // We will continue check if it is possible to preempt reserved container
+      // from the node.
+      boolean canPreempt = false;
+      if (!demandingQueueSatisfied) {
+        canPreempt = canPreemptEnoughResourceForAsked(
+            reservedContainer.getReservedResource(), demandingQueueName, node,
+            false, newlySelectedToBePreemptContainers);
+      }
+
+      // Add selected container if we can allocate reserved container by
+      // preemption others
+      if (canPreempt) {
+        touchedNodes.add(node.getNodeID());
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Trying to preempt following containers to make reserved "
+              + "container=" + reservedContainer.getContainerId() + " on node="
+              + node.getNodeID() + " can be allocated:");
+        }
+
+        // Update to-be-preempt
+        incToPreempt(demandingQueueName, node.getPartition(),
+            reservedContainer.getReservedResource());
+
+        for (RMContainer c : newlySelectedToBePreemptContainers) {
+          if (LOG.isDebugEnabled()) {
+            // Selected containers are running ones: log their allocation
+            LOG.debug(" --container=" + c.getContainerId() + " resource=" + c
+                .getAllocatedResource());
+          }
+
+          Set<RMContainer> containers = selectedCandidates.get(
+              c.getApplicationAttemptId());
+          if (null == containers) {
+            containers = new HashSet<>();
+            selectedCandidates.put(c.getApplicationAttemptId(), containers);
+          }
+          containers.add(c);
+
+          // Update totalPreemptionResourceAllowed
+          Resources.subtractFrom(totalPreemptedResourceAllowed,
+              c.getAllocatedResource());
+        }
+      } else if (!demandingQueueSatisfied) {
+        // We failed to get enough resource to allocate the container on its
+        // reserved node, so try to see if we can reserve the container on a
+        // better host. Only do this if the demanding queue is not satisfied.
+        //
+        // TODO (wangda): do more tests before making it usable
+        //
+        if (allowMoveReservation) {
+          tryToMakeBetterReservationPlacement(reservedContainer,
+              allSchedulerNodes);
+        }
+      }
+    }
+
+    return selectedCandidates;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 9783457..7eab015 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -53,6 +53,12 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
 
   protected Resource pendingDeductReserved;
 
+  // Relative priority of this queue to its parent
+  // If parent queue's ordering policy doesn't respect priority,
+  // this will be always 0
+  int relativePriority = 0;
+  TempQueuePerPartition parent = null;
+
   TempQueuePerPartition(String queueName, Resource current,
       boolean preemptionDisabled, String partition, Resource killable,
       float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
@@ -114,8 +120,15 @@ public class TempQueuePerPartition extends
AbstractPreemptionEntity { Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( Resources.subtract(getMax(), idealAssigned), Resource.newInstance(0, 0)); - // remain = avail - min(avail, (max - assigned), (current + pending - - // assigned)) + // accepted = min{avail, + // max - assigned, + // current + pending - assigned, + // # Make sure a queue will not get more than max of its + // # used/guaranteed, this is to make sure preemption won't + // # happen if all active queues are beyond their guaranteed + // # This is for leaf queue only. + // max(guaranteed, used) - assigned} + // remain = avail - accepted Resource accepted = Resources.min(rc, clusterResource, absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail, Resources @@ -137,6 +150,21 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity { .subtract(Resources.add(getUsed(), (considersReservedResource ? pending : pendingDeductReserved)), idealAssigned))); + + // For leaf queue: accept = min(accept, max(guaranteed, used) - assigned) + // Why only for leaf queue? + // Because for a satisfied parent queue, it could have some under-utilized + // leaf queues. Such under-utilized leaf queue could preemption resources + // from over-utilized leaf queue located at other hierarchies. 
+ if (null == children || children.isEmpty()) { + Resource maxOfGuranteedAndUsedDeductAssigned = Resources.subtract( + Resources.max(rc, clusterResource, getUsed(), getGuaranteed()), + idealAssigned); + maxOfGuranteedAndUsedDeductAssigned = Resources.max(rc, clusterResource, + maxOfGuranteedAndUsedDeductAssigned, Resources.none()); + accepted = Resources.min(rc, clusterResource, accepted, + maxOfGuranteedAndUsedDeductAssigned); + } Resource remain = Resources.subtract(avail, accepted); Resources.addTo(idealAssigned, accepted); return remain; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java new file mode 100644 index 0000000..320f262 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.List; + +/** + * Snapshot of the node information preemption needs, copied from a + * FiCaSchedulerNode. It exists mainly for performance: the snapshot can be + * cached so the scheduler is not queried again and again. Preemption-specific + * fields can also be added to this class. + */ +public class TempSchedulerNode { + private List<RMContainer> runningContainers; + private RMContainer reservedContainer; + private Resource totalResource; + + // allocated resource (excludes reserved resource) + private Resource allocatedResource; + + // total - allocated (computed in fromSchedulerNode) + private Resource availableResource; + + // reservedContainer.getReservedResource(), or Resources.none() if unreserved
+ private Resource reservedResource; + + private NodeId nodeId; + // Snapshot factory; total/allocated are cloned, reserved refs are shared. + public static TempSchedulerNode fromSchedulerNode( + FiCaSchedulerNode schedulerNode) { + TempSchedulerNode n = new TempSchedulerNode(); + n.totalResource = Resources.clone(schedulerNode.getTotalResource()); + n.allocatedResource = Resources.clone(schedulerNode.getAllocatedResource()); + n.runningContainers = schedulerNode.getCopiedListOfRunningContainers(); + n.reservedContainer = schedulerNode.getReservedContainer(); + if (n.reservedContainer != null) { + n.reservedResource = n.reservedContainer.getReservedResource(); + } else { + n.reservedResource = Resources.none(); + } + n.availableResource = Resources.subtract(n.totalResource, + n.allocatedResource); + n.nodeId = schedulerNode.getNodeID(); + return n; + } + + public NodeId getNodeId() { + return nodeId; + } + + public List<RMContainer> getRunningContainers() { + return runningContainers; + } + + public void setRunningContainers(List<RMContainer> runningContainers) { + this.runningContainers = runningContainers; + } + + public RMContainer getReservedContainer() { + return reservedContainer; + } + + public void setReservedContainer(RMContainer reservedContainer) { + this.reservedContainer = reservedContainer; + } + + public Resource getTotalResource() { + return totalResource; + } + + public void setTotalResource(Resource totalResource) { + this.totalResource = totalResource; + } + + public Resource getAllocatedResource() { + return allocatedResource; + } + + public void setAllocatedResource(Resource allocatedResource) { + this.allocatedResource = allocatedResource; + } + + public Resource getAvailableResource() { + return availableResource; + } + + public void setAvailableResource(Resource availableResource) { + this.availableResource = availableResource; + } + + public Resource getReservedResource() { + return reservedResource; + } + + public void setReservedResource(Resource reservedResource) { + this.reservedResource = reservedResource; +
} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 79709a3..72ce1a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -556,7 +556,12 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { container.reservedResource = e.getReservedResource(); container.reservedNode = e.getReservedNode(); container.reservedSchedulerKey = e.getReservedSchedulerKey(); - + + Container c = container.getContainer(); + if (c != null) { + c.setNodeId(container.reservedNode); + } + if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED) .contains(container.getState())) { // When container's state != NEW/RESERVED, it is an increase reservation http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java ---------------------------------------------------------------------- diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 15fd830..59ca81b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -407,7 +407,7 @@ public abstract class SchedulerNode { * Set the reserved container in the node. * @param reservedContainer Reserved container in the node. */ - protected synchronized void + public synchronized void setReservedContainer(RMContainer reservedContainer) { this.reservedContainer = reservedContainer; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index cefa1e2..e9ef319 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -109,6 +109,8 @@ public abstract class AbstractCSQueue implements CSQueue { protected ReentrantReadWriteLock.ReadLock readLock; protected ReentrantReadWriteLock.WriteLock writeLock; + volatile Priority priority = Priority.newInstance(0); + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this.labelManager = cs.getRMContext().getNodeLabelManager(); @@ -336,6 +338,9 @@ public abstract class AbstractCSQueue implements CSQueue { csContext.getConfiguration().getReservationContinueLook(); this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this); + + this.priority = csContext.getConfiguration().getQueuePriority( + getQueuePath()); } finally { writeLock.unlock(); } @@ -934,4 +939,9 @@ public abstract class AbstractCSQueue implements CSQueue { this.writeLock.unlock(); } } + + @Override + public Priority getPriority() { + return this.priority; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index e30ec39..2e3ced5 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -32,6 +32,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; @@ -372,4 +373,10 @@ public interface CSQueue extends SchedulerQueue<CSQueue> { */ public void validateSubmitApplication(ApplicationId applicationId, String userName, String queue) throws AccessControlException; + + /** + * Get priority of queue + * @return queue priority + */ + Priority getPriority(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ced310e..03bdd3a 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -271,16 +271,6 @@ public class CapacityScheduler extends } @Override - public Comparator<CSQueue> getNonPartitionedQueueComparator() { - return CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR; - } - - @Override - public PartitionedQueueComparator getPartitionedQueueComparator() { - return CapacitySchedulerQueueManager.PARTITIONED_QUEUE_COMPARATOR; - } - - @Override public int getNumClusterNodes() { return nodeTracker.nodeCount(); } @@ -2512,4 +2502,69 @@ public class CapacityScheduler extends public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() { return this.queueManager; } + + /** + * Try to move a reserved container to a targetNode. + * The move is refused if the targetNode already holds any reservation, so an + * existing reservation on the targetNode is never cancelled by this method. + * + * @param toBeMovedContainer reserved container to be moved + * @param targetNode node the reservation should be moved to + * @return true if move succeeded. Return false if the targetNode is reserved + * (by any container) or the move failed for any other reason.
+ */ + public boolean moveReservedContainer(RMContainer toBeMovedContainer, + FiCaSchedulerNode targetNode) { + // Acquire before try: finally must never unlock a lock we failed to take. + writeLock.lock(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to move container=" + toBeMovedContainer + " to node=" + + targetNode.getNodeID()); + } + + FiCaSchedulerNode sourceNode = getNode(toBeMovedContainer.getNodeId()); + if (null == sourceNode) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to move reservation, cannot find source node=" + + toBeMovedContainer.getNodeId()); + } + return false; + } + + // Target node updated? + if (getNode(targetNode.getNodeID()) != targetNode) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Failed to move reservation, node updated or removed, moving " + + "cancelled."); + } + return false; + } + + // Target node's reservation status changed? + if (targetNode.getReservedContainer() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Target node's reservation status changed, moving cancelled."); + } + return false; + } + + FiCaSchedulerApp app = getApplicationAttempt( + toBeMovedContainer.getApplicationAttemptId()); + if (null == app) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot find to-be-moved container's application=" + + toBeMovedContainer.getApplicationAttemptId()); + } + return false; + } + + // finally, move the reserved container + return app.moveReservation(toBeMovedContainer, sourceNode, targetNode); + } finally { + writeLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index eb148d2..43ec390 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -18,19 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.StringTokenizer; - import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -44,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.ReservationACL; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -51,6 +41,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingP import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -59,7 +51,17 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.collect.ImmutableSet; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.StringTokenizer; public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration { @@ -127,14 +129,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final String MAXIMUM_ALLOCATION_VCORES = "maximum-allocation-vcores"; - + + /** + * Ordering policy of queues + */ public static final String ORDERING_POLICY = "ordering-policy"; - - public static final String FIFO_ORDERING_POLICY = "fifo"; - public 
static final String FAIR_ORDERING_POLICY = "fair"; + /* + * Ordering policy inside a leaf queue to sort apps + */ + public static final String FIFO_APP_ORDERING_POLICY = "fifo"; + + public static final String FAIR_APP_ORDERING_POLICY = "fair"; - public static final String DEFAULT_ORDERING_POLICY = FIFO_ORDERING_POLICY; + public static final String DEFAULT_APP_ORDERING_POLICY = + FIFO_APP_ORDERING_POLICY; @Private public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000; @@ -298,6 +307,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur String queueName = PREFIX + queue + DOT; return queueName; } + + static String getQueueOrderingPolicyPrefix(String queue) { + String queueName = PREFIX + queue + DOT + ORDERING_POLICY + DOT; + return queueName; + } private String getNodeLabelPrefix(String queue, String label) { if (label.equals(CommonNodeLabelsManager.NO_LABEL)) { @@ -400,20 +414,23 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur DEFAULT_USER_LIMIT); return userLimit; } - + + // TODO (wangda): We need to better distinguish app ordering policy and queue + // ordering policy's classname / configuration options, etc. And dedup code + // if possible. 
@SuppressWarnings("unchecked") - public <S extends SchedulableEntity> OrderingPolicy<S> getOrderingPolicy( + public <S extends SchedulableEntity> OrderingPolicy<S> getAppOrderingPolicy( String queue) { - String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY, - DEFAULT_ORDERING_POLICY); + String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY, + DEFAULT_APP_ORDERING_POLICY); OrderingPolicy<S> orderingPolicy; - if (policyType.trim().equals(FIFO_ORDERING_POLICY)) { + if (policyType.trim().equals(FIFO_APP_ORDERING_POLICY)) { policyType = FifoOrderingPolicy.class.getName(); } - if (policyType.trim().equals(FAIR_ORDERING_POLICY)) { + if (policyType.trim().equals(FAIR_APP_ORDERING_POLICY)) { policyType = FairOrderingPolicy.class.getName(); } try { @@ -734,6 +751,20 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur return Resources.createResource(maximumMemory, maximumCores); } + @Private + public Priority getQueuePriority(String queue) { + String queuePolicyPrefix = getQueuePrefix(queue); + Priority pri = Priority.newInstance( + getInt(queuePolicyPrefix + "priority", 0)); + return pri; + } + + @Private + public void setQueuePriority(String queue, int priority) { + String queuePolicyPrefix = getQueuePrefix(queue); + setInt(queuePolicyPrefix + "priority", priority); + } + /** * Get the per queue setting for the maximum limit to allocate to * each container request. 
@@ -1204,4 +1235,161 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur getInt(QUEUE_GLOBAL_MAX_APPLICATION, (int) UNDEFINED); return maxApplicationsPerQueue; } + + /** + * Ordering policy inside a parent queue to sort queues + */ + + /** + * Less relative usage queue can get next resource, this is default + */ + public static final String QUEUE_UTILIZATION_ORDERING_POLICY = "utilization"; + + /** + * Combination of relative usage and priority + */ + public static final String QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY = + "priority-utilization"; + + public static final String DEFAULT_QUEUE_ORDERING_POLICY = + QUEUE_UTILIZATION_ORDERING_POLICY; + + + @Private + public void setQueueOrderingPolicy(String queue, String policy) { + set(getQueuePrefix(queue) + ORDERING_POLICY, policy); + } + + @Private + public QueueOrderingPolicy getQueueOrderingPolicy(String queue, + String parentPolicy) { + String defaultPolicy = parentPolicy; + if (null == defaultPolicy) { + defaultPolicy = DEFAULT_QUEUE_ORDERING_POLICY; + } + + String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY, + defaultPolicy); + + QueueOrderingPolicy qop; + if (policyType.trim().equals(QUEUE_UTILIZATION_ORDERING_POLICY)) { + // Doesn't respect priority + qop = new PriorityUtilizationQueueOrderingPolicy(false); + } else if (policyType.trim().equals( + QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY)) { + qop = new PriorityUtilizationQueueOrderingPolicy(true); + } else { + String message = + "Unable to construct queue ordering policy=" + policyType + " queue=" + + queue; + throw new YarnRuntimeException(message); + } + + return qop; + } + + /* + * Get global configuration for ordering policies + */ + private String getOrderingPolicyGlobalConfigKey(String orderPolicyName, + String configKey) { + return PREFIX + ORDERING_POLICY + DOT + orderPolicyName + DOT + configKey; + } + + /** + * Global configurations of queue-priority-utilization ordering policy + */ + private static 
final String UNDER_UTILIZED_PREEMPTION_ENABLED = + "underutilized-preemption.enabled"; + + /** + * Whether an under-utilized queue with higher priority may preempt a queue + * with lower priority *even if the lower-priority queue is not satisfied*. + * + * For example, two queues, a and b + * a.priority = 1, (a.used-capacity - a.reserved-capacity) = 40% + * b.priority = 0, b.used-capacity = 30% + * + * Set this configuration to true to allow queue-a to preempt containers from + * queue-b. + * + * (Reserved-capacity is deducted from used-capacity for the queue with + * higher priority because reserved-capacity is only the scheduler's internal + * mechanism for allocating large containers; applications cannot use such + * reserved-capacity. A queue with large container requests may have a large + * number of reserved containers yet be unable to allocate any of them. The + * scheduler still makes sure a satisfied queue will not preempt resources + * from any other queue. A queue is considered to be satisfied when the + * queue's used-capacity - reserved-capacity >= + * guaranteed-capacity.) + * + * @return allowed or not + */ + public boolean getPUOrderingPolicyUnderUtilizedPreemptionEnabled() { + return getBoolean(getOrderingPolicyGlobalConfigKey( + QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, + UNDER_UTILIZED_PREEMPTION_ENABLED), false); + } + + @VisibleForTesting + public void setPUOrderingPolicyUnderUtilizedPreemptionEnabled( + boolean enabled) { + setBoolean(getOrderingPolicyGlobalConfigKey( + QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, + UNDER_UTILIZED_PREEMPTION_ENABLED), enabled); + } + + private static final String UNDER_UTILIZED_PREEMPTION_DELAY = + "underutilized-preemption.reserved-container-delay-ms"; + + /** + * When a reserved container of an underutilized queue is created, preemption + * will kick in after the specified delay (in ms).
+ * + * The total time to preempt resources for a reserved container from higher + * priority queue will be: reserved-container-delay-ms + + * {@link CapacitySchedulerConfiguration#PREEMPTION_WAIT_TIME_BEFORE_KILL}. + * + * This parameter is added to make preemption from lower priority queue which + * is underutilized to be more careful. This parameter takes effect when + * underutilized-preemption.enabled set to true. + * + * @return delay + */ + public long getPUOrderingPolicyUnderUtilizedPreemptionDelay() { + return getLong(getOrderingPolicyGlobalConfigKey( + QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, + UNDER_UTILIZED_PREEMPTION_DELAY), 60000L); + } + + @VisibleForTesting + public void setPUOrderingPolicyUnderUtilizedPreemptionDelay( + long timeout) { + setLong(getOrderingPolicyGlobalConfigKey( + QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, + UNDER_UTILIZED_PREEMPTION_DELAY), timeout); + } + + private static final String UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION = + "underutilized-preemption.allow-move-reservation"; + + /** + * When doing preemption from under-satisfied queues for priority queue. + * Do we allow move reserved container from one host to another? 
+ * + * @return allow or not + */ + public boolean getPUOrderingPolicyUnderUtilizedPreemptionMoveReservation() { + return getBoolean(getOrderingPolicyGlobalConfigKey( + QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, + UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), false); + } + + @VisibleForTesting + public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation( + boolean allowMoveReservation) { + setBoolean(getOrderingPolicyGlobalConfigKey( + QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, + UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), allowMoveReservation); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 504acb9..9aeaec6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -61,10 +61,6 @@ public interface CapacitySchedulerContext { Configuration getConf(); ResourceCalculator getResourceCalculator(); - - Comparator<CSQueue> getNonPartitionedQueueComparator(); - - PartitionedQueueComparator getPartitionedQueueComparator(); 
FiCaSchedulerNode getNode(NodeId nodeId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index f204c74..8cae6c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -75,9 +75,6 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< } }; - static final PartitionedQueueComparator PARTITIONED_QUEUE_COMPARATOR = - new PartitionedQueueComparator(); - static class QueueHook { public CSQueue hook(CSQueue queue) { return queue; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 0ea56e7..bacfeef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -185,7 +185,7 @@ public class LeafQueue extends AbstractCSQueue { CapacitySchedulerConfiguration conf = csContext.getConfiguration(); setOrderingPolicy( - conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath())); + conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath())); userLimit = conf.getUserLimit(getQueuePath()); userLimitFactor = conf.getUserLimitFactor(getQueuePath()); @@ -287,7 +287,7 @@ public class LeafQueue extends AbstractCSQueue { .toString() + "\n" + "reservationsContinueLooking = " + reservationsContinueLooking + "\n" + "preemptionDisabled = " + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = " - + defaultAppPriorityPerQueue); + + defaultAppPriorityPerQueue + "\npriority = " + priority); } finally { writeLock.unlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index ec2cccb..75ab610 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -18,18 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -64,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; @@ -73,29 +62,34 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.Placeme import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + @Private @Evolving public class ParentQueue extends AbstractCSQueue { private static final Log LOG = LogFactory.getLog(ParentQueue.class); - protected final Set<CSQueue> childQueues; + protected final List<CSQueue> childQueues; private final boolean rootQueue; - private final Comparator<CSQueue> nonPartitionedQueueComparator; - private final PartitionedQueueComparator partitionQueueComparator; private volatile int numApplications; private final CapacitySchedulerContext scheduler; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + private QueueOrderingPolicy queueOrderingPolicy; + public ParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); this.scheduler = cs; - this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator(); - this.partitionQueueComparator = new PartitionedQueueComparator(); - this.rootQueue = (parent == null); float rawCapacity = cs.getConfiguration().getNonLabeledQueueCapacity(getQueuePath()); @@ -107,7 +101,7 @@ public class ParentQueue extends AbstractCSQueue { ". 
Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE); } - this.childQueues = new TreeSet<CSQueue>(nonPartitionedQueueComparator); + this.childQueues = new ArrayList<>(); setupQueueConfigs(cs.getClusterResource()); @@ -116,7 +110,14 @@ public class ParentQueue extends AbstractCSQueue { ", fullname=" + getQueuePath()); } - void setupQueueConfigs(Resource clusterResource) + // returns what is configured queue ordering policy + private String getQueueOrderingPolicyConfigName() { + return queueOrderingPolicy == null ? + null : + queueOrderingPolicy.getConfigName(); + } + + protected void setupQueueConfigs(Resource clusterResource) throws IOException { try { writeLock.lock(); @@ -134,13 +135,22 @@ public class ParentQueue extends AbstractCSQueue { } } + // Initialize queue ordering policy + queueOrderingPolicy = csContext.getConfiguration().getQueueOrderingPolicy( + getQueuePath(), parent == null ? + null : + ((ParentQueue) parent).getQueueOrderingPolicyConfigName()); + queueOrderingPolicy.setQueues(childQueues); + LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity() + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + ", absoluteMaxCapacity=" + this.queueCapacities .getAbsoluteMaximumCapacity() + ", state=" + getState() + ", acls=" + aclsString + ", labels=" + labelStrBuilder.toString() + "\n" - + ", reservationsContinueLooking=" + reservationsContinueLooking); + + ", reservationsContinueLooking=" + reservationsContinueLooking + + ", orderingPolicy=" + getQueueOrderingPolicyConfigName() + + ", priority=" + priority); } finally { writeLock.unlock(); } @@ -294,8 +304,8 @@ public class ParentQueue extends AbstractCSQueue { // Re-configure existing child queues and add new ones // The CS has already checked to ensure all existing child queues are present! 
- Map<String, CSQueue> currentChildQueues = getQueues(childQueues); - Map<String, CSQueue> newChildQueues = getQueues( + Map<String, CSQueue> currentChildQueues = getQueuesMap(childQueues); + Map<String, CSQueue> newChildQueues = getQueuesMap( newlyParsedParentQueue.childQueues); for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) { String newChildQueueName = e.getKey(); @@ -338,7 +348,7 @@ public class ParentQueue extends AbstractCSQueue { } } - Map<String, CSQueue> getQueues(Set<CSQueue> queues) { + private Map<String, CSQueue> getQueuesMap(List<CSQueue> queues) { Map<String, CSQueue> queuesMap = new HashMap<String, CSQueue>(); for (CSQueue queue : queues) { queuesMap.put(queue.getQueueName(), queue); @@ -680,13 +690,7 @@ public class ParentQueue extends AbstractCSQueue { private Iterator<CSQueue> sortAndGetChildrenAllocationIterator( String partition) { - // Previously we keep a sorted list for default partition, it is not good - // when multi-threading scheduler is enabled, so to make a simpler code - // now re-sort queue every time irrespective to node partition. 
- partitionQueueComparator.setPartitionToLookAt(partition); - List<CSQueue> childrenList = new ArrayList<>(childQueues); - Collections.sort(childrenList, partitionQueueComparator); - return childrenList.iterator(); + return queueOrderingPolicy.getAssignmentIterator(partition); } private CSAssignment assignContainersToChildQueues(Resource cluster, @@ -1083,4 +1087,8 @@ public class ParentQueue extends AbstractCSQueue { this.writeLock.unlock(); } } + + public QueueOrderingPolicy getQueueOrderingPolicy() { + return queueOrderingPolicy; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java deleted file mode 100644 index 477c615..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java +++ /dev/null @@ -1,72 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. 
You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; - -import java.util.Comparator; - -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; - -public class PartitionedQueueComparator implements Comparator<CSQueue> { - private String partitionToLookAt = null; - - public void setPartitionToLookAt(String partitionToLookAt) { - this.partitionToLookAt = partitionToLookAt; - } - - - @Override - public int compare(CSQueue q1, CSQueue q2) { - /* - * 1. Check accessible to given partition, if one queue accessible and - * the other not, accessible queue goes first. - */ - boolean q1Accessible = - q1.getAccessibleNodeLabels().contains(partitionToLookAt) - || q1.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY); - boolean q2Accessible = - q2.getAccessibleNodeLabels().contains(partitionToLookAt) - || q2.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY); - if (q1Accessible && !q2Accessible) { - return -1; - } else if (!q1Accessible && q2Accessible) { - return 1; - } - - /* - * - * 2. 
When two queue has same accessibility, check who will go first: - * Now we simply compare their used resource on the partition to lookAt - */ - float used1 = q1.getQueueCapacities().getUsedCapacity(partitionToLookAt); - float used2 = q2.getQueueCapacities().getUsedCapacity(partitionToLookAt); - if (Math.abs(used1 - used2) < 1e-6) { - // When used capacity is same, compare their guaranteed-capacity - float cap1 = q1.getQueueCapacities().getCapacity(partitionToLookAt); - float cap2 = q2.getQueueCapacities().getCapacity(partitionToLookAt); - - // when cap1 == cap2, we will compare queue's name - if (Math.abs(cap1 - cap2) < 1e-6) { - return q1.getQueueName().compareTo(q2.getQueueName()); - } - return Float.compare(cap2, cap1); - } - - return Float.compare(used1, used2); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
