YARN-8804. resourceLimits may be wrongly calculated when leaf-queue is blocked 
in cluster with 3+ level queues. Contributed by Tao Yang


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6b988d82
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6b988d82
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6b988d82

Branch: refs/heads/HDDS-4
Commit: 6b988d821e62d29c118e10a7213583b92c302baf
Parents: 913f87d
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Sep 26 14:43:00 2018 -0700
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Sep 26 14:43:00 2018 -0700

----------------------------------------------------------------------
 .../scheduler/ResourceLimits.java               | 24 ++++++
 .../scheduler/capacity/ParentQueue.java         | 16 ++--
 .../capacity/TestContainerAllocation.java       | 78 ++++++++++++++++++++
 3 files changed, 112 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b988d82/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
index 721eb36..820d2fa 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
@@ -38,6 +38,9 @@ public class ResourceLimits {
   // containers.
   private volatile Resource headroom;
 
+  // How much resource should be reserved for high-priority blocked queues
+  private Resource blockedHeadroom;
+
   private boolean allowPreempt = false;
 
   public ResourceLimits(Resource limit) {
@@ -81,4 +84,25 @@ public class ResourceLimits {
   public void setIsAllowPreemption(boolean allowPreempt) {
    this.allowPreempt = allowPreempt;
   }
+
+  public void addBlockedHeadroom(Resource resource) {
+    if (blockedHeadroom == null) {
+      blockedHeadroom = Resource.newInstance(0, 0);
+    }
+    Resources.addTo(blockedHeadroom, resource);
+  }
+
+  public Resource getBlockedHeadroom() {
+    if (blockedHeadroom == null) {
+      return Resources.none();
+    }
+    return blockedHeadroom;
+  }
+
+  public Resource getNetLimit() {
+    if (blockedHeadroom != null) {
+      return Resources.subtract(limit, blockedHeadroom);
+    }
+    return limit;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b988d82/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 80549ca..e32130f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -776,7 +776,6 @@ public class ParentQueue extends AbstractCSQueue {
       SchedulingMode schedulingMode) {
     CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
 
-    Resource parentLimits = limits.getLimit();
     printChildQueues();
 
     // Try to assign to most 'under-served' sub-queue
@@ -790,7 +789,7 @@ public class ParentQueue extends AbstractCSQueue {
 
       // Get ResourceLimits of child queue before assign containers
       ResourceLimits childLimits =
-          getResourceLimitsOfChild(childQueue, cluster, parentLimits,
+          getResourceLimitsOfChild(childQueue, cluster, limits.getNetLimit(),
               candidates.getPartition());
 
       CSAssignment childAssignment = childQueue.assignContainers(cluster,
@@ -812,16 +811,21 @@ public class ParentQueue extends AbstractCSQueue {
             CSAssignment.SkippedType.QUEUE_LIMIT) {
           assignment = childAssignment;
         }
+        Resource blockedHeadroom = null;
+        if (childQueue instanceof LeafQueue) {
+          blockedHeadroom = childLimits.getHeadroom();
+        } else {
+          blockedHeadroom = childLimits.getBlockedHeadroom();
+        }
         Resource resourceToSubtract = Resources.max(resourceCalculator,
-            cluster, childLimits.getHeadroom(), Resources.none());
+            cluster, blockedHeadroom, Resources.none());
+        limits.addBlockedHeadroom(resourceToSubtract);
         if(LOG.isDebugEnabled()) {
-          LOG.debug("Decrease parentLimits " + parentLimits +
+          LOG.debug("Decrease parentLimits " + limits.getLimit() +
               " for " + this.getQueueName() + " by " +
               resourceToSubtract + " as childQueue=" +
               childQueue.getQueueName() + " is blocked");
         }
-        parentLimits = Resources.subtract(parentLimits,
-            resourceToSubtract);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b988d82/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 3d028ee..f1b4444 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -1052,4 +1052,82 @@ public class TestContainerAllocation {
 
     rm1.close();
   }
+
+  @Test(timeout = 60000)
+  public void testAllocationCannotBeBlockedWhenFormerQueueReachedItsLimit()
+      throws Exception {
+    /**
+     * Queue structure:
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     *                   |  \
+     *                  c1   c2
+     *           10(max=10)  90
+     * </pre>
+     * Test case:
+     * Create a cluster with two nodes, each with node resource
+     * <10GB, 10core>, and create queues as above. The max-capacity of "c1"
+     * is 10 and all others are 100, so the max-capacity of queue "c1" is
+     * <2GB, 2core>.
+     * Submit app1 to queue "c1" and launch am1(resource=<1GB, 1 core>) on nm1,
+     * submit app2 to queue "b" and launch am2(resource=<1GB, 1 core>) on nm1;
+     * app1 and app2 each ask for one <2GB, 1core> container.
+     *
+     * Now queue "c" has a lower capacity percentage than queue "b", so the
+     * allocation sequence will be "a" -> "c" -> "b". Queue "c1" has reached
+     * its queue limit, so the requests of app1 should stay pending.
+     *
+     * After nm1 does 1 heartbeat, the scheduler should allocate one container
+     * for app2 on nm1.
+     */
+    CapacitySchedulerConfiguration newConf =
+        (CapacitySchedulerConfiguration) TestUtils
+            .getConfigurationWithMultipleQueues(conf);
+    newConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".c",
+        new String[] { "c1", "c2" });
+    newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10);
+    newConf
+        .setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10);
+    newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c2", 90);
+    newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+        DominantResourceCalculator.class, ResourceCalculator.class);
+
+    MockRM rm1 = new MockRM(newConf);
+
+    RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
+    nodeLabelsManager.init(newConf);
+    rm1.getRMContext().setNodeLabelManager(nodeLabelsManager);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB);
+
+    // launch an app to queue "c1", AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch another app to queue "b", AM container should be launched on nm1
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    am1.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
+    am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // Do one nm1 heartbeat, which should allocate a container on nm1 for app2
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    rm1.drainEvents();
+    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to