YARN-6750. Add a configuration to cap how much an NM can be overallocated. Contributed by Haibo Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6d028d77 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6d028d77 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6d028d77 Branch: refs/heads/YARN-1011 Commit: 6d028d77ca7c4650065251ea7e175fefe055eafa Parents: d106fdb Author: Miklos Szegedi <szege...@apache.org> Authored: Wed Nov 22 09:17:56 2017 -0800 Committer: Haibo Chen <haiboc...@apache.org> Committed: Fri Sep 28 14:06:14 2018 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 5 + .../src/main/resources/yarn-default.xml | 10 ++ .../scheduler/AbstractYarnScheduler.java | 4 + .../scheduler/SchedulerNode.java | 42 ++++++-- .../scheduler/fair/FSSchedulerNode.java | 5 + .../scheduler/fair/FairScheduler.java | 2 +- .../scheduler/fair/TestFairScheduler.java | 107 +++++++++++++++++++ 7 files changed, 168 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d028d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 71651a6..0326880 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -398,6 +398,11 @@ public class YarnConfiguration extends Configuration { /** ACL used in case none is found. Allows nothing. 
*/ public static final String DEFAULT_YARN_APP_ACL = " "; + /** The global max overallocation per node in terms of their capacity. */ + public static final String PER_NODE_MAX_OVERALLOCATION_RATIO = + RM_PREFIX + "overallocation.per-node-max-ratio"; + public static final float DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO = 4.0f; + /** Setting that controls whether opportunistic container allocation * is enabled or not. */ @Unstable http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d028d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index b45fe06..bc9279e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3342,6 +3342,16 @@ <property> <description> + The maximum amount of resources, specified as a ratio to node capacity, + that can be allocated to opportunistic containers on any given node in + the cluster. + </description> + <name>yarn.resourcemanager.overallocation.per-node-max-ratio</name> + <value>4.0</value> + </property> + + <property> + <description> Frequency for computing least loaded NMs. 
</description> <name>yarn.resourcemanager.nm-container-queuing.sorting-nodes-interval-ms</name> http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d028d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 6b1fdcb..278e474 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -119,6 +119,7 @@ public abstract class AbstractYarnScheduler new ClusterNodeTracker<>(); protected Resource minimumAllocation; + protected float maxOverAllocationRatioPerNode; protected volatile RMContext rmContext; @@ -201,6 +202,9 @@ public abstract class AbstractYarnScheduler nodeTracker.setConfiguredMaxAllocationWaitTime( configuredMaximumAllocationWaitTime); maxClusterLevelAppPriority = getMaxPriorityFromConf(conf); + maxOverAllocationRatioPerNode = conf.getFloat( + YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO, + YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO); createReleaseCache(); autoUpdateContainers = conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d028d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index ff3640c..3e16dc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -94,6 +94,10 @@ public abstract class SchedulerNode { protected Resource resourceAllocatedPendingLaunch = Resource.newInstance(0, 0); + // The max amount of resources that can be allocated to opportunistic + // containers on the node, specified as a ratio to its capacity + private final float maxOverAllocationRatio; + private volatile Set<String> labels = null; private volatile Set<NodeAttribute> nodeAttributes = null; @@ -102,7 +106,7 @@ public abstract class SchedulerNode { private volatile long lastHeartbeatMonotonicTime; public SchedulerNode(RMNode node, boolean usePortForNodeName, - Set<String> labels) { + Set<String> labels, float maxOverAllocationRatio) { this.rmNode = node; this.rmContext = node.getRMContext(); this.unallocatedResource = Resources.clone(node.getTotalCapability()); @@ -114,10 +118,24 @@ public abstract class SchedulerNode { } this.labels = ImmutableSet.copyOf(labels); 
this.lastHeartbeatMonotonicTime = Time.monotonicNow(); + this.maxOverAllocationRatio = maxOverAllocationRatio; + } + + public SchedulerNode(RMNode node, boolean usePortForNodeName, + Set<String> labels) { + this(node, usePortForNodeName, labels, + YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO); + } + + public SchedulerNode(RMNode node, boolean usePortForNodeName, + float maxOverAllocationRatio) { + this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET, + maxOverAllocationRatio); } public SchedulerNode(RMNode node, boolean usePortForNodeName) { - this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET); + this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET, + YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO); } public RMNode getRMNode() { @@ -671,9 +689,11 @@ public abstract class SchedulerNode { /** * Get the amount of resources that can be allocated to opportunistic - * containers in the case of overallocation. It is calculated as + * containers in the case of overallocation, calculated as * node capacity - (node utilization + resources of allocated-yet-not-started - * containers). + * containers), subject to the maximum amount of resources that can be + * allocated to opportunistic containers on the node specified as a ratio to + * its capacity. * @return the amount of resources that are available to be allocated to * opportunistic containers */ @@ -706,11 +726,21 @@ public abstract class SchedulerNode { Resource resourceAllowedForOpportunisticContainers = Resources.createResource(allowedMemory, allowedCpu); - // TODO cap the resources allocated to OPPORTUNISTIC containers on a node - // in terms of its capacity. i.e. 
return min(max_ratio * capacity, allowed) + // cap the total amount of resources allocated to OPPORTUNISTIC containers + Resource maxOverallocation = getMaxOverallocationAllowed(); + Resources.subtractFrom(maxOverallocation, allocatedResourceOpportunistic); + resourceAllowedForOpportunisticContainers = Resources.componentwiseMin( + maxOverallocation, resourceAllowedForOpportunisticContainers); + return resourceAllowedForOpportunisticContainers; } + private Resource getMaxOverallocationAllowed() { + long maxMemory = (long) (capacity.getMemorySize() * maxOverAllocationRatio); + int maxVcore = (int) (capacity.getVirtualCores() * maxOverAllocationRatio); + return Resource.newInstance(maxMemory, maxVcore); + } + private static class ContainerInfo { private final RMContainer container; private boolean launchedOnNode; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d028d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 95490f5..a53dda4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -66,10 +66,15 @@ public class FSSchedulerNode extends SchedulerNode { // 
slated for preemption private Resource totalResourcesPreempted = Resource.newInstance(0, 0); + @VisibleForTesting public FSSchedulerNode(RMNode node, boolean usePortForNodeName) { super(node, usePortForNodeName); } + public FSSchedulerNode(RMNode node, boolean usePortForNodeName, + float maxOverallocationRatio) { + super(node, usePortForNodeName, maxOverallocationRatio); + } /** * Total amount of reserved resources including reservations and preempted * containers. http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d028d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 5d48fac..25782a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -774,7 +774,7 @@ public class FairScheduler extends writeLock.lock(); try { FSSchedulerNode schedulerNode = new FSSchedulerNode(node, - usePortForNodeName); + usePortForNodeName, maxOverAllocationRatioPerNode); nodeTracker.addNode(schedulerNode); triggerUpdate(); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d028d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index fbb7243..9302f88 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -3143,6 +3143,113 @@ public class TestFairScheduler extends FairSchedulerTestBase { } } + /** + * Test that max overallocation per node is enforced by Fair Scheduler. + * @throws Exception + */ + @Test + public void testMaxOverallocationPerNode() throws Exception { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. 
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + float maxOverallocationRatio = conf.getFloat( + YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO, + YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO); + conf.setFloat(YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO, 1.5f); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 1G of memory and 1 vcores and an overallocation + // threshold of 1.0f and 1.0f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(1f, 1f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(1024, 1), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that takes up the whole node + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(1024, "queue1", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). 
+ getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // node utilization is zero after the container runs + ContainerStatus containerStatus1 = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus1), + Collections.emptyList()), + ResourceUtilization.newInstance(0, 0, 0.0f)); + + // create a scheduling request that should get allocated an OPPORTUNISTIC + // container because the node utilization is zero + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(1024, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + List<Container> allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers2.get(0).getExecutionType()); + assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). 
+ getOpportunisticResourceUsage().getMemorySize()); + + // node utilization is still zero after the container runs + ContainerStatus containerStatus2 = ContainerStatus.newInstance( + allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus2), + Collections.emptyList()), + ResourceUtilization.newInstance(0, 0, 0.0f)); + + // create another scheduling request that should not get any allocation + // because of the max overallocation on the node will be exceeded. + ApplicationAttemptId appAttempt3 = + createSchedulingRequest(1024, "queue3", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(0, scheduler.getQueueManager().getQueue("queue3"). + getOpportunisticResourceUsage().getMemorySize()); + List<Container> allocatedContainers3 = + scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers3.size() == 0); + assertEquals(0, scheduler.getQueueManager().getQueue("queue3"). + getOpportunisticResourceUsage().getMemorySize()); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + conf.setFloat(YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO, + maxOverallocationRatio); + } + } + @Test public void testAclSubmitApplication() throws Exception { // Set acl's --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org