This is an automated email from the ASF dual-hosted git repository.

prabhujoseph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6ce295b  YARN-10259. Fix reservation logic in Multi Node Placement.
6ce295b is described below

commit 6ce295b78737aca8103912121d54f318cb5d36ef
Author: Prabhu Joseph <pjos...@apache.org>
AuthorDate: Thu May 7 17:47:51 2020 +0530

    YARN-10259. Fix reservation logic in Multi Node Placement.
    
    Reviewed by Wangda Tan.
---
 .../scheduler/capacity/LeafQueue.java              |  14 ++-
 .../allocator/RegularContainerAllocator.java       |  18 +++-
 .../capacity/TestCapacitySchedulerMultiNodes.java  | 110 ++++++++++++++++++++-
 3 files changed, 130 insertions(+), 12 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 64b1536..69f41ef 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1008,11 +1008,15 @@ public class LeafQueue extends AbstractCSQueue {
   private CSAssignment allocateFromReservedContainer(Resource clusterResource,
       CandidateNodeSet<FiCaSchedulerNode> candidates,
       ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
-    // Considering multi-node scheduling, its better to iterate through
-    // all candidates and stop once we get atleast one good node to allocate
-    // where reservation was made earlier. In normal case, there is only one
-    // node and hence there wont be any impact after this change.
-    for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
+
+    // Irrespective of Single / Multi Node Placement, the allocate from
+    // Reserved Container has to happen only for the single node which
+    // CapacityScheduler#allocateFromReservedContainer invokes with.
+    // Else In Multi Node Placement, there won't be any Allocation or
+    // Reserve of new containers when there is a RESERVED container on
+    // a node which is full.
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
+    if (node != null) {
       RMContainer reservedContainer = node.getReservedContainer();
       if (reservedContainer != null) {
         FiCaSchedulerApp application = getApplication(
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 1dacc96..287dc67 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -837,6 +837,7 @@ public class RegularContainerAllocator extends 
AbstractContainerAllocator {
     // Do checks before determining which node to allocate
     // Directly return if this check fails.
     ContainerAllocation result;
+    ContainerAllocation lastReservation = null;
 
     AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
         application.getAppSchedulingInfo().getAppPlacementAllocator(
@@ -878,11 +879,24 @@ public class RegularContainerAllocator extends 
AbstractContainerAllocator {
       result = tryAllocateOnNode(clusterResource, node, schedulingMode,
           resourceLimits, schedulerKey, reservedContainer);
 
-      if (AllocationState.ALLOCATED == result.getAllocationState()
-          || AllocationState.RESERVED == result.getAllocationState()) {
+      if (AllocationState.ALLOCATED == result.getAllocationState()) {
         result = doAllocation(result, node, schedulerKey, reservedContainer);
         break;
       }
+
+      // In MultiNodePlacement, Try Allocate on other Available nodes
+      // from Iterator as well before Reserving. Else there won't be any
+      // Allocate of new containers when the first node in the
+      // iterator could not fit and returns RESERVED allocation.
+      if (AllocationState.RESERVED == result.getAllocationState()) {
+        lastReservation = result;
+        if (iter.hasNext()) {
+          continue;
+        } else {
+          result = doAllocation(lastReservation, node, schedulerKey,
+              reservedContainer);
+        }
+      }
     }
 
     return result;
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
index fa85ca7..bb2cbfd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -223,6 +224,7 @@ public class TestCapacitySchedulerMultiNodes extends 
CapacitySchedulerTestBase {
 
     CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
     LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
     FiCaSchedulerApp schedulerApp1 =
         cs.getApplicationAttempt(am1.getApplicationAttemptId());
@@ -234,12 +236,13 @@ public class TestCapacitySchedulerMultiNodes extends 
CapacitySchedulerTestBase {
      * after its ask has been cancelled when used capacity of root queue is 1.
      */
     // Ask a container with 6GB memory size for app1,
-    // nm1 will reserve a container for app1
+    // nm2 will reserve a container for app1
+    // Last Node from Node Iterator will be RESERVED
     am1.allocate("*", 6 * GB, 1, new ArrayList<>());
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
 
     // Check containers of app1 and app2.
-    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
     Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
     Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
     Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
@@ -324,12 +327,13 @@ public class TestCapacitySchedulerMultiNodes extends 
CapacitySchedulerTestBase {
      * after node has sufficient resource.
      */
     // Ask a container with 6GB memory size for app2,
-    // nm1 will reserve a container for app2
+    // nm2 will reserve a container for app2
+    // Last Node from Node Iterator will be RESERVED
     am2.allocate("*", 6 * GB, 1, new ArrayList<>());
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
 
     // Check containers of app1 and app2.
-    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
     Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
     Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
     Assert.assertEquals(1, schedulerApp2.getReservedContainers().size());
@@ -344,4 +348,100 @@ public class TestCapacitySchedulerMultiNodes extends 
CapacitySchedulerTestBase {
 
     rm1.close();
   }
+
+  @Test(timeout=30000)
+  public void testAllocateOfReservedContainerFromAnotherNode()
+      throws Exception {
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    
newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+        + ".resource-based.sorting-interval.ms", 0);
+    newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
+        1.0f);
+    MockRM rm1 = new MockRM(newConf);
+
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 12 * GB, 2);
+    MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB, 2);
+
+    // launch an app1 to queue, AM container will be launched in nm1
+    RMApp app1 = MockRMAppSubmitter.submit(rm1,
+        MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build());
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch another app2 to queue, AM container will be launched in nm2
+    RMApp app2 = MockRMAppSubmitter.submit(rm1,
+        MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build());
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Reserve a Container for app3
+    RMApp app3 = MockRMAppSubmitter.submit(rm1,
+        MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build());
+
+    final AtomicBoolean result = new AtomicBoolean(false);
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+          result.set(true);
+        } catch (Exception e) {
+          Assert.fail("Failed to allocate the reserved container");
+        }
+      }
+    };
+    t.start();
+    Thread.sleep(1000);
+
+    // Validate if app3 has got RESERVED container
+    FiCaSchedulerApp schedulerApp =
+        
cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
+    Assert.assertEquals("App3 failed to get reserved container", 1,
+        schedulerApp.getReservedContainers().size());
+
+    // Free the Space on other node where Reservation has not happened
+    if (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
+      rm1.killApp(app2.getApplicationId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    } else {
+      rm1.killApp(app1.getApplicationId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // Check if Reserved AM of app3 gets allocated in
+    // node where space available
+    while (!result.get()) {
+      Thread.sleep(100);
+    }
+
+    // Validate release of reserved containers
+    schedulerApp =
+        
cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
+    Assert.assertEquals("App3 failed to release Reserved container", 0,
+        schedulerApp.getReservedContainers().size());
+    Assert.assertNull(cs.getNode(rmNode1.getNodeID()).getReservedContainer());
+    Assert.assertNull(cs.getNode(rmNode2.getNodeID()).getReservedContainer());
+
+    rm1.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to