Revert "CapacityScheduler: Improve preemption to only kill containers that would satisfy the incoming request. (Wangda Tan)"
This reverts commit 7e8c9beb4156dcaeb3a11e60aaa06d2370626913. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fa7a4352 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fa7a4352 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fa7a4352 Branch: refs/heads/HDFS-7240 Commit: fa7a43529d529f0006c8033c2003f15b9b93f103 Parents: 7e8c9be Author: Wangda Tan <wan...@apache.org> Authored: Wed Mar 16 17:02:10 2016 -0700 Committer: Wangda Tan <wan...@apache.org> Committed: Wed Mar 16 17:02:10 2016 -0700 ---------------------------------------------------------------------- .../ProportionalCapacityPreemptionPolicy.java | 166 ++--- .../rmcontainer/RMContainer.java | 1 - .../scheduler/PreemptableResourceScheduler.java | 2 +- .../scheduler/ResourceLimits.java | 9 - .../scheduler/SchedulerNode.java | 9 +- .../scheduler/capacity/AbstractCSQueue.java | 45 +- .../scheduler/capacity/CSAssignment.java | 11 - .../scheduler/capacity/CapacityScheduler.java | 132 +--- .../CapacitySchedulerConfiguration.java | 14 +- .../capacity/CapacitySchedulerContext.java | 15 +- .../scheduler/capacity/LeafQueue.java | 69 -- .../scheduler/capacity/ParentQueue.java | 157 +---- .../allocator/AbstractContainerAllocator.java | 2 - .../capacity/allocator/ContainerAllocation.java | 12 - .../allocator/RegularContainerAllocator.java | 39 +- .../capacity/preemption/KillableContainer.java | 45 -- .../capacity/preemption/PreemptableQueue.java | 102 --- .../capacity/preemption/PreemptionManager.java | 165 ----- .../scheduler/common/AssignmentInformation.java | 6 +- .../scheduler/common/fica/FiCaSchedulerApp.java | 32 +- .../common/fica/FiCaSchedulerNode.java | 65 +- .../scheduler/event/SchedulerEventType.java | 15 +- .../resourcemanager/TestRMDispatcher.java | 4 +- .../server/resourcemanager/TestRMRestart.java | 2 +- .../applicationsmanager/TestAMRestart.java | 7 +- ...estProportionalCapacityPreemptionPolicy.java | 6 +- ...pacityPreemptionPolicyForNodePartitions.java | 2 - .../capacity/TestApplicationLimits.java | 2 - .../capacity/TestApplicationPriority.java | 6 +- .../capacity/TestCapacityScheduler.java | 8 +- .../TestCapacitySchedulerPreemption.java | 677 ------------------- .../scheduler/capacity/TestChildQueueOrder.java | 2 - .../scheduler/capacity/TestLeafQueue.java | 3 - .../TestNodeLabelContainerAllocation.java | 97 --- .../scheduler/capacity/TestParentQueue.java | 2 - .../scheduler/capacity/TestReservations.java | 2 - .../scheduler/capacity/TestUtils.java | 36 - .../fair/TestFairSchedulerPreemption.java | 2 +- 38 files changed, 186 insertions(+), 1785 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 9b499c8..3a87edb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -35,7 +35,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ -126,8 +125,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic private long maxWaitTime; private CapacityScheduler scheduler; private long monitoringInterval; - private final Map<RMContainer, Long> preempted = new HashMap<>(); - + private final Map<RMContainer,Long> preempted = + new HashMap<RMContainer,Long>(); private ResourceCalculator rc; private float percentageClusterPreemptionAllowed; private double naturalTerminationFactor; @@ -136,10 +135,6 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic new HashMap<>(); private RMNodeLabelsManager nlm; - // Preemptable Entities, synced from scheduler at every run - private Map<String, PreemptableQueue> preemptableEntities = null; - private Set<ContainerId> killableContainers; - public ProportionalCapacityPreemptionPolicy() { clock = SystemClock.getInstance(); } @@ -189,64 +184,6 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic Resource clusterResources = Resources.clone(scheduler.getClusterResource()); containerBasedPreemptOrKill(root, clusterResources); } - - @SuppressWarnings("unchecked") - private void cleanupStaledKillableContainers(Resource cluster, - Set<String> leafQueueNames) { - for (String q : leafQueueNames) { - for (TempQueuePerPartition tq : getQueuePartitions(q)) { - // When queue's used - killable <= guaranteed and, killable > 0, we need - // to check if any of killable containers needs to be reverted - if (Resources.lessThanOrEqual(rc, cluster, - Resources.subtract(tq.current, tq.killable), tq.idealAssigned) - && Resources.greaterThan(rc, cluster, tq.killable, Resources.none())) { - // How many killable resources need to be reverted - // need-to-revert = already-marked-killable - (current - ideal) - Resource toBeRevertedFromKillable = Resources.subtract(tq.killable, - Resources.subtract(tq.current, tq.idealAssigned)); - - Resource alreadyReverted = Resources.createResource(0); - - for (RMContainer c : preemptableEntities.get(q).getKillableContainers( - tq.partition).values()) { - if (Resources.greaterThanOrEqual(rc, cluster, alreadyReverted, - toBeRevertedFromKillable)) { - break; - } - - if (Resources.greaterThan(rc, cluster, - Resources.add(alreadyReverted, c.getAllocatedResource()), - toBeRevertedFromKillable)) { - continue; - } else { - // This container need to be marked to unkillable - Resources.addTo(alreadyReverted, c.getAllocatedResource()); - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent(c.getApplicationAttemptId(), c, - SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE)); - } - } - - } - } - } - } - - private void syncKillableContainersFromScheduler() { - // sync preemptable entities from scheduler - preemptableEntities = - scheduler.getPreemptionManager().getShallowCopyOfPreemptableEntities(); - - killableContainers = new HashSet<>(); - for (Map.Entry<String, PreemptableQueue> entry : preemptableEntities - .entrySet()) { - PreemptableQueue entity = entry.getValue(); - for (Map<ContainerId, RMContainer> map : entity.getKillableContainers() - .values()) { - killableContainers.addAll(map.keySet()); - } - } - } /** * This method selects and tracks containers to be preempted. If a container @@ -264,8 +201,6 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic .getNodeLabelManager().getClusterNodeLabelNames()); allPartitions.add(RMNodeLabelsManager.NO_LABEL); - syncKillableContainersFromScheduler(); - // extract a summary of the queues from scheduler synchronized (scheduler) { queueToPartitions.clear(); @@ -293,17 +228,13 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed); } - // remove containers from killable list when we want to preempt less resources - // from queue. - cleanupStaledKillableContainers(clusterResources, leafQueueNames); - // based on ideal allocation select containers to be preempted from each // queue and each application Map<ApplicationAttemptId,Set<RMContainer>> toPreempt = getContainersToPreempt(leafQueueNames, clusterResources); if (LOG.isDebugEnabled()) { - logToCSV(new ArrayList<>(leafQueueNames)); + logToCSV(new ArrayList<String>(leafQueueNames)); } // if we are in observeOnly mode return before any action is taken @@ -323,10 +254,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // if we tried to preempt this for more than maxWaitTime if (preempted.get(container) != null && preempted.get(container) + maxWaitTime < clock.getTime()) { - // mark container killable + // kill it rmContext.getDispatcher().getEventHandler().handle( new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); + SchedulerEventType.KILL_PREEMPTED_CONTAINER)); preempted.remove(container); } else { if (preempted.get(container) != null) { @@ -402,14 +333,14 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // qAlloc tracks currently active queues (will decrease progressively as // demand is met) - List<TempQueuePerPartition> qAlloc = new ArrayList<>(queues); + List<TempQueuePerPartition> qAlloc = new ArrayList<TempQueuePerPartition>(queues); // unassigned tracks how much resources are still to assign, initialized // with the total capacity for this set of queues Resource unassigned = Resources.clone(tot_guarant); // group queues based on whether they have non-zero guaranteed capacity - Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<>(); - Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<>(); + Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<TempQueuePerPartition>(); + Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<TempQueuePerPartition>(); for (TempQueuePerPartition q : qAlloc) { if (Resources @@ -484,8 +415,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // idealAssigned >= current + pending), remove it from consideration. // Sort queues from most under-guaranteed to most over-guaranteed. TQComparator tqComparator = new TQComparator(rc, tot_guarant); - PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10, - tqComparator); + PriorityQueue<TempQueuePerPartition> orderedByNeed = + new PriorityQueue<TempQueuePerPartition>(10, tqComparator); for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) { TempQueuePerPartition q = i.next(); if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) { @@ -543,7 +474,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // percentage of guaranteed. protected Collection<TempQueuePerPartition> getMostUnderservedQueues( PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) { - ArrayList<TempQueuePerPartition> underserved = new ArrayList<>(); + ArrayList<TempQueuePerPartition> underserved = new ArrayList<TempQueuePerPartition>(); while (!orderedByNeed.isEmpty()) { TempQueuePerPartition q1 = orderedByNeed.remove(); underserved.add(q1); @@ -571,7 +502,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic if (ignoreGuar) { for (TempQueuePerPartition q : queues) { - q.normalizedGuarantee = 1.0f / queues.size(); + q.normalizedGuarantee = (float) 1.0f / ((float) queues.size()); } } else { for (TempQueuePerPartition q : queues) { @@ -584,9 +515,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } } - private String getPartitionByRMContainer(RMContainer rmContainer) { - return scheduler.getSchedulerNode(rmContainer.getAllocatedNode()) - .getPartition(); + private String getPartitionByNodeId(NodeId nodeId) { + return scheduler.getSchedulerNode(nodeId).getPartition(); } /** @@ -604,7 +534,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic return false; } - String nodePartition = getPartitionByRMContainer(rmContainer); + String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode()); Resource toObtainByPartition = resourceToObtainByPartitions.get(nodePartition); @@ -645,7 +575,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { Set<RMContainer> set; if (null == (set = preemptMap.get(appAttemptId))) { - set = new HashSet<>(); + set = new HashSet<RMContainer>(); preemptMap.put(appAttemptId, set); } set.add(containerToPreempt); @@ -657,7 +587,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to * account for containers that will naturally complete. * - * @param leafQueueNames set of leaf queues to preempt from + * @param queues set of leaf queues to preempt from * @param clusterResource total amount of cluster resources * @return a map of applciationID to set of containers to preempt */ @@ -665,8 +595,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic Set<String> leafQueueNames, Resource clusterResource) { Map<ApplicationAttemptId, Set<RMContainer>> preemptMap = - new HashMap<>(); - List<RMContainer> skippedAMContainerlist = new ArrayList<>(); + new HashMap<ApplicationAttemptId, Set<RMContainer>>(); + List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>(); // Loop all leaf queues for (String queueName : leafQueueNames) { @@ -684,7 +614,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic LeafQueue leafQueue = null; Map<String, Resource> resToObtainByPartition = - new HashMap<>(); + new HashMap<String, Resource>(); for (TempQueuePerPartition qT : getQueuePartitions(queueName)) { leafQueue = qT.leafQueue; // we act only if we are violating balance by more than @@ -773,6 +703,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * @param clusterResource * @param preemptMap * @param skippedAMContainerlist + * @param resToObtain * @param skippedAMSize * @param maxAMCapacityForThisQueue */ @@ -820,7 +751,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // first drop reserved containers towards rsrcPreempt List<RMContainer> reservedContainers = - new ArrayList<>(app.getReservedContainers()); + new ArrayList<RMContainer>(app.getReservedContainers()); for (RMContainer c : reservedContainers) { if (resToObtainByPartition.isEmpty()) { return; @@ -840,7 +771,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // if more resources are to be freed go through all live containers in // reverse priority and reverse allocation order and mark them for // preemption - List<RMContainer> liveContainers = new ArrayList<>(app.getLiveContainers()); + List<RMContainer> liveContainers = + new ArrayList<RMContainer>(app.getLiveContainers()); sortContainers(liveContainers); @@ -856,11 +788,6 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic continue; } - // Skip already marked to killable containers - if (killableContainers.contains(c.getContainerId())) { - continue; - } - // Try to preempt this container tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, clusterResource, preemptMap); @@ -899,10 +826,6 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic return "ProportionalCapacityPreemptionPolicy"; } - @VisibleForTesting - public Map<RMContainer, Long> getToPreemptContainers() { - return preempted; - } /** * This method walks a tree of CSQueue and clones the portion of the state @@ -928,11 +851,6 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic partitionToLookAt); Resource guaranteed = Resources.multiply(partitionResource, absCap); Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap); - Resource killable = Resources.none(); - if (null != preemptableEntities.get(queueName)) { - killable = preemptableEntities.get(queueName) - .getKillableResource(partitionToLookAt); - } // when partition is a non-exclusive partition, the actual maxCapacity // could more than specified maxCapacity @@ -957,7 +875,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic l.getTotalPendingResourcesConsideringUserLimit( partitionResource, partitionToLookAt); ret = new TempQueuePerPartition(queueName, current, pending, guaranteed, - maxCapacity, preemptionDisabled, partitionToLookAt, killable); + maxCapacity, preemptionDisabled, partitionToLookAt); if (preemptionDisabled) { ret.untouchableExtra = extra; } else { @@ -968,7 +886,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic Resource pending = Resource.newInstance(0, 0); ret = new TempQueuePerPartition(curQueue.getQueueName(), current, pending, - guaranteed, maxCapacity, false, partitionToLookAt, killable); + guaranteed, maxCapacity, false, partitionToLookAt); Resource childrensPreemptable = Resource.newInstance(0, 0); for (CSQueue c : curQueue.getChildQueues()) { TempQueuePerPartition subq = @@ -1014,7 +932,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic Map<String, TempQueuePerPartition> queuePartitions; if (null == (queuePartitions = queueToPartitions.get(queueName))) { - queuePartitions = new HashMap<>(); + queuePartitions = new HashMap<String, TempQueuePerPartition>(); queueToPartitions.put(queueName, queuePartitions); } queuePartitions.put(queuePartition.partition, queuePartition); @@ -1053,10 +971,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic final Resource guaranteed; final Resource maxCapacity; final String partition; - final Resource killable; Resource idealAssigned; Resource toBePreempted; - // For logging purpose Resource actuallyPreempted; Resource untouchableExtra; @@ -1070,7 +986,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic TempQueuePerPartition(String queueName, Resource current, Resource pending, Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled, - String partition, Resource killableResource) { + String partition) { this.queueName = queueName; this.current = current; this.pending = pending; @@ -1080,12 +996,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic this.actuallyPreempted = Resource.newInstance(0, 0); this.toBePreempted = Resource.newInstance(0, 0); this.normalizedGuarantee = Float.NaN; - this.children = new ArrayList<>(); + this.children = new ArrayList<TempQueuePerPartition>(); this.untouchableExtra = Resource.newInstance(0, 0); this.preemptableExtra = Resource.newInstance(0, 0); this.preemptionDisabled = preemptionDisabled; this.partition = partition; - this.killable = killableResource; } public void setLeafQueue(LeafQueue l){ @@ -1103,6 +1018,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic Resources.addTo(pending, q.pending); } + public void addChildren(ArrayList<TempQueuePerPartition> queues) { + assert leafQueue == null; + children.addAll(queues); + } + + public ArrayList<TempQueuePerPartition> getChildren(){ return children; } @@ -1143,13 +1064,18 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic return sb.toString(); } + public void printAll() { + LOG.info(this.toString()); + for (TempQueuePerPartition sub : this.getChildren()) { + sub.printAll(); + } + } + public void assignPreemption(float scalingFactor, ResourceCalculator rc, Resource clusterResource) { - if (Resources.greaterThan(rc, clusterResource, - Resources.subtract(current, killable), idealAssigned)) { - toBePreempted = Resources.multiply(Resources.subtract( - Resources.subtract(current, killable), idealAssigned), - scalingFactor); + if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) { + toBePreempted = Resources.multiply( + Resources.subtract(current, idealAssigned), scalingFactor); } else { toBePreempted = Resource.newInstance(0, 0); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index dfe0886..5d26931 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; /** * Represents the ResourceManager's view of an application container. See http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java index b73c538..ee7e101 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java @@ -45,6 +45,6 @@ public interface PreemptableResourceScheduler extends ResourceScheduler { * Ask the scheduler to forcibly interrupt the container given as input * @param container */ - void markContainerForKillable(RMContainer container); + void killPreemptedContainer(RMContainer container); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java index 721eb36..c545e9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java @@ -38,8 +38,6 @@ public class ResourceLimits { // containers. private volatile Resource headroom; - private boolean allowPreempt = false; - public ResourceLimits(Resource limit) { this(limit, Resources.none()); } @@ -74,11 +72,4 @@ public class ResourceLimits { this.amountNeededUnreserve = amountNeededUnreserve; } - public boolean isAllowPreemption() { - return allowPreempt; - } - - public void setIsAllowPreemption(boolean allowPreempt) { - this.allowPreempt = allowPreempt; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 6c4f300..33ab2f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -64,8 +64,9 @@ public abstract class SchedulerNode { private volatile ResourceUtilization nodeUtilization = ResourceUtilization.newInstance(0, 0, 0f); - /* set of containers that are allocated containers */ - protected final Map<ContainerId, RMContainer> launchedContainers = + + /** Set of containers that are allocated containers. */ + private final Map<ContainerId, RMContainer> launchedContainers = new HashMap<>(); private final RMNode rmNode; @@ -167,7 +168,7 @@ public abstract class SchedulerNode { * @param deltaResource Change in the resource allocation. * @param increase True if the change is an increase of allocation. */ - protected synchronized void changeContainerResource(ContainerId containerId, + private synchronized void changeContainerResource(ContainerId containerId, Resource deltaResource, boolean increase) { if (increase) { deductUnallocatedResource(deltaResource); @@ -241,7 +242,7 @@ public abstract class SchedulerNode { * Update the resources of the node when allocating a new container. * @param container Container to allocate. */ - protected synchronized void updateResource(Container container) { + private synchronized void updateResource(Container container) { addUnallocatedResource(container.getResource()); --numContainers; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 955f8fa..39ca29b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -46,7 +45,6 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -442,8 +440,11 @@ public abstract class AbstractCSQueue implements CSQueue { Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager.getResourceByLabel(nodePartition, clusterResource), queueCapacities.getAbsoluteMaximumCapacity(nodePartition), minimumAllocation); - return Resources.min(resourceCalculator, clusterResource, - queueMaxResource, currentResourceLimits.getLimit()); + if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { + return Resources.min(resourceCalculator, clusterResource, + queueMaxResource, currentResourceLimits.getLimit()); + } + return queueMaxResource; } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { // When we doing non-exclusive resource allocation, maximum capacity of // all queues on this label equals to total resource with the label. @@ -473,19 +474,12 @@ public abstract class AbstractCSQueue implements CSQueue { Resource nowTotalUsed = queueUsage.getUsed(nodePartition); - // Set headroom for currentResourceLimits: - // When queue is a parent queue: Headroom = limit - used + killable - // When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself) - Resource usedExceptKillable = nowTotalUsed; - if (null != getChildQueues() && !getChildQueues().isEmpty()) { - usedExceptKillable = Resources.subtract(nowTotalUsed, - getTotalKillableResource(nodePartition)); - } - currentResourceLimits.setHeadroom( - Resources.subtract(currentLimitResource, usedExceptKillable)); + // Set headroom for currentResourceLimits + currentResourceLimits.setHeadroom(Resources.subtract(currentLimitResource, + nowTotalUsed)); if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, - usedExceptKillable, currentLimitResource)) { + nowTotalUsed, currentLimitResource)) { // if reservation continous looking enabled, check to see if could we // potentially use this node instead of a reserved node if the application @@ -497,7 +491,7 @@ public abstract class AbstractCSQueue implements CSQueue { resourceCouldBeUnreserved, Resources.none())) { // resource-without-reserved = used - reserved Resource newTotalWithoutReservedResource = - Resources.subtract(usedExceptKillable, resourceCouldBeUnreserved); + Resources.subtract(nowTotalUsed, resourceCouldBeUnreserved); // when total-used-without-reserved-resource < currentLimit, we still // have chance to allocate on this node by unreserving some containers @@ -626,10 +620,11 @@ public abstract class AbstractCSQueue implements CSQueue { // considering all labels in cluster, only those labels which are // use some resource of this queue can be considered. Set<String> nodeLabels = new HashSet<String>(); - if (this.getAccessibleNodeLabels() != null && this.getAccessibleNodeLabels() - .contains(RMNodeLabelsManager.ANY)) { - nodeLabels.addAll(Sets.union(this.getQueueCapacities().getNodePartitionsSet(), - this.getQueueResourceUsage().getNodePartitionsSet())); + if (this.getAccessibleNodeLabels() != null + && this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) { + nodeLabels.addAll(Sets.union(this.getQueueCapacities() + .getNodePartitionsSet(), this.getQueueResourceUsage() + .getNodePartitionsSet())); } else { nodeLabels.addAll(this.getAccessibleNodeLabels()); } @@ -641,14 +636,4 @@ public abstract class AbstractCSQueue implements CSQueue { } return nodeLabels; } - - public Resource getTotalKillableResource(String partition) { - return csContext.getPreemptionManager().getKillableResource(queueName, - partition); - } - - public Iterator<RMContainer> getKillableContainers(String partition) { - return csContext.getPreemptionManager().getKillableContainers(queueName, - partition); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index 6406efe..68f6f12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -26,8 +26,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.Assignment import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.resource.Resources; -import java.util.List; - @Private @Unstable public class CSAssignment { @@ -44,7 +42,6 @@ public class CSAssignment { private boolean fulfilledReservation; private final AssignmentInformation assignmentInformation; private boolean increaseAllocation; - private List<RMContainer> containersToKill; public CSAssignment(Resource resource, NodeType type) { this(resource, type, null, null, false, false); @@ -150,12 +147,4 @@ public class CSAssignment { public void setIncreasedAllocation(boolean flag) { increaseAllocation = flag; } - - public void setContainersToKill(List<RMContainer> containersToKill) { - this.containersToKill = containersToKill; - } - - public List<RMContainer> getContainersToKill() { - return containersToKill; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index cf5c3b5..735306a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -108,8 +108,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicE import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -150,10 +148,6 @@ public class CapacityScheduler extends // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; - private PreemptionManager preemptionManager = new PreemptionManager(); - - private volatile boolean isLazyPreemptionEnabled = false; - static final Comparator<CSQueue> nonPartitionedQueueComparator = new Comparator<CSQueue>() { @Override @@ -304,11 +298,12 @@ public class CapacityScheduler extends initMaximumResourceCapability(this.conf.getMaximumAllocation()); this.calculator = this.conf.getResourceCalculator(); this.usePortForNodeName = this.conf.getUsePortForNodeName(); - this.applications = new ConcurrentHashMap<>(); + this.applications = + new ConcurrentHashMap<ApplicationId, + SchedulerApplication<FiCaSchedulerApp>>(); this.labelManager = rmContext.getNodeLabelManager(); authorizer = YarnAuthorizationProvider.getInstance(yarnConf); initializeQueues(this.conf); - this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled(); scheduleAsynchronously = this.conf.getScheduleAynschronously(); asyncScheduleInterval = @@ -374,9 +369,6 @@ public class CapacityScheduler extends refreshMaximumAllocation(this.conf.getMaximumAllocation()); throw new IOException("Failed to re-init queues", t); } - - // update lazy preemption - this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled(); } long getAsyncScheduleInterval() { @@ -511,9 +503,6 @@ public class CapacityScheduler extends LOG.info("Initialized root queue " + root); updatePlacementRules(); setQueueAcls(authorizer, queues); - - // Notify Preemption Manager - preemptionManager.refreshQueues(null, root); } @Lock(CapacityScheduler.class) @@ -542,9 +531,6 @@ public class CapacityScheduler extends labelManager.reinitializeQueueLabels(getQueueToLabels()); setQueueAcls(authorizer, queues); - - // Notify Preemption Manager - preemptionManager.refreshQueues(null, root); } @VisibleForTesting @@ -1267,10 +1253,8 @@ public class CapacityScheduler extends // Try to schedule more if there are no reservations to fulfill if (node.getReservedContainer() == null) { - if (calculator.computeAvailableContainers(Resources - .add(node.getUnallocatedResource(), node.getTotalKillableResources()), - minimumAllocation) > 0) { - + if (calculator.computeAvailableContainers(node.getUnallocatedResource(), + minimumAllocation) > 0) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to schedule on node: " + node.getNodeName() + ", available: " + node.getUnallocatedResource()); @@ -1279,8 +1263,10 @@ public class CapacityScheduler extends assignment = root.assignContainers( getClusterResource(), node, + // TODO, now we only consider limits for parent for non-labeled + // resources, should consider labeled resources as well. new ResourceLimits(labelManager.getResourceByLabel( - node.getPartition(), getClusterResource())), + RMNodeLabelsManager.NO_LABEL, getClusterResource())), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); if (Resources.greaterThan(calculator, getClusterResource(), assignment.getResource(), Resources.none())) { @@ -1450,20 +1436,11 @@ public class CapacityScheduler extends markContainerForPreemption(aid, containerToBePreempted); } break; - case MARK_CONTAINER_FOR_KILLABLE: - { - ContainerPreemptEvent containerKillableEvent = (ContainerPreemptEvent)event; - RMContainer killableContainer = containerKillableEvent.getContainer(); - markContainerForKillable(killableContainer); - } - break; - case MARK_CONTAINER_FOR_NONKILLABLE: + case KILL_PREEMPTED_CONTAINER: { - if (isLazyPreemptionEnabled) { - ContainerPreemptEvent cancelKillContainerEvent = - (ContainerPreemptEvent) event; - markContainerForNonKillable(cancelKillContainerEvent.getContainer()); - } + ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event; + RMContainer containerToBeKilled = killContainerEvent.getContainer(); + killPreemptedContainer(containerToBeKilled); } break; default: @@ -1571,14 +1548,14 @@ public class CapacityScheduler extends protected void completedContainerInternal( RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { + Container container = rmContainer.getContainer(); - ContainerId containerId = container.getId(); // Get the application for the finished container FiCaSchedulerApp application = getCurrentAttemptForContainer(container.getId()); ApplicationId appId = - containerId.getApplicationAttemptId().getApplicationId(); + container.getId().getApplicationAttemptId().getApplicationId(); if (application == null) { LOG.info("Container " + container + " of" + " finished application " + appId + " completed with event " + event); @@ -1592,6 +1569,15 @@ public class CapacityScheduler extends LeafQueue queue = (LeafQueue)application.getQueue(); queue.completedContainer(getClusterResource(), application, node, rmContainer, containerStatus, event, null, true); + + if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) { + schedulerHealth.updatePreemption(Time.now(), container.getNodeId(), + container.getId(), queue.getQueuePath()); + schedulerHealth.updateSchedulerPreemptionCounts(1); + } else { + schedulerHealth.updateRelease(lastNodeUpdateTime, container.getNodeId(), + container.getId(), queue.getQueuePath()); + } } @Override @@ -1627,7 +1613,7 @@ public class CapacityScheduler extends ApplicationAttemptId applicationAttemptId) { return super.getApplicationAttempt(applicationAttemptId); } - + @Lock(Lock.NoLock.class) public FiCaSchedulerNode getNode(NodeId nodeId) { return nodeTracker.getNode(nodeId); @@ -1668,60 +1654,15 @@ public class CapacityScheduler extends } } - public synchronized void markContainerForKillable( - RMContainer killableContainer) { - if (LOG.isDebugEnabled()) { - LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container" - + killableContainer.toString()); - } - - if (!isLazyPreemptionEnabled) { - super.completedContainer(killableContainer, SchedulerUtils - .createPreemptedContainerStatus(killableContainer.getContainerId(), - SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); - } else { - FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode( - killableContainer.getAllocatedNode()); - - FiCaSchedulerApp application = getCurrentAttemptForContainer( - killableContainer.getContainerId()); - - node.markContainerToKillable(killableContainer.getContainerId()); - - // notify PreemptionManager - // Get the application for the finished container - if (null != application) { - String leafQueueName = application.getCSLeafQueue().getQueueName(); - getPreemptionManager().addKillableContainer( - new KillableContainer(killableContainer, node.getPartition(), - leafQueueName)); - } } - } - - private synchronized void markContainerForNonKillable( - RMContainer nonKillableContainer) { + @Override + public void killPreemptedContainer(RMContainer cont) { if (LOG.isDebugEnabled()) { - LOG.debug( - SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container" - + nonKillableContainer.toString()); - } - - FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode( - nonKillableContainer.getAllocatedNode()); - - FiCaSchedulerApp application = getCurrentAttemptForContainer( - nonKillableContainer.getContainerId()); - - node.markContainerToNonKillable(nonKillableContainer.getContainerId()); - - // notify PreemptionManager - // Get the application for the finished container - if (null != application) { - String leafQueueName = application.getCSLeafQueue().getQueueName(); - getPreemptionManager().removeKillableContainer( - new KillableContainer(nonKillableContainer, node.getPartition(), - leafQueueName)); + LOG.debug(SchedulerEventType.KILL_PREEMPTED_CONTAINER + ": container" + + cont.toString()); } + super.completedContainer(cont, SchedulerUtils + .createPreemptedContainerStatus(cont.getContainerId(), + SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); } @Override @@ -2004,7 +1945,6 @@ public class CapacityScheduler extends return ret; } - @Override public SchedulerHealth getSchedulerHealth() { return this.schedulerHealth; } @@ -2014,11 +1954,6 @@ public class CapacityScheduler extends } @Override - public long getLastNodeUpdateTime() { - return lastNodeUpdateTime; - } - - @Override public Priority checkAndGetApplicationPriority(Priority priorityFromContext, String user, String queueName, ApplicationId applicationId) throws YarnException { @@ -2119,9 +2054,4 @@ public class CapacityScheduler extends + rmApp.getQueue() + " for application: " + applicationId + " for the user: " + rmApp.getUser()); } - - @Override - public PreemptionManager getPreemptionManager() { - return preemptionManager; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 3729264..3756d9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -257,12 +257,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final String RESERVATION_ENFORCEMENT_WINDOW = "reservation-enforcement-window"; - @Private - public static final String LAZY_PREEMPTION_ENALBED = PREFIX + "lazy-preemption-enabled"; - - @Private - public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; - public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -1013,11 +1007,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @VisibleForTesting public void setOrderingPolicyParameter(String queue, String parameterKey, String parameterValue) { - set(getQueuePrefix(queue) + ORDERING_POLICY + "." + parameterKey, - parameterValue); - } - - public boolean getLazyPreemptionEnabled() { - return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED); + set(getQueuePrefix(queue) + ORDERING_POLICY + "." + + parameterKey, parameterValue); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 1203272..2a0dd0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -18,20 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import java.util.Comparator; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import java.util.Comparator; - /** * Read-only interface to {@link CapacityScheduler} context. */ @@ -64,12 +61,4 @@ public interface CapacitySchedulerContext { PartitionedQueueComparator getPartitionedQueueComparator(); FiCaSchedulerNode getNode(NodeId nodeId); - - FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId attemptId); - - PreemptionManager getPreemptionManager(); - - SchedulerHealth getSchedulerHealth(); - - long getLastNodeUpdateTime(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 3dc2090..c625fae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -37,11 +37,9 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; -import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -65,9 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps; @@ -827,40 +823,6 @@ public class LeafQueue extends AbstractCSQueue { assignment.setExcessReservation(null); } } - - private void killToPreemptContainers(Resource clusterResource, - FiCaSchedulerNode node, - CSAssignment assignment) { - if (assignment.getContainersToKill() != null) { - StringBuilder sb = new StringBuilder("Killing containers: ["); - - for (RMContainer c : assignment.getContainersToKill()) { - FiCaSchedulerApp application = csContext.getApplicationAttempt( - c.getApplicationAttemptId()); - LeafQueue q = application.getCSLeafQueue(); - q.completedContainer(clusterResource, application, node, c, SchedulerUtils - .createPreemptedContainerStatus(c.getContainerId(), - SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL, - null, false); - sb.append("(container=" + c.getContainerId() + " resource=" + c - .getAllocatedResource() + ")"); - } - - sb.append("] for container=" + assignment.getAssignmentInformation() - .getFirstAllocatedOrReservedContainerId() + " resource=" + assignment - .getResource()); - LOG.info(sb.toString()); - - } - } - - private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) { - // Set preemption-allowed: - // For leaf queue, only under-utilized queue is allowed to preempt resources from other queues - float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition); - float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition); - limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity); - } @Override public synchronized CSAssignment assignContainers(Resource clusterResource, @@ -873,8 +835,6 @@ public class LeafQueue extends AbstractCSQueue { + " #applications=" + orderingPolicy.getNumSchedulableEntities()); } - setPreemptionAllowed(currentResourceLimits, node.getPartition()); - // Check for reserved resources RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { @@ -886,7 +846,6 @@ public class LeafQueue extends AbstractCSQueue { currentResourceLimits, schedulingMode, reservedContainer); handleExcessReservedContainer(clusterResource, assignment, node, application); - killToPreemptContainers(clusterResource, node, assignment); return assignment; } } @@ -948,7 +907,6 @@ public class LeafQueue extends AbstractCSQueue { handleExcessReservedContainer(clusterResource, assignment, node, application); - killToPreemptContainers(clusterResource, node, assignment); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1252,34 +1210,11 @@ public class LeafQueue extends AbstractCSQueue { } } - private void updateSchedulerHealthForCompletedContainer( - RMContainer rmContainer, ContainerStatus containerStatus) { - // Update SchedulerHealth for released / preempted container - SchedulerHealth schedulerHealth = csContext.getSchedulerHealth(); - if (null == schedulerHealth) { - // Only do update if we have schedulerHealth - return; - } - - if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) { - schedulerHealth.updatePreemption(Time.now(), rmContainer.getAllocatedNode(), - rmContainer.getContainerId(), getQueuePath()); - schedulerHealth.updateSchedulerPreemptionCounts(1); - } else { - schedulerHealth.updateRelease(csContext.getLastNodeUpdateTime(), - rmContainer.getAllocatedNode(), rmContainer.getContainerId(), - getQueuePath()); - } - } - @Override public void completedContainer(Resource clusterResource, FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue, boolean sortQueues) { - // Update SchedulerHealth for released / preempted container - updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus); - if (application != null) { // unreserve container increase request if it previously reserved. if (rmContainer.hasIncreaseReservation()) { @@ -1330,10 +1265,6 @@ public class LeafQueue extends AbstractCSQueue { rmContainer, null, event, this, sortQueues); } } - - // Notify PreemptionManager - csContext.getPreemptionManager().removeKillableContainer( - new KillableContainer(rmContainer, node.getPartition(), queueName)); } synchronized void allocateResource(Resource clusterResource, http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6fcd6c1..7cf5565 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -18,6 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.nodelabels.RMNodeLabel; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -46,25 +57,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - @Private @Evolving public class ParentQueue extends AbstractCSQueue { @@ -388,11 +386,6 @@ public class ParentQueue extends AbstractCSQueue { // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY && !accessibleToPartition(node.getPartition())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip this queue=" + getQueuePath() - + ", because it is not able to access partition=" + node - .getPartition()); - } return CSAssignment.NULL_ASSIGNMENT; } @@ -438,7 +431,7 @@ public class ParentQueue extends AbstractCSQueue { resourceCalculator, clusterResource, assignedToChild.getResource(), Resources.none())) { // Track resource utilization for the parent-queue - allocateResource(clusterResource, assignedToChild.getResource(), + super.allocateResource(clusterResource, assignedToChild.getResource(), node.getPartition(), assignedToChild.isIncreasedAllocation()); // Track resource utilization in this pass of the scheduler @@ -501,38 +494,29 @@ public class ParentQueue extends AbstractCSQueue { } private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { - // Two conditions need to meet when trying to allocate: - // 1) Node doesn't have reserved container - // 2) Node's available-resource + killable-resource should > 0 - return node.getReservedContainer() == null && Resources.greaterThanOrEqual( - resourceCalculator, clusterResource, Resources - .add(node.getUnallocatedResource(), node.getTotalKillableResources()), - minimumAllocation); + return (node.getReservedContainer() == null) && + Resources.greaterThanOrEqual(resourceCalculator, clusterResource, + node.getUnallocatedResource(), minimumAllocation); } - + private ResourceLimits getResourceLimitsOfChild(CSQueue child, - Resource clusterResource, ResourceLimits parentLimits, - String nodePartition) { + Resource clusterResource, ResourceLimits parentLimits) { // Set resource-limit of a given child, child.limit = // min(my.limit - my.used + child.used, child.max) // Parent available resource = parent-limit - parent-used-resource - Resource parentMaxAvailableResource = Resources.subtract( - parentLimits.getLimit(), queueUsage.getUsed(nodePartition)); - // Deduct killable from used - Resources.addTo(parentMaxAvailableResource, - getTotalKillableResource(nodePartition)); + Resource parentMaxAvailableResource = + Resources.subtract(parentLimits.getLimit(), getUsedResources()); // Child's limit = parent-available-resource + child-used - Resource childLimit = Resources.add(parentMaxAvailableResource, - child.getQueueResourceUsage().getUsed(nodePartition)); + Resource childLimit = + Resources.add(parentMaxAvailableResource, child.getUsedResources()); // Get child's max resource - Resource childConfiguredMaxResource = Resources.multiplyAndNormalizeDown( - resourceCalculator, - labelManager.getResourceByLabel(nodePartition, clusterResource), - child.getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition), - minimumAllocation); + Resource childConfiguredMaxResource = + Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource), + child.getAbsoluteMaximumCapacity(), minimumAllocation); // Child's limit should be capped by child configured max resource childLimit = @@ -584,7 +568,7 @@ public class ParentQueue extends AbstractCSQueue { // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = - getResourceLimitsOfChild(childQueue, cluster, limits, node.getPartition()); + getResourceLimitsOfChild(childQueue, cluster, limits); assignment = childQueue.assignContainers(cluster, node, childLimits, schedulingMode); @@ -730,8 +714,8 @@ public class ParentQueue extends AbstractCSQueue { // Update all children for (CSQueue childQueue : childQueues) { // Get ResourceLimits of child queue before assign containers - ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, - clusterResource, resourceLimits, RMNodeLabelsManager.NO_LABEL); + ResourceLimits childLimits = + getResourceLimitsOfChild(childQueue, clusterResource, resourceLimits); childQueue.updateClusterResource(clusterResource, childLimits); } @@ -754,8 +738,8 @@ public class ParentQueue extends AbstractCSQueue { synchronized (this) { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); - allocateResource(clusterResource, - rmContainer.getContainer().getResource(), node.getPartition(), false); + super.allocateResource(clusterResource, rmContainer.getContainer() + .getResource(), node.getPartition(), false); } if (parent != null) { parent.recoverContainer(clusterResource, attempt, rmContainer); @@ -782,7 +766,7 @@ public class ParentQueue extends AbstractCSQueue { if (application != null) { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); - allocateResource(clusterResource, rmContainer.getContainer() + super.allocateResource(clusterResource, rmContainer.getContainer() .getResource(), node.getPartition(), false); LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" @@ -818,79 +802,4 @@ public class ParentQueue extends AbstractCSQueue { public synchronized int getNumApplications() { return numApplications; } - - synchronized void allocateResource(Resource clusterResource, - Resource resource, String nodePartition, boolean changeContainerResource) { - super.allocateResource(clusterResource, resource, nodePartition, - changeContainerResource); - - /** - * check if we need to kill (killable) containers if maximum resource violated. - * Doing this because we will deduct killable resource when going from root. - * For example: - * <pre> - * Root - * / \ - * a b - * / \ - * a1 a2 - * </pre> - * - * a: max=10G, used=10G, killable=2G - * a1: used=8G, killable=2G - * a2: used=2G, pending=2G, killable=0G - * - * When we get queue-a to allocate resource, even if queue-a - * reaches its max resource, we deduct its used by killable, so we can allocate - * at most 2G resources. ResourceLimits passed down to a2 has headroom set to 2G. - * - * If scheduler finds a 2G available resource in existing cluster, and assigns it - * to a2, now a2's used= 2G + 2G = 4G, and a's used = 8G + 4G = 12G > 10G - * - * When this happens, we have to preempt killable container (on same or different - * nodes) of parent queue to avoid violating parent's max resource. - */ - if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition) - < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) { - killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource); - } - } - - private void killContainersToEnforceMaxQueueCapacity(String partition, - Resource clusterResource) { - Iterator<RMContainer> killableContainerIter = getKillableContainers( - partition); - if (!killableContainerIter.hasNext()) { - return; - } - - Resource partitionResource = labelManager.getResourceByLabel(partition, - null); - Resource maxResource = Resources.multiply(partitionResource, - getQueueCapacities().getAbsoluteMaximumCapacity(partition)); - - while (Resources.greaterThan(resourceCalculator, partitionResource, - queueUsage.getUsed(partition), maxResource)) { - RMContainer toKillContainer = killableContainerIter.next(); - FiCaSchedulerApp attempt = csContext.getApplicationAttempt( - toKillContainer.getContainerId().getApplicationAttemptId()); - FiCaSchedulerNode node = csContext.getNode( - toKillContainer.getAllocatedNode()); - if (null != attempt && null != node) { - LeafQueue lq = attempt.getCSLeafQueue(); - lq.completedContainer(clusterResource, attempt, node, toKillContainer, - SchedulerUtils.createPreemptedContainerStatus( - toKillContainer.getContainerId(), - SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL, - null, false); - LOG.info("Killed container=" + toKillContainer.getContainerId() - + " from queue=" + lq.getQueueName() + " to make queue=" + this - .getQueueName() + "'s max-capacity enforced"); - } - - if (!killableContainerIter.hasNext()) { - break; - } - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index afac235..ee01bd1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -108,8 +108,6 @@ public abstract class AbstractContainerAllocator { assignment.setFulfilledReservation(true); } } - - assignment.setContainersToKill(result.getToKillContainers()); } return assignment; http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java index 8f749f6..1df9410 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -19,14 +19,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.util.resource.Resources; -import java.util.List; - public class ContainerAllocation { /** * Skip the locality (e.g. node-local, rack-local, any), and look at other @@ -59,7 +56,6 @@ public class ContainerAllocation { NodeType containerNodeType = NodeType.NODE_LOCAL; NodeType requestNodeType = NodeType.NODE_LOCAL; Container updatedContainer; - private List<RMContainer> toKillContainers; public ContainerAllocation(RMContainer containerToBeUnreserved, Resource resourceToBeAllocated, AllocationState state) { @@ -90,12 +86,4 @@ public class ContainerAllocation { public Container getUpdatedContainer() { return updatedContainer; } - - public void setToKillContainers(List<RMContainer> toKillContainers) { - this.toKillContainers = toKillContainers; - } - - public List<RMContainer> getToKillContainers() { - return toKillContainers; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index a5ca2d8..e168edf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -42,9 +42,6 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import java.util.ArrayList; -import java.util.List; - /** * Allocate normal (new) containers, considers locality/label, etc. Using * delayed scheduling mechanism to get better locality allocation. @@ -438,6 +435,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { return ContainerAllocation.LOCALITY_SKIPPED; } + assert Resources.greaterThan( + rc, clusterResource, available, Resources.none()); + boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( priority, capability); @@ -460,29 +460,6 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { boolean reservationsContinueLooking = application.getCSLeafQueue().getReservationContinueLooking(); - // Check if we need to kill some containers to allocate this one - List<RMContainer> toKillContainers = null; - if (availableContainers == 0 && currentResoureLimits.isAllowPreemption()) { - Resource availableAndKillable = Resources.clone(available); - for (RMContainer killableContainer : node - .getKillableContainers().values()) { - if (null == toKillContainers) { - toKillContainers = new ArrayList<>(); - } - toKillContainers.add(killableContainer); - Resources.addTo(availableAndKillable, - killableContainer.getAllocatedResource()); - if (Resources.fitsIn(rc, - clusterResource, - capability, - availableAndKillable)) { - // Stop if we find enough spaces - availableContainers = 1; - break; - } - } - } - if (availableContainers > 0) { // Allocate... // We will only do continuous reservation when this is not allocated from @@ -522,12 +499,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { new ContainerAllocation(unreservedContainer, request.getCapability(), AllocationState.ALLOCATED); result.containerNodeType = type; - result.setToKillContainers(toKillContainers); return result; } else { // if we are allowed to allocate but this node doesn't have space, reserve // it or if this was an already a reserved container, reserve it again if (shouldAllocOrReserveNewContainer || rmContainer != null) { + if (reservationsContinueLooking && rmContainer == null) { // we could possibly ignoring queue capacity or user limits when // reservationsContinueLooking is set. Make sure we didn't need to @@ -545,7 +522,6 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { new ContainerAllocation(null, request.getCapability(), AllocationState.RESERVED); result.containerNodeType = type; - result.setToKillContainers(null); return result; } // Skip the locality request @@ -637,7 +613,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } ContainerAllocation doAllocation(ContainerAllocation allocationResult, - FiCaSchedulerNode node, Priority priority, + Resource clusterResource, FiCaSchedulerNode node, + SchedulingMode schedulingMode, Priority priority, RMContainer reservedContainer) { // Create the container if necessary Container container = @@ -701,7 +678,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { if (AllocationState.ALLOCATED == result.state || AllocationState.RESERVED == result.state) { - result = doAllocation(result, node, priority, reservedContainer); + result = + doAllocation(result, clusterResource, node, schedulingMode, priority, + reservedContainer); } return result; http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/KillableContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/KillableContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/KillableContainer.java deleted file mode 100644 index 675b0b4..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/KillableContainer.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption; - -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; - -public class KillableContainer { - RMContainer container; - String partition; - String leafQueueName; - - public KillableContainer(RMContainer container, String partition, String leafQueueName) { - this.container = container; - this.partition = partition; - this.leafQueueName = leafQueueName; - } - - public RMContainer getRMContainer() { - return this.container; - } - - public String getNodePartition() { - return this.partition; - } - - public String getLeafQueueName() { - return this.leafQueueName; - } -}