YARN-1015. FS should watch node resource utilization and allocate opportunistic containers if appropriate.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bdf3e661 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bdf3e661 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bdf3e661 Branch: refs/heads/YARN-1011 Commit: bdf3e661b7558020c284442f24ade677406b19e8 Parents: eba8436 Author: Haibo Chen <haiboc...@apache.org> Authored: Fri Nov 17 07:47:32 2017 -0800 Committer: Haibo Chen <haiboc...@apache.org> Committed: Sun Jan 28 18:08:11 2018 -0800 ---------------------------------------------------------------------- .../sls/scheduler/FairSchedulerMetrics.java | 4 +- .../hadoop/yarn/conf/YarnConfiguration.java | 5 + .../src/main/resources/yarn-default.xml | 13 + .../scheduler/SchedulerNode.java | 48 ++ .../scheduler/fair/FSAppAttempt.java | 166 ++++--- .../scheduler/fair/FSLeafQueue.java | 51 +- .../scheduler/fair/FSParentQueue.java | 36 +- .../resourcemanager/scheduler/fair/FSQueue.java | 39 +- .../scheduler/fair/FairScheduler.java | 97 ++-- .../fair/FairSchedulerConfiguration.java | 5 + .../scheduler/fair/Schedulable.java | 17 +- .../DominantResourceFairnessPolicy.java | 8 +- .../fair/policies/FairSharePolicy.java | 4 +- .../webapp/dao/FairSchedulerQueueInfo.java | 2 +- .../yarn/server/resourcemanager/MockNodes.java | 60 ++- .../TestWorkPreservingRMRestart.java | 2 +- .../scheduler/fair/FakeSchedulable.java | 9 +- .../scheduler/fair/TestAppRunnability.java | 9 +- .../scheduler/fair/TestFSAppAttempt.java | 4 +- .../scheduler/fair/TestFSLeafQueue.java | 4 +- .../scheduler/fair/TestFSSchedulerNode.java | 4 +- .../scheduler/fair/TestFairScheduler.java | 468 +++++++++++++++++-- .../scheduler/fair/TestSchedulingPolicy.java | 10 +- 23 files changed, 861 insertions(+), 204 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java index a5aee74..1f4e7c7 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java @@ -75,7 +75,7 @@ public class FairSchedulerMetrics extends SchedulerMetrics { case DEMAND: return schedulable.getDemand().getMemorySize(); case USAGE: - return schedulable.getResourceUsage().getMemorySize(); + return schedulable.getGuaranteedResourceUsage().getMemorySize(); case MINSHARE: return schedulable.getMinShare().getMemorySize(); case MAXSHARE: @@ -96,7 +96,7 @@ public class FairSchedulerMetrics extends SchedulerMetrics { case DEMAND: return schedulable.getDemand().getVirtualCores(); case USAGE: - return schedulable.getResourceUsage().getVirtualCores(); + return schedulable.getGuaranteedResourceUsage().getVirtualCores(); case MINSHARE: return schedulable.getMinShare().getVirtualCores(); case MAXSHARE: http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ed8404c..15f6480 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -268,6 +268,11 @@ public class YarnConfiguration extends Configuration { /** UserGroupMappingPlacementRule configuration string. */ public static final String USER_GROUP_PLACEMENT_RULE = "user-group"; + public static final String RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED = + RM_PREFIX + "scheduler.oversubscription.enabled"; + public static final boolean DEFAULT_RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED + = false; + /** Enable Resource Manager webapp ui actions */ public static final String RM_WEBAPP_UI_ACTIONS_ENABLED = RM_PREFIX + "webapp.ui-actions.enabled"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index da31694..7962dae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -389,6 +389,19 @@ </property> <property> + <description> + If set to true, the scheduler will try to over-allocate resources on the + nodes that allow overallocation. 
To enable overallocation on a node, set + {code}yarn.nodemanager.overallocation.memory-utilization-threshold{code} + and + {code}yarn.nodemanager.overallocation.cpu-utilization-threshold{code} + to a number in the range (0.0, 1.0) + </description> + <name>yarn.resourcemanager.scheduler.oversubscription.enabled</name> + <value>false</value> + </property> + + <property> <description>Enable RM to recover state after starting. If true, then yarn.resourcemanager.store.class must be specified. </description> <name>yarn.resourcemanager.recovery.enabled</name> http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 349944e..e942981 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; +import
org.apache.hadoop.yarn.server.api.records.ResourceThresholds; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -59,6 +61,10 @@ public abstract class SchedulerNode { private static final Log LOG = LogFactory.getLog(SchedulerNode.class); private Resource capacity; + // The resource available within the node's capacity that can be given out + // to run GUARANTEED containers, including reserved, preempted and any + // remaining free resources. Resources allocated to OPPORTUNISTIC containers + // are tracked in allocatedResourceOpportunistic private Resource unallocatedResource = Resource.newInstance(0, 0); private RMContainer reservedContainer; @@ -596,6 +602,48 @@ public abstract class SchedulerNode { return this.nodeUtilization; } + /** + * Get the amount of resources that can be allocated to opportunistic + * containers in the case of overallocation. It is calculated as + * (node capacity * overallocation threshold) - (node utilization + + * resources of allocated-yet-not-started containers), floored at zero. + * @return the amount of resources that are available to be allocated to + * opportunistic containers + */ + public synchronized Resource allowedResourceForOverAllocation() { + OverAllocationInfo overAllocationInfo = rmNode.getOverAllocationInfo(); + if (overAllocationInfo == null) { + LOG.debug("Overallocation is disabled on node: " + rmNode.getHostName()); + return Resources.none(); + } + + ResourceUtilization projectedNodeUtilization = ResourceUtilization.
+ newInstance(getNodeUtilization()); + // account for resources allocated in this heartbeat + projectedNodeUtilization.addTo( + (int) (resourceAllocatedPendingLaunch.getMemorySize()), 0, + (float) resourceAllocatedPendingLaunch.getVirtualCores() / + capacity.getVirtualCores()); + + ResourceThresholds thresholds = + overAllocationInfo.getOverAllocationThresholds(); + Resource overAllocationThreshold = Resources.createResource( + (long) (capacity.getMemorySize() * thresholds.getMemoryThreshold()), + (int) (capacity.getVirtualCores() * thresholds.getCpuThreshold())); + long allowedMemory = Math.max(0, overAllocationThreshold.getMemorySize() + - projectedNodeUtilization.getPhysicalMemory()); + int allowedCpu = Math.max(0, (int) + (overAllocationThreshold.getVirtualCores() - + projectedNodeUtilization.getCPU() * capacity.getVirtualCores())); + + Resource resourceAllowedForOpportunisticContainers = + Resources.createResource(allowedMemory, allowedCpu); + + // TODO cap the resources allocated to OPPORTUNISTIC containers on a node + // in terms of its capacity. i.e. 
return min(max_ratio * capacity, allowed) + return resourceAllowedForOpportunisticContainers; + } + private static class ContainerInfo { private final RMContainer container; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 0305702..970ffcb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -169,7 +170,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt rmContainer.getNodeLabelExpression(), getUser(), 1, containerResource); this.attemptResourceUsage.decUsed(containerResource); - getQueue().decUsedResource(containerResource); + 
getQueue().decUsedGuaranteedResource(containerResource); // Clear resource utilization metrics cache. lastMemoryAggregateAllocationUpdateTime = -1; @@ -178,30 +179,35 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } } - private void unreserveInternal( + private boolean unreserveInternal( SchedulerRequestKey schedulerKey, FSSchedulerNode node) { try { writeLock.lock(); Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get( schedulerKey); - RMContainer reservedContainer = reservedContainers.remove( - node.getNodeID()); - if (reservedContainers.isEmpty()) { - this.reservedContainers.remove(schedulerKey); - } + boolean unreserved = false; + if (reservedContainers != null) { + RMContainer reservedContainer = reservedContainers.remove( + node.getNodeID()); + if (reservedContainers.isEmpty()) { + this.reservedContainers.remove(schedulerKey); + } - // Reset the re-reservation count - resetReReservations(schedulerKey); + // Reset the re-reservation count + resetReReservations(schedulerKey); - Resource resource = reservedContainer.getContainer().getResource(); - this.attemptResourceUsage.decReserved(resource); + Resource resource = reservedContainer.getContainer().getResource(); + this.attemptResourceUsage.decReserved(resource); + unreserved = true; - LOG.info( - "Application " + getApplicationId() + " unreserved " + " on node " - + node + ", currently has " + reservedContainers.size() - + " at priority " + schedulerKey.getPriority() - + "; currentReservation " + this.attemptResourceUsage - .getReserved()); + LOG.info( + "Application " + getApplicationId() + " unreserved " + " on node " + + node + ", currently has " + reservedContainers.size() + + " at priority " + schedulerKey.getPriority() + + "; currentReservation " + this.attemptResourceUsage + .getReserved()); + } + return unreserved; } finally { writeLock.unlock(); } @@ -229,7 +235,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt SchedulingPolicy policy = 
fsQueue.getPolicy(); Resource queueFairShare = fsQueue.getFairShare(); - Resource queueUsage = fsQueue.getResourceUsage(); + Resource queueUsage = fsQueue.getGuaranteedResourceUsage(); Resource clusterResource = this.scheduler.getClusterResource(); Resource clusterUsage = this.scheduler.getRootQueueMetrics() .getAllocatedResources(); @@ -420,7 +426,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt public RMContainer allocate(NodeType type, FSSchedulerNode node, SchedulerRequestKey schedulerKey, PendingAsk pendingAsk, - Container reservedContainer) { + Container reservedContainer, boolean opportunistic) { RMContainer rmContainer; Container container; @@ -445,9 +451,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } container = reservedContainer; + ExecutionType executionType = opportunistic ? + ExecutionType.OPPORTUNISTIC : ExecutionType.GUARANTEED; if (container == null) { container = createContainer(node, pendingAsk.getPerAllocationResource(), - schedulerKey); + schedulerKey, executionType); } // Create RMContainer @@ -463,8 +471,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Update consumption and track allocations ContainerRequest containerRequest = appSchedulingInfo.allocate( type, node, schedulerKey, container); - this.attemptResourceUsage.incUsed(container.getResource()); - getQueue().incUsedResource(container.getResource()); + if (executionType.equals(ExecutionType.GUARANTEED)) { + this.attemptResourceUsage.incUsed(container.getResource()); + getQueue().incUsedGuaranteedResource(container.getResource()); + } else { + this.attemptOpportunisticResourceUsage.incUsed(container.getResource()); + } // Update resource requests related to "request" and store in RMContainer ((RMContainerImpl) rmContainer).setContainerRequest(containerRequest); @@ -621,7 +633,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt private Resource getUsageAfterPreemptingContainer(Resource containerResources, 
Resource alreadyConsideringForPreemption) { - Resource usageAfterPreemption = Resources.clone(getResourceUsage()); + Resource usageAfterPreemption = + Resources.clone(getGuaranteedResourceUsage()); // Subtract resources of containers already queued for preemption synchronized (preemptionVariablesLock) { @@ -648,7 +661,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * @return Container */ private Container createContainer(FSSchedulerNode node, Resource capability, - SchedulerRequestKey schedulerKey) { + SchedulerRequestKey schedulerKey, ExecutionType executionType) { NodeId nodeId = node.getRMNode().getNodeID(); ContainerId containerId = BuilderUtils.newContainerId( @@ -658,7 +671,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode().getHttpAddress(), capability, schedulerKey.getPriority(), null, - schedulerKey.getAllocationRequestId()); + executionType, schedulerKey.getAllocationRequestId()); } @Override @@ -670,7 +683,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt super.recoverContainer(node, rmContainer); if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) { - getQueue().incUsedResource(rmContainer.getContainer().getResource()); + getQueue().incUsedGuaranteedResource( + rmContainer.getContainer().getResource()); } // If not running unmanaged, the first container we recover is always @@ -708,7 +722,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt if (reservedContainer == null) { reservedContainer = createContainer(node, perAllocationResource, - schedulerKey); + schedulerKey, ExecutionType.GUARANTEED); getMetrics().reserveResource(node.getPartition(), getUser(), reservedContainer.getResource()); RMContainer rmContainer = @@ -765,11 +779,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt public void unreserve(SchedulerRequestKey schedulerKey, FSSchedulerNode node) { RMContainer rmContainer = 
node.getReservedContainer(); - unreserveInternal(schedulerKey, node); - node.unreserveResource(this); - clearReservation(node); - getMetrics().unreserveResource(node.getPartition(), - getUser(), rmContainer.getContainer().getResource()); + if (unreserveInternal(schedulerKey, node)) { + node.unreserveResource(this); + clearReservation(node); + getMetrics().unreserveResource(node.getPartition(), + getUser(), rmContainer.getContainer().getResource()); + } } private void setReservation(SchedulerNode node) { @@ -843,13 +858,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt */ private Resource assignContainer( FSSchedulerNode node, PendingAsk pendingAsk, NodeType type, - boolean reserved, SchedulerRequestKey schedulerKey) { + boolean reserved, boolean opportunistic, + SchedulerRequestKey schedulerKey) { // How much does this request need? Resource capability = pendingAsk.getPerAllocationResource(); // How much does the node have? - Resource available = node.getUnallocatedResource(); + Resource available = opportunistic ? + node.allowedResourceForOverAllocation() : node.getUnallocatedResource(); Container reservedContainer = null; if (reserved) { @@ -861,33 +878,39 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Inform the application of the new container for this request RMContainer allocatedContainer = allocate(type, node, schedulerKey, pendingAsk, - reservedContainer); - if (allocatedContainer == null) { - // Did the application need this resource? 
- if (reserved) { - unreserve(schedulerKey, node); - } - return Resources.none(); - } + reservedContainer, opportunistic); - // If we had previously made a reservation, delete it + // delete the previous reservation, if any if (reserved) { unreserve(schedulerKey, node); } - // Inform the node - node.allocateContainer(allocatedContainer); + if (allocatedContainer != null) { + if (opportunistic) { + // if an OPPORTUNISTIC container is allocated, we need to + // unreserve anything that we may have reserved in our + // previous attempt to assign GUARANTEED containers for this + // scheduling request. + unreserve(schedulerKey, node); + } + - // If not running unmanaged, the first container we allocate is always - // the AM. Set the amResource for this app and update the leaf queue's AM - // usage - if (!isAmRunning() && !getUnmanagedAM()) { - setAMResource(capability); - getQueue().addAMResourceUsage(capability); - setAmRunning(true); - } + // Inform the node + node.allocateContainer(allocatedContainer); + + // If not running unmanaged, the first container we allocate + // is always the AM. 
Set amResource for this app and update + // the leaf queue's AM usage + if (!isAmRunning() && !getUnmanagedAM()) { + setAMResource(capability); + getQueue().addAMResourceUsage(capability); + setAmRunning(true); + } - return capability; + return capability; + } else { + return Resources.none(); + } } if (LOG.isDebugEnabled()) { @@ -898,7 +921,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // The desired container won't fit here, so reserve // Reserve only, if app does not wait for preempted resources on the node, // otherwise we may end up with duplicate reservations - if (isReservable(capability) && + if (isReservable(capability) && !opportunistic && !node.isPreemptedForApp(this) && reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer, type, schedulerKey)) { @@ -944,7 +967,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } @SuppressWarnings("deprecation") - private Resource assignContainer(FSSchedulerNode node, boolean reserved) { + private Resource assignContainer(FSSchedulerNode node, boolean opportunistic, + boolean reserved) { if (LOG.isTraceEnabled()) { LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved); } @@ -1007,7 +1031,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt + ", app attempt id: " + this.attemptId); } return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL, - reserved, schedulerKey); + reserved, opportunistic, schedulerKey); } if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) { @@ -1024,7 +1048,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt + ", app attempt id: " + this.attemptId); } return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL, - reserved, schedulerKey); + reserved, opportunistic, schedulerKey); } PendingAsk offswitchAsk = getPendingAsk(schedulerKey, @@ -1044,7 +1068,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt + ", app attempt id: " + this.attemptId); 
} return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH, - reserved, schedulerKey); + reserved, opportunistic, schedulerKey); } } @@ -1145,7 +1169,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // there's only one container size per priority. if (Resources.fitsIn(node.getReservedContainer().getReservedResource(), node.getUnallocatedResource())) { - assignContainer(node, true); + assignContainer(node, false, true); } return true; } @@ -1161,7 +1185,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt Resource fairDemand = Resources.componentwiseMin(threshold, demand); // Check if the queue is starved for fairshare - boolean starved = isUsageBelowShare(getResourceUsage(), fairDemand); + boolean starved = + isUsageBelowShare(getGuaranteedResourceUsage(), fairDemand); if (!starved) { lastTimeAtFairShare = now; @@ -1173,8 +1198,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt fairshareStarvation = Resources.none(); } else { // The app has been starved for longer than preemption-timeout. - fairshareStarvation = - Resources.subtractFromNonNegative(fairDemand, getResourceUsage()); + fairshareStarvation = Resources.subtractFromNonNegative(fairDemand, + getGuaranteedResourceUsage()); } return fairshareStarvation; } @@ -1193,7 +1218,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * @return true if the app is starved for fairshare, false otherwise */ boolean isStarvedForFairShare() { - return isUsageBelowShare(getResourceUsage(), getFairShare()); + return isUsageBelowShare(getGuaranteedResourceUsage(), getFairShare()); } /** @@ -1294,7 +1319,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * Get the current app's unsatisfied demand. 
*/ Resource getPendingDemand() { - return Resources.subtract(demand, getResourceUsage()); + return Resources.subtract(demand, getGuaranteedResourceUsage()); } @Override @@ -1313,11 +1338,16 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } @Override - public Resource getResourceUsage() { + public Resource getGuaranteedResourceUsage() { return getCurrentConsumption(); } @Override + public Resource getOpportunisticResourceUsage() { + return attemptOpportunisticResourceUsage.getUsed(); + } + + @Override public float getWeight() { float weight = 1.0F; @@ -1366,7 +1396,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } @Override - public Resource assignContainer(FSSchedulerNode node) { + public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) { if (isOverAMShareLimit()) { PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk(); updateAMDiagnosticMsg(amAsk.getPerAllocationResource(), @@ -1378,7 +1408,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } return Resources.none(); } - return assignContainer(node, false); + return assignContainer(node, opportunistic, false); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 49d2166..2a90228 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -89,7 +89,7 @@ public class FSLeafQueue extends FSQueue { } else { nonRunnableApps.add(app); } - incUsedResource(app.getResourceUsage()); + incUsedGuaranteedResource(app.getGuaranteedResourceUsage()); } finally { writeLock.unlock(); } @@ -124,7 +124,7 @@ public class FSLeafQueue extends FSQueue { getMetrics().setAMResourceUsage(amResourceUsage); } - decUsedResource(app.getResourceUsage()); + decUsedGuaranteedResource(app.getGuaranteedResourceUsage()); return runnable; } @@ -294,6 +294,42 @@ public class FSLeafQueue extends FSQueue { return demand; } + @Override + public Resource getGuaranteedResourceUsage() { + Resource guaranteedResource = Resources.createResource(0); + readLock.lock(); + try { + for (FSAppAttempt app : runnableApps) { + Resources.addTo(guaranteedResource, app.getGuaranteedResourceUsage()); + } + for (FSAppAttempt app : nonRunnableApps) { + Resources.addTo(guaranteedResource, app.getGuaranteedResourceUsage()); + } + } finally { + readLock.unlock(); + } + return guaranteedResource; + } + + @Override + public Resource getOpportunisticResourceUsage() { + Resource opportunisticResource = Resource.newInstance(0, 0); + readLock.lock(); + try { + for (FSAppAttempt app : runnableApps) { + Resources.addTo(opportunisticResource, + app.getOpportunisticResourceUsage()); + } + for (FSAppAttempt app : nonRunnableApps) { + Resources.addTo(opportunisticResource, + app.getOpportunisticResourceUsage()); + } + } finally { + readLock.unlock(); + } + return opportunisticResource; + } + Resource getAmResourceUsage() { return amResourceUsage; } @@ -327,14 +363,14 @@ public class FSLeafQueue extends FSQueue { } 
@Override - public Resource assignContainer(FSSchedulerNode node) { + public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) { Resource assigned = none(); if (LOG.isDebugEnabled()) { LOG.debug("Node " + node.getNodeName() + " offered to queue: " + getName() + " fairShare: " + getFairShare()); } - if (!assignContainerPreCheck(node)) { + if (!assignContainerPreCheck(node, opportunistic)) { return assigned; } @@ -342,7 +378,7 @@ public class FSLeafQueue extends FSQueue { if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) { continue; } - assigned = sched.assignContainer(node); + assigned = sched.assignContainer(node, opportunistic); if (!assigned.equals(none())) { if (LOG.isDebugEnabled()) { LOG.debug("Assigned container in queue:" + getName() + " " + @@ -540,7 +576,8 @@ public class FSLeafQueue extends FSQueue { Resource desiredShare = Resources.min(policy.getResourceCalculator(), scheduler.getClusterResource(), getMinShare(), getDemand()); - Resource starvation = Resources.subtract(desiredShare, getResourceUsage()); + Resource starvation = + Resources.subtract(desiredShare, getGuaranteedResourceUsage()); boolean starved = !Resources.isNone(starvation); long now = scheduler.getClock().getTime(); @@ -598,7 +635,7 @@ public class FSLeafQueue extends FSQueue { ", SteadyFairShare: " + getSteadyFairShare() + ", MaxShare: " + getMaxShare() + ", MinShare: " + minShare + - ", ResourceUsage: " + getResourceUsage() + + ", ResourceUsage: " + getGuaranteedResourceUsage() + ", Demand: " + getDemand() + ", Runnable: " + getNumRunnableApps() + ", NumPendingApps: " + getNumPendingApps() + http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index a8e53fc..3e63e97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -119,6 +119,34 @@ public class FSParentQueue extends FSQueue { } @Override + public Resource getGuaranteedResourceUsage() { + Resource guaranteedResource = Resources.createResource(0); + readLock.lock(); + try { + for (FSQueue child : childQueues) { + Resources.addTo(guaranteedResource, child.getGuaranteedResourceUsage()); + } + } finally { + readLock.unlock(); + } + return guaranteedResource; + } + + @Override + public Resource getOpportunisticResourceUsage() { + Resource opportunisticResource = Resource.newInstance(0, 0); + readLock.lock(); + try { + for (FSQueue child : childQueues) { + Resources.addTo(opportunisticResource, + child.getOpportunisticResourceUsage()); + } + } finally { + readLock.unlock(); + } + return opportunisticResource; + } + public void updateDemand() { // Compute demand by iterating through apps in the queue // Limit demand to maxResources @@ -177,11 +205,11 @@ public class FSParentQueue extends FSQueue { } @Override - public Resource assignContainer(FSSchedulerNode node) { + public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) { Resource assigned = Resources.none(); // If this queue is over its limit, reject - if (!assignContainerPreCheck(node)) { + if 
(!assignContainerPreCheck(node, opportunistic)) { return assigned; } @@ -204,7 +232,7 @@ public class FSParentQueue extends FSQueue { readLock.lock(); try { for (FSQueue child : childQueues) { - assigned = child.assignContainer(node); + assigned = child.assignContainer(node, opportunistic); if (!Resources.equals(assigned, Resources.none())) { break; } @@ -288,7 +316,7 @@ public class FSParentQueue extends FSQueue { ", SteadyFairShare: " + getSteadyFairShare() + ", MaxShare: " + getMaxShare() + ", MinShare: " + minShare + - ", ResourceUsage: " + getResourceUsage() + + ", Guaranteed ResourceUsage: " + getGuaranteedResourceUsage() + ", Demand: " + getDemand() + ", MaxAMShare: " + maxAMShare + ", Runnable: " + getNumRunnableApps() + http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 4babfd5..f293eb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -57,7 +57,7 @@ public abstract class FSQueue implements Queue, Schedulable { private Resource fairShare = Resources.createResource(0, 0); private Resource steadyFairShare = 
Resources.createResource(0, 0); private Resource reservedResource = Resources.createResource(0, 0); - private final Resource resourceUsage = Resource.newInstance(0, 0); + private final Resource guaranteedResourceUsage = Resource.newInstance(0, 0); private final String name; protected final FairScheduler scheduler; private final YarnAuthorizationProvider authorizer; @@ -234,7 +234,8 @@ public abstract class FSQueue implements Queue, Schedulable { if (getFairShare().getMemorySize() == 0) { queueInfo.setCurrentCapacity(0.0f); } else { - queueInfo.setCurrentCapacity((float) getResourceUsage().getMemorySize() / + queueInfo.setCurrentCapacity( + (float) getGuaranteedResourceUsage().getMemorySize() / getFairShare().getMemorySize()); } @@ -418,14 +419,17 @@ public abstract class FSQueue implements Queue, Schedulable { * * @return true if check passes (can assign) or false otherwise */ - boolean assignContainerPreCheck(FSSchedulerNode node) { - if (node.getReservedContainer() != null) { + boolean assignContainerPreCheck(FSSchedulerNode node, boolean opportunistic) { + if (opportunistic) { + // always pre-approve OPPORTUNISTIC containers to be assigned on the node + return true; + } else if (node.getReservedContainer() != null) { if (LOG.isDebugEnabled()) { LOG.debug("Assigning container failed on node '" + node.getNodeName() + " because it has reserved containers."); } return false; - } else if (!Resources.fitsIn(getResourceUsage(), getMaxShare())) { + } else if (!Resources.fitsIn(getGuaranteedResourceUsage(), getMaxShare())) { if (LOG.isDebugEnabled()) { LOG.debug("Assigning container failed on node '" + node.getNodeName() + " because queue resource usage is larger than MaxShare: " @@ -448,7 +452,8 @@ public abstract class FSQueue implements Queue, Schedulable { @Override public String toString() { return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]", - getName(), getDemand(), getResourceUsage(), fairShare, getWeight()); + getName(), getDemand(), 
getGuaranteedResourceUsage(), fairShare, + getWeight()); } @Override @@ -480,8 +485,8 @@ public abstract class FSQueue implements Queue, Schedulable { } @Override - public Resource getResourceUsage() { - return resourceUsage; + public Resource getGuaranteedResourceUsage() { + return guaranteedResourceUsage; } /** @@ -489,11 +494,11 @@ public abstract class FSQueue implements Queue, Schedulable { * * @param res the resource to increase */ - protected void incUsedResource(Resource res) { - synchronized (resourceUsage) { - Resources.addTo(resourceUsage, res); + protected void incUsedGuaranteedResource(Resource res) { + synchronized (guaranteedResourceUsage) { + Resources.addTo(guaranteedResourceUsage, res); if (parent != null) { - parent.incUsedResource(res); + parent.incUsedGuaranteedResource(res); } } } @@ -503,11 +508,11 @@ public abstract class FSQueue implements Queue, Schedulable { * * @param res the resource to decrease */ - protected void decUsedResource(Resource res) { - synchronized (resourceUsage) { - Resources.subtractFrom(resourceUsage, res); + protected void decUsedGuaranteedResource(Resource res) { + synchronized (guaranteedResourceUsage) { + Resources.subtractFrom(guaranteedResourceUsage, res); if (parent != null) { - parent.decUsedResource(res); + parent.decUsedGuaranteedResource(res); } } } @@ -520,7 +525,7 @@ public abstract class FSQueue implements Queue, Schedulable { boolean fitsInMaxShare(Resource additionalResource) { Resource usagePlusAddition = - Resources.add(getResourceUsage(), additionalResource); + Resources.add(getGuaranteedResourceUsage(), additionalResource); if (!Resources.fitsIn(usagePlusAddition, getMaxShare())) { if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index e2a62ec..a8e348c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -165,7 +165,6 @@ public class FairScheduler extends private float reservableNodesRatio; // percentage of available nodes // an app can be reserved on - protected boolean sizeBasedWeight; // Give larger weights to larger jobs // Continuous Scheduling enabled or not @Deprecated @@ -188,6 +187,8 @@ public class FairScheduler extends boolean maxAssignDynamic; protected int maxAssign; // Max containers to assign per heartbeat + protected boolean oversubscriptionEnabled; + @VisibleForTesting final MaxRunningAppsEnforcer maxRunningEnforcer; @@ -1000,13 +1001,13 @@ public class FairScheduler extends * resources for preempted containers. 
* @param node Node to check */ - static void assignPreemptedContainers(FSSchedulerNode node) { + static void attemptToAssignPreemptedResources(FSSchedulerNode node) { for (Entry<FSAppAttempt, Resource> entry : node.getPreemptionList().entrySet()) { FSAppAttempt app = entry.getKey(); Resource preemptionPending = Resources.clone(entry.getValue()); while (!app.isStopped() && !Resources.isNone(preemptionPending)) { - Resource assigned = app.assignContainer(node); + Resource assigned = app.assignContainer(node, false); if (Resources.isNone(assigned) || assigned.equals(FairScheduler.CONTAINER_RESERVED)) { // Fail to assign, let's not try further @@ -1038,44 +1039,82 @@ public class FairScheduler extends // Assign new containers... // 1. Ensure containers are assigned to the apps that preempted // 2. Check for reserved applications - // 3. Schedule if there are no reservations + // 3. Schedule GUARANTEED containers if there are no reservations + // 4. Schedule OPPORTUNISTIC containers if possible // Apps may wait for preempted containers // We have to satisfy these first to avoid cases, when we preempt // a container for A from B and C gets the preempted containers, // when C does not qualify for preemption itself. 
- assignPreemptedContainers(node); - FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); - boolean validReservation = false; - if (reservedAppSchedulable != null) { - validReservation = reservedAppSchedulable.assignReservedContainer(node); - } + attemptToAssignPreemptedResources(node); + + boolean validReservation = attemptToAssignReservedResources(node); if (!validReservation) { - // No reservation, schedule at queue which is farthest below fair share - int assignedContainers = 0; - Resource assignedResource = Resources.clone(Resources.none()); - Resource maxResourcesToAssign = Resources.multiply( - node.getUnallocatedResource(), 0.5f); - while (node.getReservedContainer() == null) { - Resource assignment = queueMgr.getRootQueue().assignContainer(node); - if (assignment.equals(Resources.none())) { - break; - } + // only attempt to assign GUARANTEED containers if there is no + // reservation on the node + attemptToAssignResourcesAsGuaranteedContainers(node); + } - assignedContainers++; - Resources.addTo(assignedResource, assignment); - if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign, - assignedResource)) { - break; - } - } + // attempt to assign OPPORTUNISTIC containers regardless of whether + // we have made a reservation or assigned a GUARANTEED container + if (oversubscriptionEnabled) { + attemptToAssignResourcesAsOpportunisticContainers(node); } + updateRootQueueMetrics(); } finally { writeLock.unlock(); } } + /** + * Assign the reserved resource to the application that has reserved it. 
+ */ + private boolean attemptToAssignReservedResources(FSSchedulerNode node) { + boolean success = false; + FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); + if (reservedAppSchedulable != null) { + success = reservedAppSchedulable.assignReservedContainer(node); + } + return success; + } + + private void attemptToAssignResourcesAsGuaranteedContainers( + FSSchedulerNode node) { + // No reservation, schedule at queue which is farthest below fair share + int assignedContainers = 0; + Resource assignedResource = Resources.clone(Resources.none()); + Resource maxResourcesToAssign = Resources.multiply( + node.getUnallocatedResource(), 0.5f); + while (node.getReservedContainer() == null) { + Resource assignment = + queueMgr.getRootQueue().assignContainer(node, false); + if (assignment.equals(Resources.none())) { + break; + } + assignedContainers++; + Resources.addTo(assignedResource, assignment); + + if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign, + assignedResource)) { + break; + } + } + } + + /** + * Try to assign OPPORTUNISTIC containers as long as there are resources + * to assign. 
+ * @param node the node to assign OPPORTUNISTIC containers on + */ + private void attemptToAssignResourcesAsOpportunisticContainers( + FSSchedulerNode node) { + while (!Resources.none().equals( + queueMgr.getRootQueue().assignContainer(node, true))) { + // nothing to do here + } + } + public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) { return super.getApplicationAttempt(appAttemptId); } @@ -1316,6 +1355,7 @@ public class FairScheduler extends sizeBasedWeight = this.conf.getSizeBasedWeight(); usePortForNodeName = this.conf.getUsePortForNodeName(); reservableNodesRatio = this.conf.getReservableNodes(); + oversubscriptionEnabled = this.conf.isOversubscriptionEnabled(); updateInterval = this.conf.getUpdateInterval(); if (updateInterval < 0) { @@ -1690,7 +1730,8 @@ public class FairScheduler extends } // maxShare - if (!Resources.fitsIn(Resources.add(cur.getResourceUsage(), consumption), + if (!Resources.fitsIn( + Resources.add(cur.getGuaranteedResourceUsage(), consumption), cur.getMaxShare())) { throw new YarnException("Moving app attempt " + appAttId + " to queue " + queueName + " would violate queue maxShare constraints on" http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index b50e4bb..30bbb78 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -344,6 +344,11 @@ public class FairSchedulerConfiguration extends Configuration { DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS); } + public boolean isOversubscriptionEnabled() { + return getBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + YarnConfiguration.DEFAULT_RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED); + } + /** * Delay in milliseconds for locality fallback node to rack. * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index bd1ff7a..f018929 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -58,8 +58,17 @@ public interface Schedulable { */ Resource 
getDemand(); - /** Get the aggregate amount of resources consumed by the schedulable. */ - Resource getResourceUsage(); + /** + * Get the aggregate amount of guaranteed resources consumed by the + * schedulable. + */ + Resource getGuaranteedResourceUsage(); + + /** + * Get the aggregate amount of opportunistic resources consumed by the + * schedulable. + */ + Resource getOpportunisticResourceUsage(); /** Minimum Resource share assigned to the schedulable. */ Resource getMinShare(); @@ -89,8 +98,10 @@ public interface Schedulable { /** * Assign a container on this node if possible, and return the amount of * resources assigned. + * @param node the node to assign containers on + * @param opportunistic whether to assign OPPORTUNISTIC containers or not */ - Resource assignContainer(FSSchedulerNode node); + Resource assignContainer(FSSchedulerNode node, boolean opportunistic); /** Get the fair share assigned to this Schedulable. */ Resource getFairShare(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 59635d9..f366891 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -169,8 +169,8 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy { extends DominantResourceFairnessComparator { @Override public int compare(Schedulable s1, Schedulable s2) { - Resource usage1 = s1.getResourceUsage(); - Resource usage2 = s2.getResourceUsage(); + Resource usage1 = s1.getGuaranteedResourceUsage(); + Resource usage2 = s2.getGuaranteedResourceUsage(); Resource minShare1 = s1.getMinShare(); Resource minShare2 = s2.getMinShare(); Resource clusterCapacity = fsContext.getClusterResource(); @@ -370,9 +370,9 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy { @Override public int compare(Schedulable s1, Schedulable s2) { ResourceInformation[] resourceInfo1 = - s1.getResourceUsage().getResources(); + s1.getGuaranteedResourceUsage().getResources(); ResourceInformation[] resourceInfo2 = - s2.getResourceUsage().getResources(); + s2.getGuaranteedResourceUsage().getResources(); ResourceInformation[] minShareInfo1 = s1.getMinShare().getResources(); ResourceInformation[] minShareInfo2 = s2.getMinShare().getResources(); ResourceInformation[] clusterInfo = http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 8179aa7..7ecbeea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -90,8 +90,8 @@ public class FairSharePolicy extends SchedulingPolicy { int res = compareDemand(s1, s2); // Pre-compute resource usages to avoid duplicate calculation - Resource resourceUsage1 = s1.getResourceUsage(); - Resource resourceUsage2 = s2.getResourceUsage(); + Resource resourceUsage1 = s1.getGuaranteedResourceUsage(); + Resource resourceUsage2 = s2.getGuaranteedResourceUsage(); if (res == 0) { res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java index 913513c..fe313a6 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java @@ -88,7 +88,7 @@ public class FairSchedulerQueueInfo { amMaxResources = new ResourceInfo(Resource.newInstance( queue.getMetrics().getMaxAMShareMB(), queue.getMetrics().getMaxAMShareVCores())); - usedResources = new ResourceInfo(queue.getResourceUsage()); + usedResources = new ResourceInfo(queue.getGuaranteedResourceUsage()); demandResources = new ResourceInfo(queue.getDemand()); fractionMemUsed = (float)usedResources.getMemorySize() / clusterResources.getMemorySize(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index e81192e..6197a2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import java.util.ArrayList; import java.util.Collections; import java.util.List; 
import java.util.Set; @@ -103,7 +102,10 @@ public class MockNodes { return rs; } - private static class MockRMNodeImpl implements RMNode { + /** + * A mock implementation of RMNode. + */ + public static class MockRMNodeImpl implements RMNode { private NodeId nodeId; private String hostName; private String nodeAddr; @@ -118,12 +120,27 @@ public class MockNodes { private ResourceUtilization containersUtilization; private ResourceUtilization nodeUtilization; private Resource physicalResource; + private OverAllocationInfo overAllocationInfo; + private List<UpdatedContainerInfo> containerUpdates = + Collections.EMPTY_LIST; + + public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, + Resource perNode, String rackName, String healthReport, + long lastHealthReportTime, int cmdPort, String hostName, + NodeState state, Set<String> labels, + ResourceUtilization containersUtilization, + ResourceUtilization nodeUtilization, Resource pPhysicalResource) { + this(nodeId, nodeAddr, httpAddress, perNode, rackName, healthReport, + lastHealthReportTime, cmdPort, hostName, state, labels, + containersUtilization, nodeUtilization, pPhysicalResource, null); + } public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, Resource perNode, String rackName, String healthReport, long lastHealthReportTime, int cmdPort, String hostName, NodeState state, Set<String> labels, ResourceUtilization containersUtilization, - ResourceUtilization nodeUtilization, Resource pPhysicalResource) { + ResourceUtilization nodeUtilization, Resource pPhysicalResource, + OverAllocationInfo overAllocationInfo) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; this.httpAddress = httpAddress; @@ -138,6 +155,7 @@ public class MockNodes { this.containersUtilization = containersUtilization; this.nodeUtilization = nodeUtilization; this.physicalResource = pPhysicalResource; + this.overAllocationInfo = overAllocationInfo; } @Override @@ -226,7 +244,7 @@ public class MockNodes { @Override 
public List<UpdatedContainerInfo> pullContainerUpdates() { - return new ArrayList<UpdatedContainerInfo>(); + return containerUpdates; } @Override @@ -264,7 +282,7 @@ public class MockNodes { @Override public OverAllocationInfo getOverAllocationInfo() { - return null; + return this.overAllocationInfo; } public OpportunisticContainersStatus getOpportunisticContainersStatus() { @@ -289,6 +307,19 @@ public class MockNodes { public Resource getPhysicalResource() { return this.physicalResource; } + + public void updateResourceUtilization(ResourceUtilization utilization) { + this.nodeUtilization = utilization; + } + + public void updateContainersAndNodeUtilization( + UpdatedContainerInfo updatedContainerInfo, + ResourceUtilization resourceUtilization) { + if (updatedContainerInfo != null) { + containerUpdates = Collections.singletonList(updatedContainerInfo); + } + this.nodeUtilization = resourceUtilization; + } }; private static RMNode buildRMNode(int rack, final Resource perNode, @@ -312,6 +343,15 @@ public class MockNodes { NodeState state, String httpAddr, int hostnum, String hostName, int port, Set<String> labels, ResourceUtilization containersUtilization, ResourceUtilization nodeUtilization, Resource physicalResource) { + return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port, + labels, containersUtilization, nodeUtilization, physicalResource, null); + } + + private static MockRMNodeImpl buildRMNode(int rack, final Resource perNode, + NodeState state, String httpAddr, int hostnum, String hostName, int port, + Set<String> labels, ResourceUtilization containersUtilization, + ResourceUtilization nodeUtilization, Resource physicalResource, + OverAllocationInfo overAllocationInfo) { final String rackName = "rack"+ rack; final int nid = hostnum; final String nodeAddr = hostName + ":" + nid; @@ -324,9 +364,9 @@ public class MockNodes { String healthReport = (state == NodeState.UNHEALTHY) ? 
null : "HealthyMe"; return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName, healthReport, 0, nid, hostName, state, labels, - containersUtilization, nodeUtilization, physicalResource); + containersUtilization, nodeUtilization, + physicalResource, overAllocationInfo); } - public static RMNode nodeInfo(int rack, final Resource perNode, NodeState state) { return buildRMNode(rack, perNode, state, "N/A"); @@ -355,4 +395,10 @@ public class MockNodes { return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, port); } + public static MockRMNodeImpl newNodeInfo(int rack, final Resource perNode, + OverAllocationInfo overAllocationInfo) { + return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", + NODE_ID++, null, 123, null, ResourceUtilization.newInstance(0, 0, 0.0f), + ResourceUtilization.newInstance(0, 0, 0.0f), null, overAllocationInfo); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 5b49303..075124f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -502,7 +502,7 @@ 
public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase FSParentQueue root = scheduler.getQueueManager().getRootQueue(); // ************ check cluster used Resources ******** assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy); - assertEquals(usedResources,root.getResourceUsage()); + assertEquals(usedResources, root.getGuaranteedResourceUsage()); // ************ check app headroom **************** FSAppAttempt schedulerAttempt = http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java index 03332b2..6adc63c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java @@ -82,7 +82,7 @@ public class FakeSchedulable implements Schedulable { } @Override - public Resource assignContainer(FSSchedulerNode node) { + public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) { return null; } @@ -112,11 +112,16 @@ public class FakeSchedulable implements Schedulable { } @Override - public Resource getResourceUsage() { + public Resource getGuaranteedResourceUsage() { return 
usage; } @Override + public Resource getOpportunisticResourceUsage() { + return Resource.newInstance(0, 0); + } + + @Override public long getStartTime() { return startTime; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java index f581935..71db786 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java @@ -222,7 +222,8 @@ public class TestAppRunnability extends FairSchedulerTestBase { scheduler.handle(nodeEvent); scheduler.handle(updateEvent); - assertEquals(Resource.newInstance(1024, 1), oldQueue.getResourceUsage()); + assertEquals(Resource.newInstance(1024, 1), + oldQueue.getGuaranteedResourceUsage()); scheduler.update(); assertEquals(Resource.newInstance(3072, 3), oldQueue.getDemand()); @@ -231,8 +232,10 @@ public class TestAppRunnability extends FairSchedulerTestBase { assertSame(targetQueue, app.getQueue()); assertFalse(oldQueue.isRunnableApp(app)); assertTrue(targetQueue.isRunnableApp(app)); - assertEquals(Resource.newInstance(0, 0), oldQueue.getResourceUsage()); - 
assertEquals(Resource.newInstance(1024, 1), targetQueue.getResourceUsage()); + assertEquals(Resource.newInstance(0, 0), + oldQueue.getGuaranteedResourceUsage()); + assertEquals(Resource.newInstance(1024, 1), + targetQueue.getGuaranteedResourceUsage()); assertEquals(0, oldQueue.getNumRunnableApps()); assertEquals(1, targetQueue.getNumRunnableApps()); assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java index 51ffd23..9bfbb91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java @@ -223,7 +223,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase { Mockito.when(mockQueue.getMaxShare()).thenReturn(queueMaxResources); Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare); - Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage); + Mockito.when(mockQueue.getGuaranteedResourceUsage()).thenReturn(queueUsage); Mockito.when(mockScheduler.getClusterResource()).thenReturn (clusterResource); 
Mockito.when(fakeRootQueueMetrics.getAllocatedResources()).thenReturn @@ -305,7 +305,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase { getApplicationId())); FSAppAttempt app = scheduler.getSchedulerApp(id11); assertNotNull(app); - Resource queueUsage = app.getQueue().getResourceUsage(); + Resource queueUsage = app.getQueue().getGuaranteedResourceUsage(); assertEquals(0, queueUsage.getMemorySize()); assertEquals(0, queueUsage.getVirtualCores()); SchedulerNode n1 = scheduler.getSchedulerNode(node1.getNodeID()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 4a738ca..efe4ad1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -88,7 +88,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase { FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(maxResource); - Mockito.when(app.getResourceUsage()).thenReturn(Resources.none()); + Mockito.when(app.getGuaranteedResourceUsage()).thenReturn(Resources.none()); schedulable.addApp(app, true); 
schedulable.addApp(app, true); @@ -176,7 +176,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase { @Override public void run() { for (int i=0; i < 500; i++) { - schedulable.getResourceUsage(); + schedulable.getGuaranteedResourceUsage(); } } }); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java index 6726f17..f79ba4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java @@ -98,7 +98,7 @@ public class TestFSSchedulerNode { ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class); when(starvingApp.getApplicationAttemptId()).thenReturn(appAttemptId); - when(starvingApp.assignContainer(schedulerNode)).thenAnswer( + when(starvingApp.assignContainer(schedulerNode, false)).thenAnswer( new Answer<Resource>() { @Override public Resource answer(InvocationOnMock invocationOnMock) @@ -142,7 +142,7 @@ public class TestFSSchedulerNode { } private void allocateContainers(FSSchedulerNode schedulerNode) { - FairScheduler.assignPreemptedContainers(schedulerNode); + 
FairScheduler.attemptToAssignPreemptedResources(schedulerNode);
 }
 /**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org