http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java new file mode 100644 index 0000000..fccd2a7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + + +/** + * Temporary data-structure tracking resource availability, pending resource + * need, current utilization for an application. + */ +public class TempAppPerPartition extends AbstractPreemptionEntity { + + // Following fields are settled and used by candidate selection policies + private final int priority; + private final ApplicationId applicationId; + + FiCaSchedulerApp app; + + TempAppPerPartition(FiCaSchedulerApp app, Resource usedPerPartition, + Resource amUsedPerPartition, Resource reserved, + Resource pendingPerPartition) { + super(app.getQueueName(), usedPerPartition, amUsedPerPartition, reserved, + pendingPerPartition); + + this.priority = app.getPriority().getPriority(); + this.applicationId = app.getApplicationId(); + this.app = app; + } + + public FiCaSchedulerApp getFiCaSchedulerApp() { + return app; + } + + public void assignPreemption(Resource killable) { + Resources.addTo(toBePreempted, killable); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(" NAME: " + getApplicationId()).append(" PRIO: ").append(priority) + .append(" CUR: ").append(getUsed()).append(" PEN: ").append(pending) + .append(" RESERVED: ").append(reserved).append(" IDEAL_ASSIGNED: ") + .append(idealAssigned).append(" PREEMPT_OTHER: ") + .append(getToBePreemptFromOther()).append(" IDEAL_PREEMPT: ") + .append(toBePreempted).append(" ACTUAL_PREEMPT: ") + .append(getActuallyToBePreempted()).append("\n"); + + return sb.toString(); + } + + void appendLogString(StringBuilder sb) { + sb.append(queueName).append(", 
").append(getUsed().getMemorySize()) + .append(", ").append(getUsed().getVirtualCores()).append(", ") + .append(pending.getMemorySize()).append(", ") + .append(pending.getVirtualCores()).append(", ") + .append(idealAssigned.getMemorySize()).append(", ") + .append(idealAssigned.getVirtualCores()).append(", ") + .append(toBePreempted.getMemorySize()).append(", ") + .append(toBePreempted.getVirtualCores()).append(", ") + .append(getActuallyToBePreempted().getMemorySize()).append(", ") + .append(getActuallyToBePreempted().getVirtualCores()); + } + + public int getPriority() { + return priority; + } + + public ApplicationId getApplicationId() { + return applicationId; + } + + public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator, + Resource cluster, Resource toBeDeduct, String partition) { + if (Resources.greaterThan(resourceCalculator, cluster, + getActuallyToBePreempted(), toBeDeduct)) { + Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 04ed135..28099c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -25,34 +25,29 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; +import java.util.Collection; /** * Temporary data-structure tracking resource availability, pending resource * need, current utilization. 
This is per-queue-per-partition data structure */ -public class TempQueuePerPartition { +public class TempQueuePerPartition extends AbstractPreemptionEntity { // Following fields are copied from scheduler - final String queueName; final String partition; - final Resource pending; - private final Resource current; private final Resource killable; - private final Resource reserved; private final float absCapacity; private final float absMaxCapacity; final Resource totalPartitionResource; - // Following fields are setted and used by candidate selection policies - Resource idealAssigned; - Resource toBePreempted; + // Following fields are settled and used by candidate selection policies Resource untouchableExtra; Resource preemptableExtra; - private Resource actuallyToBePreempted; double normalizedGuarantee; final ArrayList<TempQueuePerPartition> children; + private Collection<TempAppPerPartition> apps; LeafQueue leafQueue; boolean preemptionDisabled; @@ -60,8 +55,8 @@ public class TempQueuePerPartition { boolean preemptionDisabled, String partition, Resource killable, float absCapacity, float absMaxCapacity, Resource totalPartitionResource, Resource reserved, CSQueue queue) { - this.queueName = queueName; - this.current = current; + super(queueName, current, Resource.newInstance(0, 0), reserved, + Resource.newInstance(0, 0)); if (queue instanceof LeafQueue) { LeafQueue l = (LeafQueue) queue; @@ -72,11 +67,9 @@ public class TempQueuePerPartition { pending = Resources.createResource(0); } - this.idealAssigned = Resource.newInstance(0, 0); - this.actuallyToBePreempted = Resource.newInstance(0, 0); - this.toBePreempted = Resource.newInstance(0, 0); this.normalizedGuarantee = Float.NaN; this.children = new ArrayList<>(); + this.apps = new ArrayList<>(); this.untouchableExtra = Resource.newInstance(0, 0); this.preemptableExtra = Resource.newInstance(0, 0); this.preemptionDisabled = preemptionDisabled; @@ -85,7 +78,6 @@ public class TempQueuePerPartition { this.absCapacity = 
absCapacity; this.absMaxCapacity = absMaxCapacity; this.totalPartitionResource = totalPartitionResource; - this.reserved = reserved; } public void setLeafQueue(LeafQueue l) { @@ -95,7 +87,9 @@ public class TempQueuePerPartition { /** * When adding a child we also aggregate its pending resource needs. - * @param q the child queue to add to this queue + * + * @param q + * the child queue to add to this queue */ public void addChild(TempQueuePerPartition q) { assert leafQueue == null; @@ -103,14 +97,10 @@ public class TempQueuePerPartition { Resources.addTo(pending, q.pending); } - public ArrayList<TempQueuePerPartition> getChildren(){ + public ArrayList<TempQueuePerPartition> getChildren() { return children; } - public Resource getUsed() { - return current; - } - public Resource getUsedDeductReservd() { return Resources.subtract(current, reserved); } @@ -122,28 +112,30 @@ public class TempQueuePerPartition { Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( Resources.subtract(getMax(), idealAssigned), Resource.newInstance(0, 0)); - // remain = avail - min(avail, (max - assigned), (current + pending - assigned)) + // remain = avail - min(avail, (max - assigned), (current + pending - + // assigned)) Resource accepted = Resources.min(rc, clusterResource, - absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail, - Resources - /* - * When we're using FifoPreemptionSelector - * (considerReservedResource = false). - * - * We should deduct reserved resource to avoid excessive preemption: - * - * For example, if an under-utilized queue has used = reserved = 20. - * Preemption policy will try to preempt 20 containers - * (which is not satisfied) from different hosts. - * - * In FifoPreemptionSelector, there's no guarantee that preempted - * resource can be used by pending request, so policy will preempt - * resources repeatly. - */ - .subtract(Resources.add( - (considersReservedResource ? 
getUsed() : - getUsedDeductReservd()), - pending), idealAssigned))); + absMaxCapIdealAssignedDelta, + Resources.min(rc, clusterResource, avail, Resources + /* + * When we're using FifoPreemptionSelector (considerReservedResource + * = false). + * + * We should deduct reserved resource to avoid excessive preemption: + * + * For example, if an under-utilized queue has used = reserved = 20. + * Preemption policy will try to preempt 20 containers (which is not + * satisfied) from different hosts. + * + * In FifoPreemptionSelector, there's no guarantee that preempted + * resource can be used by pending request, so policy will preempt + * resources repeatly. + */ + .subtract( + Resources.add((considersReservedResource + ? getUsed() + : getUsedDeductReservd()), pending), + idealAssigned))); Resource remain = Resources.subtract(avail, accepted); Resources.addTo(idealAssigned, accepted); return remain; @@ -162,8 +154,7 @@ public class TempQueuePerPartition { untouchableExtra = Resources.none(); preemptableExtra = Resources.none(); - Resource extra = Resources.subtract(getUsed(), - getGuaranteed()); + Resource extra = Resources.subtract(getUsed(), getGuaranteed()); if (Resources.lessThan(rc, totalPartitionResource, extra, Resources.none())) { extra = Resources.none(); @@ -197,26 +188,21 @@ public class TempQueuePerPartition { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append(" NAME: " + queueName) - .append(" CUR: ").append(current) - .append(" PEN: ").append(pending) - .append(" RESERVED: ").append(reserved) - .append(" GAR: ").append(getGuaranteed()) - .append(" NORM: ").append(normalizedGuarantee) - .append(" IDEAL_ASSIGNED: ").append(idealAssigned) - .append(" IDEAL_PREEMPT: ").append(toBePreempted) - .append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted) + sb.append(" NAME: " + queueName).append(" CUR: ").append(current) + .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved) + .append(" GAR: 
").append(getGuaranteed()).append(" NORM: ") + .append(normalizedGuarantee).append(" IDEAL_ASSIGNED: ") + .append(idealAssigned).append(" IDEAL_PREEMPT: ").append(toBePreempted) + .append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted()) .append(" UNTOUCHABLE: ").append(untouchableExtra) - .append(" PREEMPTABLE: ").append(preemptableExtra) - .append("\n"); + .append(" PREEMPTABLE: ").append(preemptableExtra).append("\n"); return sb.toString(); } public void assignPreemption(float scalingFactor, ResourceCalculator rc, Resource clusterResource) { - Resource usedDeductKillable = Resources.subtract( - getUsed(), killable); + Resource usedDeductKillable = Resources.subtract(getUsed(), killable); Resource totalResource = Resources.add(getUsed(), pending); // The minimum resource that we need to keep for a queue is: @@ -224,7 +210,8 @@ public class TempQueuePerPartition { // // Doing this because when we calculate ideal allocation doesn't consider // reserved resource, ideal-allocation calculated could be less than - // guaranteed and total. We should avoid preempt from a queue if it is already + // guaranteed and total. We should avoid preempt from a queue if it is + // already // <= its guaranteed resource. 
Resource minimumQueueResource = Resources.max(rc, clusterResource, Resources.min(rc, clusterResource, totalResource, getGuaranteed()), @@ -233,33 +220,26 @@ public class TempQueuePerPartition { if (Resources.greaterThan(rc, clusterResource, usedDeductKillable, minimumQueueResource)) { toBePreempted = Resources.multiply( - Resources.subtract(usedDeductKillable, minimumQueueResource), scalingFactor); + Resources.subtract(usedDeductKillable, minimumQueueResource), + scalingFactor); } else { toBePreempted = Resources.none(); } } - public Resource getActuallyToBePreempted() { - return actuallyToBePreempted; - } - - public void setActuallyToBePreempted(Resource res) { - this.actuallyToBePreempted = res; - } - public void deductActuallyToBePreempted(ResourceCalculator rc, Resource cluster, Resource toBeDeduct) { - if (Resources.greaterThan(rc, cluster, actuallyToBePreempted, toBeDeduct)) { - Resources.subtractFrom(actuallyToBePreempted, toBeDeduct); + if (Resources.greaterThan(rc, cluster, getActuallyToBePreempted(), + toBeDeduct)) { + Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); } - actuallyToBePreempted = Resources.max(rc, cluster, actuallyToBePreempted, - Resources.none()); + setActuallyToBePreempted(Resources.max(rc, cluster, + getActuallyToBePreempted(), Resources.none())); } void appendLogString(StringBuilder sb) { - sb.append(queueName).append(", ") - .append(current.getMemorySize()).append(", ") - .append(current.getVirtualCores()).append(", ") + sb.append(queueName).append(", ").append(current.getMemorySize()) + .append(", ").append(current.getVirtualCores()).append(", ") .append(pending.getMemorySize()).append(", ") .append(pending.getVirtualCores()).append(", ") .append(getGuaranteed().getMemorySize()).append(", ") @@ -267,9 +247,17 @@ public class TempQueuePerPartition { .append(idealAssigned.getMemorySize()).append(", ") .append(idealAssigned.getVirtualCores()).append(", ") .append(toBePreempted.getMemorySize()).append(", ") - 
.append(toBePreempted.getVirtualCores() ).append(", ") - .append(actuallyToBePreempted.getMemorySize()).append(", ") - .append(actuallyToBePreempted.getVirtualCores()); + .append(toBePreempted.getVirtualCores()).append(", ") + .append(getActuallyToBePreempted().getMemorySize()).append(", ") + .append(getActuallyToBePreempted().getVirtualCores()); + } + + public void addAllApps(Collection<TempAppPerPartition> orderedApps) { + this.apps = orderedApps; + } + + public Collection<TempAppPerPartition> getApps() { + return apps; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 6db5074..cea5aa4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1045,6 +1045,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur private static final String PREEMPTION_CONFIG_PREFIX = "yarn.resourcemanager.monitor.capacity.preemption."; + private static final String 
INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX = + "intra-queue-preemption."; + /** If true, run the policy but do not affect the cluster with preemption and * kill events. */ public static final String PREEMPTION_OBSERVE_ONLY = @@ -1098,4 +1101,32 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers"; public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS = false; + + /** + * For intra-queue preemption, priority/user-limit/fairness based selectors + * can help to preempt containers. + */ + public static final String INTRAQUEUE_PREEMPTION_ENABLED = + PREEMPTION_CONFIG_PREFIX + + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "enabled"; + public static final boolean DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED = false; + + /** + * For intra-queue preemption, consider those queues which are above used cap + * limit. + */ + public static final String INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD = + PREEMPTION_CONFIG_PREFIX + + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "minimum-threshold"; + public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD = + 0.5f; + + /** + * For intra-queue preemption, allowable maximum-preemptable limit per queue. 
+ */ + public static final String INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT = + PREEMPTION_CONFIG_PREFIX + + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "max-allowable-limit"; + public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT = + 0.2f; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 3c51961..214c6e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -519,6 +519,7 @@ public class LeafQueue extends AbstractCSQueue { // queue metrics are updated, more resource may be available // activate the pending applications if possible activateApplications(); + } finally { writeLock.unlock(); } @@ -1148,8 +1149,9 @@ public class LeafQueue extends AbstractCSQueue { Resource clusterResource, FiCaSchedulerApp application, String partition) { return getHeadroom(user, queueCurrentLimit, clusterResource, - computeUserLimit(application, clusterResource, user, partition, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition); + computeUserLimit(application.getUser(), clusterResource, user, + 
partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + partition); } private Resource getHeadroom(User user, @@ -1221,7 +1223,7 @@ public class LeafQueue extends AbstractCSQueue { // Compute user limit respect requested labels, // TODO, need consider headroom respect labels also Resource userLimit = - computeUserLimit(application, clusterResource, queueUser, + computeUserLimit(application.getUser(), clusterResource, queueUser, nodePartition, schedulingMode); setQueueResourceLimitsInfo(clusterResource); @@ -1259,7 +1261,7 @@ public class LeafQueue extends AbstractCSQueue { } @Lock(NoLock.class) - private Resource computeUserLimit(FiCaSchedulerApp application, + private Resource computeUserLimit(String userName, Resource clusterResource, User user, String nodePartition, SchedulingMode schedulingMode) { Resource partitionResource = labelManager.getResourceByLabel(nodePartition, @@ -1359,7 +1361,6 @@ public class LeafQueue extends AbstractCSQueue { minimumAllocation); if (LOG.isDebugEnabled()) { - String userName = application.getUser(); LOG.debug("User limit computation for " + userName + " in queue " + getQueueName() + " userLimitPercent=" + userLimit + @@ -2010,6 +2011,17 @@ public class LeafQueue extends AbstractCSQueue { .getSchedulableEntities()); } + /** + * Obtain (read-only) collection of all applications. + */ + public Collection<FiCaSchedulerApp> getAllApplications() { + Collection<FiCaSchedulerApp> apps = new HashSet<FiCaSchedulerApp>( + pendingOrderingPolicy.getSchedulableEntities()); + apps.addAll(orderingPolicy.getSchedulableEntities()); + + return Collections.unmodifiableCollection(apps); + } + // Consider the headroom for each user in the queue. 
// Total pending for the queue = // sum(for each user(min((user's headroom), sum(user's pending requests)))) @@ -2026,7 +2038,7 @@ public class LeafQueue extends AbstractCSQueue { if (!userNameToHeadroom.containsKey(userName)) { User user = getUser(userName); Resource headroom = Resources.subtract( - computeUserLimit(app, resources, user, partition, + computeUserLimit(app.getUser(), resources, user, partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), user.getUsed(partition)); // Make sure headroom is not negative. @@ -2048,6 +2060,16 @@ public class LeafQueue extends AbstractCSQueue { } + public synchronized Resource getUserLimitPerUser(String userName, + Resource resources, String partition) { + + // Check user resource limit + User user = getUser(userName); + + return computeUserLimit(userName, resources, user, partition, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + } + @Override public void collectSchedulerApplications( Collection<ApplicationAttemptId> apps) { @@ -2103,8 +2125,8 @@ public class LeafQueue extends AbstractCSQueue { } /** - * return all ignored partition exclusivity RMContainers in the LeafQueue, this - * will be used by preemption policy. + * @return all ignored partition exclusivity RMContainers in the LeafQueue, + * this will be used by preemption policy. 
*/ public Map<String, TreeSet<RMContainer>> getIgnoreExclusivityRMContainers() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index aa7ad50..ebe70d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -312,6 +313,23 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return false; } + public synchronized Map<String, Resource> getTotalPendingRequestsPerPartition() { + + Map<String, Resource> ret = new HashMap<String, Resource>(); + Resource res = null; + for (SchedulerRequestKey key : appSchedulingInfo.getSchedulerKeys()) { + ResourceRequest rr = appSchedulingInfo.getResourceRequest(key, "*"); + if ((res = ret.get(rr.getNodeLabelExpression())) == null) { + res = Resources.createResource(0, 0); + 
ret.put(rr.getNodeLabelExpression(), res); + } + + Resources.addTo(res, + Resources.multiply(rr.getCapability(), rr.getNumContainers())); + } + return ret; + } + public void markContainerForPreemption(ContainerId cont) { try { writeLock.lock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index 3d3f1ea..5b8425b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -65,11 +65,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeSet; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doAnswer; @@ -164,13 
+167,17 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { mClock); } - private void mockContainers(String containersConfig, ApplicationAttemptId attemptId, - String queueName, List<RMContainer> reservedContainers, - List<RMContainer> liveContainers) { + private void mockContainers(String containersConfig, FiCaSchedulerApp app, + ApplicationAttemptId attemptId, String queueName, + List<RMContainer> reservedContainers, List<RMContainer> liveContainers) { int containerId = 1; int start = containersConfig.indexOf("=") + 1; int end = -1; + Resource used = Resource.newInstance(0, 0); + Resource pending = Resource.newInstance(0, 0); + Priority pri = Priority.newInstance(0); + while (start < containersConfig.length()) { while (start < containersConfig.length() && containersConfig.charAt(start) != '(') { @@ -192,43 +199,52 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { // now we found start/end, get container values String[] values = containersConfig.substring(start + 1, end).split(","); - if (values.length != 6) { + if (values.length < 6 || values.length > 8) { throw new IllegalArgumentException("Format to define container is:" - + "(priority,resource,host,expression,repeat,reserved)"); + + "(priority,resource,host,expression,repeat,reserved, pending)"); } - Priority pri = Priority.newInstance(Integer.valueOf(values[0])); + pri.setPriority(Integer.valueOf(values[0])); Resource res = parseResourceFromString(values[1]); NodeId host = NodeId.newInstance(values[2], 1); - String exp = values[3]; + String label = values[3]; + String userName = "user"; int repeat = Integer.valueOf(values[4]); boolean reserved = Boolean.valueOf(values[5]); + if (values.length >= 7) { + Resources.addTo(pending, parseResourceFromString(values[6])); + } + if (values.length == 8) { + userName = values[7]; + } for (int i = 0; i < repeat; i++) { Container c = mock(Container.class); + Resources.addTo(used, res); when(c.getResource()).thenReturn(res); 
when(c.getPriority()).thenReturn(pri); SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(c); RMContainerImpl rmc = mock(RMContainerImpl.class); when(rmc.getAllocatedSchedulerKey()).thenReturn(sk); when(rmc.getAllocatedNode()).thenReturn(host); - when(rmc.getNodeLabelExpression()).thenReturn(exp); + when(rmc.getNodeLabelExpression()).thenReturn(label); when(rmc.getAllocatedResource()).thenReturn(res); when(rmc.getContainer()).thenReturn(c); when(rmc.getApplicationAttemptId()).thenReturn(attemptId); when(rmc.getQueueName()).thenReturn(queueName); - final ContainerId cId = ContainerId.newContainerId(attemptId, containerId); - when(rmc.getContainerId()).thenReturn( - cId); + final ContainerId cId = ContainerId.newContainerId(attemptId, + containerId); + when(rmc.getContainerId()).thenReturn(cId); doAnswer(new Answer<Integer>() { @Override public Integer answer(InvocationOnMock invocation) throws Throwable { - return cId.compareTo(((RMContainer) invocation.getArguments()[0]) - .getContainerId()); + return cId.compareTo( + ((RMContainer) invocation.getArguments()[0]).getContainerId()); } }).when(rmc).compareTo(any(RMContainer.class)); if (containerId == 1) { when(rmc.isAMContainer()).thenReturn(true); + when(app.getAMResource(label)).thenReturn(res); } if (reserved) { @@ -243,25 +259,44 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { // If this is a non-exclusive allocation String partition = null; - if (exp.isEmpty() + if (label.isEmpty() && !(partition = nodeIdToSchedulerNodes.get(host).getPartition()) - .isEmpty()) { + .isEmpty()) { LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); - Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers = - queue.getIgnoreExclusivityRMContainers(); + Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers = queue + .getIgnoreExclusivityRMContainers(); if (!ignoreExclusivityContainers.containsKey(partition)) { ignoreExclusivityContainers.put(partition, new TreeSet<RMContainer>()); } 
ignoreExclusivityContainers.get(partition).add(rmc); } - LOG.debug("add container to app=" + attemptId + " res=" + res - + " node=" + host + " nodeLabelExpression=" + exp + " partition=" + LOG.debug("add container to app=" + attemptId + " res=" + res + " node=" + + host + " nodeLabelExpression=" + label + " partition=" + partition); containerId++; } + // Some more app specific aggregated data can be better filled here. + when(app.getPriority()).thenReturn(pri); + when(app.getUser()).thenReturn(userName); + when(app.getCurrentConsumption()).thenReturn(used); + when(app.getCurrentReservation()) + .thenReturn(Resources.createResource(0, 0)); + + Map<String, Resource> pendingForDefaultPartition = + new HashMap<String, Resource>(); + // Add for default partition for now. + pendingForDefaultPartition.put(label, pending); + when(app.getTotalPendingRequestsPerPartition()) + .thenReturn(pendingForDefaultPartition); + + // need to set pending resource in resource usage as well + ResourceUsage ru = new ResourceUsage(); + ru.setUsed(label, used); + when(app.getAppAttemptResourceUsage()).thenReturn(ru); + start = end + 1; } } @@ -277,6 +312,8 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { */ private void mockApplications(String appsConfig) { int id = 1; + HashMap<String, HashSet<String>> userMap = new HashMap<String, HashSet<String>>(); + LeafQueue queue = null; for (String a : appsConfig.split(";")) { String[] strs = a.split("\t"); String queueName = strs[0]; @@ -285,24 +322,49 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { List<RMContainer> liveContainers = new ArrayList<RMContainer>(); List<RMContainer> reservedContainers = new ArrayList<RMContainer>(); ApplicationId appId = ApplicationId.newInstance(0L, id); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId + .newInstance(appId, 1); - mockContainers(strs[1], appAttemptId, queueName, 
reservedContainers, + FiCaSchedulerApp app = mock(FiCaSchedulerApp.class); + when(app.getAMResource(anyString())) + .thenReturn(Resources.createResource(0, 0)); + mockContainers(strs[1], app, appAttemptId, queueName, reservedContainers, liveContainers); + LOG.debug("Application mock: queue: " + queueName + ", appId:" + appId); - FiCaSchedulerApp app = mock(FiCaSchedulerApp.class); when(app.getLiveContainers()).thenReturn(liveContainers); when(app.getReservedContainers()).thenReturn(reservedContainers); when(app.getApplicationAttemptId()).thenReturn(appAttemptId); when(app.getApplicationId()).thenReturn(appId); - when(app.getPriority()).thenReturn(Priority.newInstance(0)); // add to LeafQueue - LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); + queue = (LeafQueue) nameToCSQueues.get(queueName); queue.getApplications().add(app); + queue.getAllApplications().add(app); + HashSet<String> users = userMap.get(queueName); + if (null == users) { + users = new HashSet<String>(); + userMap.put(queueName, users); + } + + users.add(app.getUser()); id++; } + + for (String queueName : userMap.keySet()) { + queue = (LeafQueue) nameToCSQueues.get(queueName); + // Currently we have user-limit test support only for default label. 
+ Resource totResoucePerPartition = partitionToResource.get(""); + Resource capacity = Resources.multiply(totResoucePerPartition, + queue.getQueueCapacities().getAbsoluteCapacity()); + HashSet<String> users = userMap.get(queue.getQueueName()); + Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size()); + for (String user : users) { + when(queue.getUserLimitPerUser(eq(user), any(Resource.class), + anyString())).thenReturn(userLimit); + } + } } private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container, @@ -436,10 +498,18 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { new Comparator<FiCaSchedulerApp>() { @Override public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { - return a1.getApplicationId().compareTo(a2.getApplicationId()); + if (a1.getPriority() != null + && !a1.getPriority().equals(a2.getPriority())) { + return a1.getPriority().compareTo(a2.getPriority()); + } + + int res = a1.getApplicationId() + .compareTo(a2.getApplicationId()); + return res; } }); when(leafQueue.getApplications()).thenReturn(apps); + when(leafQueue.getAllApplications()).thenReturn(apps); OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class); when(so.getPreemptionIterator()).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { @@ -518,10 +588,15 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { float absUsed = Resources.divide(rc, totResoucePerPartition, parseResourceFromString(values[2].trim()), totResoucePerPartition) + epsilon; + float used = Resources.divide(rc, totResoucePerPartition, + parseResourceFromString(values[2].trim()), + parseResourceFromString(values[0].trim())) + epsilon; Resource pending = parseResourceFromString(values[3].trim()); qc.setAbsoluteCapacity(partitionName, absGuaranteed); qc.setAbsoluteMaximumCapacity(partitionName, absMax); qc.setAbsoluteUsedCapacity(partitionName, absUsed); + qc.setUsedCapacity(partitionName, used); + 
when(queue.getUsedCapacity()).thenReturn(used); ru.setPending(partitionName, pending); if (!isParent(queueExprArray, idx)) { LeafQueue lq = (LeafQueue) queue; @@ -536,6 +611,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { reserved = parseResourceFromString(values[4].trim()); ru.setReserved(partitionName, reserved); } + LOG.debug("Setup queue=" + queueName + " partition=" + partitionName + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax + ",abs_used" + absUsed + ",pending_resource=" + pending http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java new file mode 100644 index 0000000..19fb0d2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java @@ -0,0 +1,868 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Test class for IntraQueuePreemption scenarios. + */ +public class TestProportionalCapacityPreemptionPolicyIntraQueue + extends + ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + conf.setBoolean( + CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + @Test + public void testSimpleIntraQueuePreemption() throws IOException { + /** + * The simplest test preemption, Queue structure is: + * + * <pre> + * root + * / | | \ + * a b c d + * </pre> + * + * Guaranteed resource of a/b/c/d are 11:40:20:29 Total cluster resource = + * 100 + * Scenario: + * Queue B has few running apps and two high priority apps have demand. 
+ * Apps which are running at low priority (4) will preempt few of its + * resources to meet the demand. + */ + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 80 120 0]);" + // root + "-a(=[11 100 11 50 0]);" + // a + "-b(=[40 100 38 60 0]);" + // b + "-c(=[20 100 10 10 0]);" + // c + "-d(=[29 100 20 0 0])"; // d + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved, + // pending) + "a\t" // app1 in a + + "(1,1,n1,,6,false,25);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,34,false,20);" + // app3 b + "b\t" // app4 in b + + "(4,1,n1,,2,false,10);" + // app4 b + "b\t" // app4 in b + + "(5,1,n1,,1,false,10);" + // app5 b + "b\t" // app4 in b + + "(6,1,n1,,1,false,10);" + // app6 in b + "c\t" // app1 in a + + "(1,1,n1,,10,false,10);" + "d\t" // app7 in c + + "(1,1,n1,,20,false,0)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // For queue B, app3 and app4 were of lower priority. Hence take 8 + // containers from them by hitting the intraQueuePreemptionDemand of 20%. + verify(mDisp, times(1)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + verify(mDisp, times(7)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testNoPreemptionForSamePriorityApps() throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / | | \ + * a b c d + * </pre> + * + * Guaranteed resource of a/b/c/d are 10:40:20:30 Total cluster resource = + * 100 + * Scenario: In queue A/B, all apps are running at same priority. However + * there are many demands also from these apps. Since all apps are at same + * priority, preemption should not occur here. 
+ */ + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 80 120 0]);" + // root + "-a(=[10 100 10 50 0]);" + // a + "-b(=[40 100 40 60 0]);" + // b + "-c(=[20 100 10 10 0]);" + // c + "-d(=[30 100 20 0 0])"; // d + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved, + // pending) + "a\t" // app1 in a + + "(1,1,n1,,6,false,25);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(1,1,n1,,34,false,20);" + // app3 b + "b\t" // app4 in b + + "(1,1,n1,,2,false,10);" + // app4 b + "b\t" // app4 in b + + "(1,1,n1,,1,false,20);" + // app5 b + "b\t" // app4 in b + + "(1,1,n1,,1,false,10);" + // app6 in b + "c\t" // app1 in a + + "(1,1,n1,,10,false,10);" + "d\t" // app7 in c + + "(1,1,n1,,20,false,0)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // For queue B, none of the apps should be preempted. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(5)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(6)))); + } + + @Test + public void testNoPreemptionWhenQueueIsUnderCapacityLimit() + throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * + * Scenario: + * Guaranteed resource of a/b are 40:60 Total cluster resource = 100 BY + * default, this limit is 50%. 
Test to verify that there won't be any + * preemption since used capacity is under 50% for queue a/b even though + * there are demands from high priority apps in queue. + */ + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 35 80 0]);" + // root + "-a(=[40 100 10 50 0]);" + // a + "-b(=[60 100 25 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,5,false,25);" + // app1 a + "a\t" // app2 in a + + "(2,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,40,false,20);" + // app3 b + "b\t" // app4 in b + + "(6,1,n1,,5,false,20)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // For queue A/B, none of the apps should be preempted as used capacity + // is under 50%. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + } + + @Test + public void testLimitPreemptionWithMaxIntraQueuePreemptableLimit() + throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * + * Guaranteed resource of a/b are 40:60 Total cluster resource = 100 + * maxIntraQueuePreemptableLimit by default is 50%. This test is to verify + * that the maximum preemption should occur up to 50%, even though demand is + * more. + */ + + // Set max preemption limit as 50%. 
+ conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[40 100 10 50 0]);" + // a + "-b(=[60 100 45 120 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,5,false,25);" + // app1 a + "a\t" // app2 in a + + "(2,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,40,false,20);" + // app3 b + "b\t" // app1 in a + + "(6,1,n1,,5,false,100)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // For queueB, eventhough app4 needs 100 resources, only 30 resources were + // preempted. (max is 50% of guaranteed cap of any queue + // "maxIntraQueuePreemptable") + verify(mDisp, times(30)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testLimitPreemptionWithTotalPreemptedResourceAllowed() + throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * + * Scenario: + * Guaranteed resource of a/b are 40:60 Total cluster resource = 100 + * totalPreemption allowed is 10%. This test is to verify that only + * 10% is preempted. + */ + + // report "ideal" preempt as 10%. 
Ensure preemption happens only for 10% + conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, + (float) 0.1); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[40 100 10 50 0]);" + // a + "-b(=[60 100 45 120 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,5,false,25);" + // app1 a + "a\t" // app2 in a + + "(2,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,40,false,20);" + // app3 b + "b\t" // app4 in b + + "(6,1,n1,,5,false,100)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // For queue B even though app4 needs 100 resources, only 10 resources were + // preempted. This is the 10% limit of TOTAL_PREEMPTION_PER_ROUND. + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testAlreadySelectedContainerFromInterQueuePreemption() + throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * + * Scenario: + * Guaranteed resource of a/b are 40:60 Total cluster resource = 100 + * QueueB is under utilized and QueueA has to release 9 containers here. + * However within queue A, high priority app has also a demand for 20. + * So additional 11 more containers will be preempted making a total of 20. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 95 170 0]);" + // root + "-a(=[60 100 70 50 0]);" + // a + "-b(=[40 100 25 120 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,50,false,15);" + // app1 a + "a\t" // app2 in a + + "(2,1,n1,,20,false,20);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,20,false,20);" + // app3 b + "b\t" // app1 in a + + "(4,1,n1,,5,false,100)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // As per intra queue preemption algorithm, 20 more containers were needed + // for app2 (in queue a). Inter queue pre-emption had already preselected 9 + // containers and hence preempted only 11 more. + verify(mDisp, times(20)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testSkipAMContainersInInterQueuePreemption() throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * + * Scenario: + * Guaranteed resource of a/b are 60:40 Total cluster resource = 100 + * While preempting containers during intra-queue preemption, AM containers + * will be spared for now. Verify the same. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 170 0]);" + // root + "-a(=[60 100 60 50 0]);" + // a + "-b(=[40 100 40 120 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,30,false,10);" + "a\t" // app2 in a + + "(1,1,n1,,10,false,20);" + "a\t" // app3 in a + + "(2,1,n1,,20,false,20);" + "b\t" // app4 in b + + "(4,1,n1,,20,false,20);" + "b\t" // app5 in b + + "(4,1,n1,,20,false,100)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Ensure that only 9 containers are preempted from app2 (sparing 1 AM) + verify(mDisp, times(11)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testSkipAMContainersInInterQueuePreemptionSingleApp() + throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * + * Scenario: + * Guaranteed resource of a/b are 50:50 Total cluster resource = 100 + * Spare AM container from a lower priority app during its preemption + * cycle. Even though there is more demand and no other low priority + * apps are present, the AM container still needs to be spared. 
+ */ + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 170 0]);" + // root + "-a(=[50 100 50 50 0]);" + // a + "-b(=[50 100 50 120 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,10,false,10);" + "a\t" // app1 in a + + "(2,1,n1,,40,false,10);" + "b\t" // app2 in a + + "(4,1,n1,,20,false,20);" + "b\t" // app3 in b + + "(4,1,n1,,30,false,100)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Make sure that app1's Am container is spared. Only 9/10 is preempted. + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testNoPreemptionForSingleApp() throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * + * Scenario: + * Guaranteed resource of a/b are 60:40 Total cluster resource = 100 + * Only one app is running in queue. And it has more demand but no + * resource are available in queue. Preemption must not occur here. 
+ */ + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 20 50 0]);" + // root + "-a(=[60 100 20 50 0]);" + // a + "-b(=[40 100 0 0 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(4,1,n1,,20,false,50)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Ensure there are 0 preemptions since only one app is running in queue. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testOverutilizedQueueResourceWithInterQueuePreemption() + throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * Scenario: + * Guaranteed resource of a/b are 20:80 Total cluster resource = 100 + * QueueB is under utilized and 20 resource will be released from queueA. + * 10 containers will also selected for intra-queue too but it will be + * pre-selected. + */ + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 70 0]);" + // root + "-a(=[20 100 100 30 0]);" + // a + "-b(=[80 100 0 20 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,50,false,0);" + "a\t" // app1 in a + + "(3,1,n1,,50,false,30);" + "b\t" // app2 in a + + "(4,1,n1,,0,false,20)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Complete demand request from QueueB for 20 resource must be preempted. 
+ verify(mDisp, times(20)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testNodePartitionIntraQueuePreemption() throws IOException { + /** + * The simplest test of node label, Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * + * Scenario: + * Both a/b can access x, and guaranteed capacity of them is 50:50. Two + * nodes, n1 has 100 x, n2 has 100 NO_LABEL 4 applications in the cluster, + * app1/app2/app3 in a, and app4/app5 in b. app1 uses 50 x, app2 uses 50 + * NO_LABEL, app3 uses 50 x, app4 uses 50 NO_LABEL. a has 20 pending + * resource for x for app3 of priority 2 + * + * After preemption, it should preempt 20 from app1 + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;" + // default partition + "x=100,true"; // partition=x + String nodesConfig = "n1=x;" + // n1 has partition=x + "n2="; // n2 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100],x=[100 100 100 100]);" + // root + "-a(=[50 100 50 50],x=[50 100 50 50]);" + // a + "-b(=[50 100 50 50],x=[50 100 50 50])"; // b + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,x,50,false,10);" + // 50 * x in n1 + "a\t" // app2 in a + + "(2,1,n1,x,0,false,20);" + // 0 * x in n1 + "a\t" // app2 in a + + "(1,1,n2,,50,false);" + // 50 default in n2 + "b\t" // app3 in b + + "(1,1,n1,x,50,false);" + // 50 * x in n1 + "b\t" // app4 in b + + "(1,1,n2,,50,false)"; // 50 default in n2 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 20 preempted from app1 + verify(mDisp, times(20)) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + verify(mDisp, never()) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); + verify(mDisp, never()) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(3)))); + } + + @Test + public void testComplexIntraQueuePreemption() throws IOException { + /** + * The complex test preemption, Queue structure is: + * + * <pre> + * root + * / | | \ + * a b c d + * </pre> + * + * Scenario: + * Guaranteed resource of a/b/c/d are 10:40:20:30 Total cluster resource = + * 100 + * All queues under its capacity, but within each queue there are many + * under served applications. + */ + + // report "ideal" preempt as 50%. Ensure preemption happens only for 50% + conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, + (float) 0.5); + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 75 130 0]);" + // root + "-a(=[10 100 5 50 0]);" + // a + "-b(=[40 100 35 60 0]);" + // b + "-c(=[20 100 10 10 0]);" + // c + "-d(=[30 100 25 10 0])"; // d + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved, + // pending) + "a\t" // app1 in a + + "(1,1,n1,,5,false,25);" + // app1 a + "a\t" + + "(4,1,n1,,0,false,25);" + // app2 a + "a\t" + + "(5,1,n1,,0,false,2);" + // app3 a + "b\t" + + "(3,1,n1,,5,false,20);" + // app4 b + "b\t" + + "(4,1,n1,,15,false,10);" + // app5 b + "b\t" + + "(4,1,n1,,10,false,10);" + // app6 b + "b\t" + + "(5,1,n1,,3,false,5);" + // app7 b + "b\t" + + "(5,1,n1,,0,false,2);" + // app8 b + "b\t" + + "(6,1,n1,,2,false,10);" + // app9 in b + "c\t" + + "(1,1,n1,,8,false,10);" + // app10 in c + "c\t" + + "(1,1,n1,,2,false,5);" + // app11 in c + "c\t" + + "(2,1,n1,,0,false,3);" + "d\t" // app12 in c + + "(2,1,n1,,25,false,0);" + "d\t" // app13 in d + + "(1,1,n1,,0,false,20)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // High priority app in queueA has 30 resource demand. But low priority + // app has only 5 resource. Hence preempt 4 here sparing AM. + verify(mDisp, times(4)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + // Multiple high priority apps has demand of 17. This will be preempted + // from another set of low priority apps. 
+ verify(mDisp, times(4)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(6)))); + verify(mDisp, times(4)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(5)))); + // Only 3 resources will be freed in this round for queue C as we + // are trying to save AM container. + verify(mDisp, times(2)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(10)))); + verify(mDisp, times(1)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(11)))); + } + + @Test + public void testIntraQueuePreemptionWithTwoUsers() + throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * + * Scenario: + * Guaranteed resource of a/b are 40:60 Total cluster resource = 100 + * Consider 2 users in a queue, assume minimum user limit factor is 50%. + * Hence in queueB of 40, each use has a quota of 20. app4 of high priority + * has a demand of 30 and its already using 5. Adhering to userlimit only + * 15 more must be preempted. If its same user,20 would have been preempted + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[60 100 10 50 0]);" + // a + "-b(=[40 100 40 120 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,5,false,25);" + // app1 a + "a\t" // app2 in a + + "(2,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,35,false,20,user1);" + // app3 b + "b\t" // app4 in b + + "(6,1,n1,,5,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Considering user-limit of 50% since only 2 users are there, only preempt + // 15 more (5 is already running) eventhough demand is for 30. + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testComplexNodePartitionIntraQueuePreemption() + throws IOException { + /** + * The simplest test of node label, Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * + * Scenario: + * Both a/b can access x, and guaranteed capacity of them is 50:50. Two + * nodes, n1 has 100 x, n2 has 100 NO_LABEL 4 applications in the cluster, + * app1-app4 in a, and app5-app9 in b. + * + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;" + // default partition + "x=100,true"; // partition=x + String nodesConfig = "n1=x;" + // n1 has partition=x + "n2="; // n2 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100],x=[100 100 100 100]);" + // root + "-a(=[50 100 50 50],x=[50 100 40 50]);" + // a + "-b(=[50 100 35 50],x=[50 100 50 50])"; // b + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,x,35,false,10);" + // 35 * x in n1 + "a\t" // app2 in a + + "(1,1,n1,x,5,false,10);" + // 5 * x in n1 + "a\t" // app3 in a + + "(2,1,n1,x,0,false,20);" + // 0 * x in n1 + "a\t" // app4 in a + + "(1,1,n2,,50,false);" + // 50 default in n2 + "b\t" // app5 in b + + "(1,1,n1,x,50,false);" + // 50 * x in n1 + "b\t" // app6 in b + + "(1,1,n2,,25,false);" + // 25 * default in n2 + "b\t" // app7 in b + + "(1,1,n2,,3,false);" + // 3 * default in n2 + "b\t" // app8 in b + + "(1,1,n2,,2,false);" + // 2 * default in n2 + "b\t" // app9 in b + + "(5,1,n2,,5,false,30)"; // 5 * default in n2 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Label X: app3 has demand of 20 for label X. Hence app2 will lose + // 4 (sparing AM) and 16 more from app1 till preemption limit is met. + verify(mDisp, times(16)) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + verify(mDisp, times(4)) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); + + // Default Label:For a demand of 30, preempt from all low priority + // apps of default label. 25 will be preempted as preemption limit is + // met. 
+ verify(mDisp, times(1)) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(8)))); + verify(mDisp, times(2)) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(7)))); + verify(mDisp, times(22)) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6)))); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
