YARN-6750. Add a configuration to cap how much a NM can be overallocated. 
Contributed by Haibo Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cd2ae262
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cd2ae262
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cd2ae262

Branch: refs/heads/YARN-1011
Commit: cd2ae2626293865a10f9e397b12fc8e320aec971
Parents: 86a6c26
Author: Miklos Szegedi <szege...@apache.org>
Authored: Wed Nov 22 09:17:56 2017 -0800
Committer: Haibo Chen <haiboc...@apache.org>
Committed: Mon Apr 9 17:25:58 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   5 +
 .../src/main/resources/yarn-default.xml         |  10 ++
 .../scheduler/AbstractYarnScheduler.java        |   4 +
 .../scheduler/SchedulerNode.java                |  42 ++++++--
 .../scheduler/fair/FSSchedulerNode.java         |   5 +
 .../scheduler/fair/FairScheduler.java           |   2 +-
 .../scheduler/fair/TestFairScheduler.java       | 107 +++++++++++++++++++
 7 files changed, 168 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd2ae262/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4c1c6d5..eaa249f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -390,6 +390,11 @@ public class YarnConfiguration extends Configuration {
   /** ACL used in case none is found. Allows nothing. */
   public static final String DEFAULT_YARN_APP_ACL = " ";
 
+  /** The global max overallocation per node in terms of their capacity. */
+  public static final String PER_NODE_MAX_OVERALLOCATION_RATIO =
+      RM_PREFIX + "overallocation.per-node-max-ratio";
+  public static final float DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO = 4.0f;
+
   /** Setting that controls whether opportunistic container allocation
    *  is enabled or not. */
   @Unstable

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd2ae262/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 80b1b54..123d03c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3224,6 +3224,16 @@
 
   <property>
     <description>
+    The maximum amount of resources, specified as a ratio to node capacity,
+    that can be allocated to opportunistic containers on any given node in
+    the cluster.
+    </description>
+    <name>yarn.resourcemanager.overallocation.per-node-max-ratio</name>
+    <value>4.0</value>
+  </property>
+
+  <property>
+    <description>
     Frequency for computing least loaded NMs.
     </description>
     <name>yarn.resourcemanager.nm-container-queuing.sorting-nodes-interval-ms</name>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd2ae262/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 4818132..e3b56d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -119,6 +119,7 @@ public abstract class AbstractYarnScheduler
       new ClusterNodeTracker<>();
 
   protected Resource minimumAllocation;
+  protected float maxOverAllocationRatioPerNode;
 
   protected volatile RMContext rmContext;
 
@@ -201,6 +202,9 @@ public abstract class AbstractYarnScheduler
     nodeTracker.setConfiguredMaxAllocationWaitTime(
         configuredMaximumAllocationWaitTime);
     maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
+    maxOverAllocationRatioPerNode = conf.getFloat(
+        YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO,
+        YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO);
     createReleaseCache();
     autoUpdateContainers =
         conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd2ae262/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 88af3b2..7da01fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -91,13 +91,17 @@ public abstract class SchedulerNode {
   protected Resource resourceAllocatedPendingLaunch =
       Resource.newInstance(0, 0);
 
+  // The max amount of resources that can be allocated to opportunistic
+  // containers on the node, specified as a ratio to its capacity
+  private final float maxOverAllocationRatio;
+
   private volatile Set<String> labels = null;
 
   // Last updated time
   private volatile long lastHeartbeatMonotonicTime;
 
   public SchedulerNode(RMNode node, boolean usePortForNodeName,
-      Set<String> labels) {
+      Set<String> labels, float maxOverAllocationRatio) {
     this.rmNode = node;
     this.unallocatedResource = Resources.clone(node.getTotalCapability());
     this.capacity = Resources.clone(node.getTotalCapability());
@@ -108,10 +112,24 @@ public abstract class SchedulerNode {
     }
     this.labels = ImmutableSet.copyOf(labels);
     this.lastHeartbeatMonotonicTime = Time.monotonicNow();
+    this.maxOverAllocationRatio = maxOverAllocationRatio;
+  }
+
+  public SchedulerNode(RMNode node, boolean usePortForNodeName,
+      Set<String> labels) {
+    this(node, usePortForNodeName, labels,
+        YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO);
+  }
+
+  public SchedulerNode(RMNode node, boolean usePortForNodeName,
+      float maxOverAllocationRatio) {
+    this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET,
+        maxOverAllocationRatio);
   }
 
   public SchedulerNode(RMNode node, boolean usePortForNodeName) {
-    this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
+    this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET,
+        YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO);
   }
 
   public RMNode getRMNode() {
@@ -639,9 +657,11 @@ public abstract class SchedulerNode {
 
   /**
    * Get the amount of resources that can be allocated to opportunistic
-   * containers in the case of overallocation. It is calculated as
+   * containers in the case of overallocation, calculated as
    * node capacity - (node utilization + resources of allocated-yet-not-started
-   * containers).
+   * containers), subject to the maximum amount of resources that can be
+   * allocated to opportunistic containers on the node specified as a ratio to
+   * its capacity.
    * @return the amount of resources that are available to be allocated to
    *         opportunistic containers
    */
@@ -674,11 +694,21 @@ public abstract class SchedulerNode {
     Resource resourceAllowedForOpportunisticContainers =
         Resources.createResource(allowedMemory, allowedCpu);
 
-    // TODO cap the resources allocated to OPPORTUNISTIC containers on a node
-    // in terms of its capacity. i.e. return min(max_ratio * capacity, allowed)
+    // cap the total amount of resources allocated to OPPORTUNISTIC containers
+    Resource maxOverallocation = getMaxOverallocationAllowed();
+    Resources.subtractFrom(maxOverallocation, allocatedResourceOpportunistic);
+    resourceAllowedForOpportunisticContainers = Resources.componentwiseMin(
+        maxOverallocation, resourceAllowedForOpportunisticContainers);
+
     return resourceAllowedForOpportunisticContainers;
   }
 
+  private Resource getMaxOverallocationAllowed() {
+    long maxMemory = (long) (capacity.getMemorySize() * maxOverAllocationRatio);
+    int maxVcore = (int) (capacity.getVirtualCores() * maxOverAllocationRatio);
+    return Resource.newInstance(maxMemory, maxVcore);
+  }
+
   private static class ContainerInfo {
     private final RMContainer container;
     private boolean launchedOnNode;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd2ae262/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index 95490f5..a53dda4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -66,10 +66,15 @@ public class FSSchedulerNode extends SchedulerNode {
   // slated for preemption
   private Resource totalResourcesPreempted = Resource.newInstance(0, 0);
 
+  @VisibleForTesting
   public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
     super(node, usePortForNodeName);
   }
 
+  public FSSchedulerNode(RMNode node, boolean usePortForNodeName,
+      float maxOverallocationRatio) {
+    super(node, usePortForNodeName, maxOverallocationRatio);
+  }
   /**
    * Total amount of reserved resources including reservations and preempted
    * containers.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd2ae262/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 6f9b624..545eed2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -752,7 +752,7 @@ public class FairScheduler extends
     try {
       writeLock.lock();
       FSSchedulerNode schedulerNode = new FSSchedulerNode(node,
-          usePortForNodeName);
+          usePortForNodeName, maxOverAllocationRatioPerNode);
       nodeTracker.addNode(schedulerNode);
 
       triggerUpdate();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd2ae262/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 508735a..10cf317 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -3137,6 +3137,113 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     }
   }
 
+  /**
+   * Test that max overallocation per node is enforced by Fair Scheduler.
+   * @throws Exception
+   */
+  @Test
+  public void testMaxOverallocationPerNode() throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+    float maxOverallocationRatio = conf.getFloat(
+        YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO,
+        YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO);
+    conf.setFloat(YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO, 1.5f);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 1G of memory and 1 vcores and an overallocation
+      // threshold of 1.0f and 1.0f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(1f, 1f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(1024, 1), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that takes up the whole node
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(1024, "queue1", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization is zero after the container runs
+      ContainerStatus containerStatus1 = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus1),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(0, 0, 0.0f));
+
+      // create a scheduling request that should get allocated an OPPORTUNISTIC
+      // container because the node utilization is zero
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers2.get(0).getExecutionType());
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+
+      // node utilization is still zero after the container runs
+      ContainerStatus containerStatus2 = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus2),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(0, 0, 0.0f));
+
+      // create another scheduling request that should not get any allocation
+      // because of the max overallocation on the node will be exceeded.
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1024, "queue3", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 0);
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+      conf.setFloat(YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO,
+          maxOverallocationRatio);
+    }
+  }
+
   @Test
   public void testAclSubmitApplication() throws Exception {
     // Set acl's


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to