YARN-8138. Add unit test to validate queue priority preemption works under node 
partition. (Zian Chen via wangda)

Change-Id: Ibebfab98a714c12c2dc643b6d7b9754a7f813632
(cherry picked from commit 6ee62e6b1c9b4bc3447ce870446068e626b1a492)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/896b473f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/896b473f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/896b473f

Branch: refs/heads/HDFS-7240
Commit: 896b473f1b477976e449184ceea075bedd71d6e8
Parents: 669eb7b
Author: Wangda Tan <wan...@apache.org>
Authored: Sat Apr 14 11:04:49 2018 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Sat Apr 14 11:08:36 2018 -0700

----------------------------------------------------------------------
 ...TestCapacitySchedulerSurgicalPreemption.java | 150 +++++++++++++++++++
 1 file changed, 150 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/896b473f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
index 9b183c0..2aff82d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -18,8 +18,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -38,18 +42,24 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
+import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import static org.hamcrest.MatcherAssert.assertThat;
+
 public class TestCapacitySchedulerSurgicalPreemption
     extends CapacitySchedulerPreemptionTestBase {
 
+  private static final int NUM_NM = 5;
   @Override
   @Before
   public void setUp() throws Exception {
@@ -860,6 +870,146 @@ public class TestCapacitySchedulerSurgicalPreemption
     rm1.close();
   }
 
+  private void initializeConfProperties(CapacitySchedulerConfiguration conf)
+      throws IOException {
+
+    conf.setQueues("root", new String[] {"A", "B"});
+    conf.setCapacity("root.A", 50);
+    conf.setCapacity("root.B", 50);
+    conf.setQueuePriority("root.A", 1);
+    conf.setQueuePriority("root.B", 2);
+
+    conf.set(PREFIX + "root.ordering-policy", "priority-utilization");
+    conf.set(PREFIX + 
"ordering-policy.priority-utilization.underutilized-preemption.enabled", 
"true");
+    conf.set(PREFIX + 
"ordering-policy.priority-utilization.underutilized-preemption.allow-move-reservation",
 "false");
+    conf.set(PREFIX + 
"ordering-policy.priority-utilization.underutilized-preemption.reserved-container-delay-ms",
 "0");
+    conf.set(PREFIX + "root.accessible-node-labels.x.capacity", "100");
+
+    // Setup queue access to node labels
+    conf.set(PREFIX + "root.A.accessible-node-labels", "x");
+    conf.set(PREFIX + "root.B.accessible-node-labels", "x");
+    conf.set(PREFIX + "root.A.default-node-label-expression", "x");
+    conf.set(PREFIX + "root.B.default-node-label-expression", "x");
+    conf.set(PREFIX + "root.A.accessible-node-labels.x.capacity", "50");
+    conf.set(PREFIX + "root.B.accessible-node-labels.x.capacity", "50");
+    conf.set(PREFIX + "root.A.user-limit-factor", "100");
+    conf.set(PREFIX + "root.B.user-limit-factor", "100");
+    conf.set(PREFIX + "maximum-am-resource-percent", "1");
+
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    conf.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "1");
+    conf.set(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 
"1000");
+    conf.set(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, 
"1000");
+    conf.set(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, "0.5");
+    
conf.set(CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, 
"1");
+
+  }
+
+  @Test
+  public void testPriorityPreemptionWithNodeLabels() throws Exception {
+    // set up queue priority and capacity
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+
+    initializeConfProperties(conf);
+
+    MockRM rm1 = new MockRM(conf) {
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+
+    MockNM[] mockNMs = new MockNM[NUM_NM];
+    for (int i = 0; i < NUM_NM; i++) {
+      mockNMs[i] = rm1.registerNode("h" + i + ":1234", 6144);
+    }
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    mgr.addToCluserNodeLabels(Arrays.asList(NodeLabel.newInstance("x")));
+
+    RMNode[] rmNodes = new RMNode[5];
+    for (int i = 0; i < NUM_NM; i++) {
+      rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
+      mgr.replaceLabelsOnNode(
+          ImmutableMap.of(rmNodes[i].getNodeID(), ImmutableSet.of("x")));
+    }
+
+    // launch an app to queue B, AM container launched in nm4
+    RMApp app1 = rm1.submitApp(4096, "app", "user", null, "B");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]);
+
+    am1.allocate("*", 4096, NUM_NM-1, new ArrayList<>());
+
+    // Do allocation for nm0-nm3
+    for (int i = 0; i < NUM_NM-1; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // App1 should have 5 containers now, one for each node
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(NUM_NM, schedulerApp1.getLiveContainers().size());
+    for (int i = 0; i < NUM_NM; i++) {
+      waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(
+          rmNodes[i].getNodeID()), am1.getApplicationAttemptId(), 1);
+    }
+
+    // Submit app2 to queue A and asks for a 750MB container for AM (on n0)
+    RMApp app2 = rm1.submitApp(1024, "app", "user", null, "A");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]);
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Ask NUM_NM-1 * 1500MB containers
+    am2.allocate("*", 2048, NUM_NM-1, new ArrayList<>());
+
+    // Do allocation for n1-n4
+    for (int i = 1; i < NUM_NM; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // kill app1
+    rm1.killApp(app1.getApplicationId());
+
+    // Submit app3 to queue B and asks for a 5000MB container for AM (on n2)
+    RMApp app3 = rm1.submitApp(1024, "app", "user", null, "B");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
+    FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
+
+    // Ask NUM_NM * 5000MB containers
+    am3.allocate("*", 5120, NUM_NM, new ArrayList<>());
+
+    // Do allocation for n0-n4
+    for (int i = 0; i < NUM_NM; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // Sleep the timeout interval, we should see 2 containers selected
+    Thread.sleep(1000);
+
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
+    editPolicy.editSchedule();
+
+    // We should only allow to preempt 2 containers, on node1 and node2
+    Set<RMContainer> selectedToPreempt =
+        editPolicy.getToPreemptContainers().keySet();
+    Assert.assertEquals(2, selectedToPreempt.size());
+    List<NodeId> selectedToPreemptNodeIds = new ArrayList<>();
+    for (RMContainer rmc : selectedToPreempt) {
+      selectedToPreemptNodeIds.add(rmc.getAllocatedNode());
+    }
+    assertThat(selectedToPreemptNodeIds, CoreMatchers.hasItems(
+        mockNMs[1].getNodeId(), mockNMs[2].getNodeId()));
+
+    rm1.close();
+
+  }
 
   @Test(timeout = 60000)
   public void testPreemptionForFragmentatedCluster() throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to