Repository: hadoop Updated Branches: refs/heads/branch-2 81bbee685 -> 9ebbf1bfc
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java new file mode 100644 index 0000000..cf1b26f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -0,0 +1,1027 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import 
com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class TestNodeLabelContainerAllocation { + private final int GB = 1024; + + private YarnConfiguration conf; + + RMNodeLabelsManager mgr; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + } + + private Configuration getConfigurationWithQueueLabels(Configuration config) { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 10); + conf.setMaximumCapacity(A, 15); + conf.setAccessibleNodeLabels(A, toSet("x")); + conf.setCapacityByLabel(A, "x", 100); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 20); + conf.setAccessibleNodeLabels(B, toSet("y")); + conf.setCapacityByLabel(B, "y", 100); + + final String C = CapacitySchedulerConfiguration.ROOT + ".c"; + conf.setCapacity(C, 70); + conf.setMaximumCapacity(C, 70); + conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + conf.setQueues(A, new String[] {"a1"}); + conf.setCapacity(A1, 100); + conf.setMaximumCapacity(A1, 100); + conf.setCapacityByLabel(A1, "x", 100); + + final String B1 = B + ".b1"; + conf.setQueues(B, new String[] {"b1"}); + conf.setCapacity(B1, 100); + conf.setMaximumCapacity(B1, 100); + conf.setCapacityByLabel(B1, "y", 100); + + final String C1 = C + ".c1"; + conf.setQueues(C, new 
String[] {"c1"}); + conf.setCapacity(C1, 100); + conf.setMaximumCapacity(C1, 100); + + return conf; + } + + private void checkTaskContainersHost(ApplicationAttemptId attemptId, + ContainerId containerId, ResourceManager rm, String host) { + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId); + + Assert.assertTrue(appReport.getLiveContainers().size() > 0); + for (RMContainer c : appReport.getLiveContainers()) { + if (c.getContainerId().equals(containerId)) { + Assert.assertEquals(host, c.getAllocatedNode().getHost()); + } + } + } + + @SuppressWarnings("unchecked") + private <E> Set<E> toSet(E... elements) { + Set<E> set = Sets.newHashSet(elements); + return set; + } + + + @Test (timeout = 300000) + public void testContainerAllocationWithSingleUserLimits() throws Exception { + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"), + NodeId.newInstance("h2", 0), toSet("y"))); + + // inject node label manager + MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x + rm1.registerNode("h2:1234", 8000); // label = y + MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty> + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // A has only 10% of x, so it can only allocate one container in label=empty + ContainerId containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 
2); + am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), ""); + Assert.assertTrue(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + // Cannot allocate 2nd label=empty container + containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); + am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), ""); + Assert.assertFalse(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + + // A has default user limit = 100, so it can use all resource in label = x + // We can allocate floor(8000 / 1024) = 7 containers + for (int id = 3; id <= 8; id++) { + containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), id); + am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x"); + Assert.assertTrue(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + } + rm1.close(); + } + + @Test(timeout = 300000) + public void testContainerAllocateWithComplexLabels() throws Exception { + /* + * Queue structure: + * root (*) + * ________________ + * / \ + * a x(100%), y(50%) b y(50%), z(100%) + * ________________ ______________ + * / / \ + * a1 (x,y) b1(no) b2(y,z) + * 100% y = 100%, z = 100% + * + * Node structure: + * h1 : x + * h2 : y + * h3 : y + * h4 : z + * h5 : NO + * + * Total resource: + * x: 4G + * y: 6G + * z: 2G + * *: 2G + * + * Resource of + * a1: x=4G, y=3G, NO=0.2G + * b1: NO=0.9G (max=1G) + * b2: y=3, z=2G, NO=0.9G (max=1G) + * + * Each node can only allocate two containers + */ + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), + toSet("x"), NodeId.newInstance("h2", 0), toSet("y"), + NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0), + toSet("z"), NodeId.newInstance("h5", 0), + RMNodeLabelsManager.EMPTY_STRING_SET)); + + // inject node label manager + MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) { + 
@Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 2048); + MockNM nm2 = rm1.registerNode("h2:1234", 2048); + MockNM nm3 = rm1.registerNode("h3:1234", 2048); + MockNM nm4 = rm1.registerNode("h4:1234", 2048); + MockNM nm5 = rm1.registerNode("h5:1234", 2048); + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // request a container (label = y). can be allocated on nm2 + am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y"); + containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L); + Assert.assertTrue(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, + "h2"); + + // launch an app to queue b1 (label = y), and check all container will + // be allocated in h5 + RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5); + + // request a container for AM, will succeed + // and now b1's queue capacity will be used, cannot allocate more containers + // (Maximum capacity reached) + am2.allocate("*", 1024, 1, new ArrayList<ContainerId>()); + containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm4, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertFalse(rm1.waitForState(nm5, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + + // launch an app to queue b2 + RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5); + + // request a container. 
try to allocate on nm1 (label = x) and nm3 (label = + // y,z). Will successfully allocate on nm3 + am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y"); + containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, + "h3"); + + // try to allocate container (request label = z) on nm4 (label = y,z). + // Will successfully allocate on nm4 only. + am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z"); + containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L); + Assert.assertTrue(rm1.waitForState(nm4, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, + "h4"); + + rm1.close(); + } + + @Test (timeout = 120000) + public void testContainerAllocateWithLabels() throws Exception { + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"), + NodeId.newInstance("h2", 0), toSet("y"))); + + // inject node label manager + MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y + MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty> + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3); + + // 
request a container. + am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x"); + containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, + "h1"); + + // launch an app to queue b1 (label = y), and check all container will + // be allocated in h2 + RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3); + + // request a container. + am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y"); + containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1, + "h2"); + + // launch an app to queue c1 (label = ""), and check all container will + // be allocated in h3 + RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3); + + // request a container. + am3.allocate("*", 1024, 1, new ArrayList<ContainerId>()); + containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, + "h3"); + + rm1.close(); + } + + @Test (timeout = 120000) + public void testContainerAllocateWithDefaultQueueLabels() throws Exception { + // This test is pretty much similar to testContainerAllocateWithLabel. 
+ // Difference is, this test doesn't specify label expression in ResourceRequest, + // instead, it uses default queue label expression + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"), + NodeId.newInstance("h2", 0), toSet("y"))); + + // inject node label manager + MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y + MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty> + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // request a container. + am1.allocate("*", 1024, 1, new ArrayList<ContainerId>()); + containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, + "h1"); + + // launch an app to queue b1 (label = y), and check all container will + // be allocated in h2 + RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // request a container. 
+ am2.allocate("*", 1024, 1, new ArrayList<ContainerId>()); + containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1, + "h2"); + + // launch an app to queue c1 (label = ""), and check all container will + // be allocated in h3 + RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3); + + // request a container. + am3.allocate("*", 1024, 1, new ArrayList<ContainerId>()); + containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, + "h3"); + + rm1.close(); + } + + private void checkPendingResource(MockRM rm, int priority, + ApplicationAttemptId attemptId, int memory) { + CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); + FiCaSchedulerApp app = cs.getApplicationAttempt(attemptId); + ResourceRequest rr = + app.getAppSchedulingInfo().getResourceRequest( + Priority.newInstance(priority), "*"); + Assert.assertEquals(memory, + rr.getCapability().getMemory() * rr.getNumContainers()); + } + + private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId, + int numContainers) { + CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); + SchedulerNode node = cs.getSchedulerNode(nodeId); + Assert.assertEquals(numContainers, node.getNumContainers()); + } + + @Test + public void testPreferenceOfNeedyAppsTowardsNodePartitions() throws Exception { + /** + * Test case: Submit two application to 
a queue (app1 first then app2), app1 + * asked for no-label, app2 asked for label=x, when node1 has label=x + * doing heart beat, app2 will get allocation first, even if app2 submits later + * than app1 + */ + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + // Makes y to be non-exclusive node labels + mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("y", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y"))); + + // inject node label manager + MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y + MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty> + + // launch an app to queue b1 (label = y), AM container should be launched in nm2 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2); + + // launch another app to queue b1 (label = y), AM container should be launched in nm2 + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // request container and nm1 do heartbeat (nm2 has label=y), note that app1 + // request non-labeled container, and app2 request labeled container, app2 + // will get allocated first even if app1 submitted first. 
+ am1.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>()); + am2.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>(), "y"); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + // Do node heartbeats many times + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // App2 will get preference to be allocated on node1, and node1 will be all + // used by App2. + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId()); + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId()); + // app1 get nothing in nm1 (partition=y) + checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(), schedulerApp1); + checkNumOfContainersInAnAppOnGivenNode(9, nm2.getNodeId(), schedulerApp1); + // app2 get all resource in nm1 (partition=y) + checkNumOfContainersInAnAppOnGivenNode(8, nm1.getNodeId(), schedulerApp2); + checkNumOfContainersInAnAppOnGivenNode(1, nm2.getNodeId(), schedulerApp2); + + rm1.close(); + } + + private void checkNumOfContainersInAnAppOnGivenNode(int expectedNum, + NodeId nodeId, FiCaSchedulerApp app) { + int num = 0; + for (RMContainer container : app.getLiveContainers()) { + if (container.getAllocatedNode().equals(nodeId)) { + num++; + } + } + Assert.assertEquals(expectedNum, num); + } + + @Test + public void + testPreferenceOfNeedyPrioritiesUnderSameAppTowardsNodePartitions() + throws Exception { + /** + * Test case: Submit one application, it asks label="" in priority=1 and + * label="x" in priority=2, when a node with label=x heartbeat, priority=2 + * will get allocation first even if there're pending resource in priority=1 + */ + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + // Makes y to be non-exclusive 
node labels + mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("y", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y"))); + + // inject node label manager + MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y + MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty> + + ContainerId nextContainerId; + + // launch an app to queue b1 (label = y), AM container should be launched in nm3 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2); + + // request containers from am2, priority=1 asks for "" and priority=2 asks + // for "y", "y" container should be allocated first + nextContainerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), ""); + am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y"); + Assert.assertTrue(rm1.waitForState(nm1, nextContainerId, + RMContainerState.ALLOCATED, 10 * 1000)); + + // Check pending resource for am2, priority=1 doesn't get allocated before + // priority=2 allocated + checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 1 * GB); + checkPendingResource(rm1, 2, am1.getApplicationAttemptId(), 0 * GB); + + rm1.close(); + } + + @Test + public void testNonLabeledResourceRequestGetPreferrenceToNonLabeledNode() + throws Exception { + /** + * Test case: Submit one application, it asks 6 label="" containers, NM1 + * with label=y and NM2 has no label, NM1/NM2 doing heartbeat together. Even + * if NM1 has idle resource, containers are all allocated to NM2 since + * non-labeled request should get allocation on non-labeled nodes first. 
+ */ + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + // Makes x to be non-exclusive node labels + mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y + MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty> + + ContainerId nextContainerId; + + // launch an app to queue b1 (label = y), AM container should be launched in nm3 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2); + + // request containers from am2, priority=1 asks for "" * 6 (id from 4 to 9), + // nm2/nm3 do + // heartbeat at the same time, check containers are always allocated to nm3. + // This is to verify when there's resource available in non-labeled + // partition, non-labeled resource should allocate to non-labeled partition + // first. 
+ am1.allocate("*", 1 * GB, 6, 1, new ArrayList<ContainerId>(), ""); + for (int i = 2; i < 2 + 6; i++) { + nextContainerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), i); + Assert.assertTrue(rm1.waitForState(Arrays.asList(nm1, nm2), + nextContainerId, RMContainerState.ALLOCATED, 10 * 1000)); + } + // no more container allocated on nm1 + checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 0); + // all 7 (1 AM container + 6 task container) containers allocated on nm2 + checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 7); + + rm1.close(); + } + + @Test + public void testPreferenceOfQueuesTowardsNodePartitions() + throws Exception { + /** + * Test case: have a following queue structure: + * + * <pre> + * root + * / | \ + * a b c + * / \ / \ / \ + * a1 a2 b1 b2 c1 c2 + * (x) (x) (x) + * </pre> + * + * Only a1, b1, c1 can access label=x, and their default label=x Each each + * has one application, asks for 5 containers. NM1 has label=x + * + * NM1/NM2 doing heartbeat for 15 times, it should allocate all 15 + * containers with label=x + */ + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(A, 33); + csConf.setAccessibleNodeLabels(A, toSet("x")); + csConf.setCapacityByLabel(A, "x", 33); + csConf.setQueues(A, new String[] {"a1", "a2"}); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + csConf.setCapacity(B, 33); + csConf.setAccessibleNodeLabels(B, toSet("x")); + csConf.setCapacityByLabel(B, "x", 33); + csConf.setQueues(B, new String[] {"b1", "b2"}); + + final String C = CapacitySchedulerConfiguration.ROOT + ".c"; + csConf.setCapacity(C, 34); + csConf.setAccessibleNodeLabels(C, toSet("x")); + 
csConf.setCapacityByLabel(C, "x", 34); + csConf.setQueues(C, new String[] {"c1", "c2"}); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + csConf.setCapacity(A1, 50); + csConf.setCapacityByLabel(A1, "x", 100); + csConf.setDefaultNodeLabelExpression(A1, "x"); + + final String A2 = A + ".a2"; + csConf.setCapacity(A2, 50); + csConf.setCapacityByLabel(A2, "x", 0); + + final String B1 = B + ".b1"; + csConf.setCapacity(B1, 50); + csConf.setCapacityByLabel(B1, "x", 100); + csConf.setDefaultNodeLabelExpression(B1, "x"); + + final String B2 = B + ".b2"; + csConf.setCapacity(B2, 50); + csConf.setCapacityByLabel(B2, "x", 0); + + final String C1 = C + ".c1"; + csConf.setCapacity(C1, 50); + csConf.setCapacityByLabel(C1, "x", 100); + csConf.setDefaultNodeLabelExpression(C1, "x"); + + final String C2 = C + ".c2"; + csConf.setCapacity(C2, 50); + csConf.setCapacityByLabel(C2, "x", 0); + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + // Makes x to be non-exclusive node labels + mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty> + + // app1 -> a1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // app2 -> a2 + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // app3 -> b1 + RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "b1"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1); + + // app4 -> b2 + RMApp app4 = 
rm1.submitApp(1 * GB, "app", "user", null, "b2"); + MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm2); + + // app5 -> c1 + RMApp app5 = rm1.submitApp(1 * GB, "app", "user", null, "c1"); + MockAM am5 = MockRM.launchAndRegisterAM(app5, rm1, nm1); + + // app6 -> b2 + RMApp app6 = rm1.submitApp(1 * GB, "app", "user", null, "c2"); + MockAM am6 = MockRM.launchAndRegisterAM(app6, rm1, nm2); + + // Each application request 5 * 1GB container + am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>()); + am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>()); + am3.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>()); + am4.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>()); + am5.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>()); + am6.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>()); + + // NM1 do 15 heartbeats + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + for (int i = 0; i < 15; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + // NM1 get 15 new containers (total is 18, 15 task containers and 3 AM + // containers) + checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 18); + + // Check pending resource each application + // APP1/APP3/APP5 get satisfied, and APP2/APP2/APP3 get nothing. 
+ checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 0 * GB); + checkPendingResource(rm1, 1, am2.getApplicationAttemptId(), 5 * GB); + checkPendingResource(rm1, 1, am3.getApplicationAttemptId(), 0 * GB); + checkPendingResource(rm1, 1, am4.getApplicationAttemptId(), 5 * GB); + checkPendingResource(rm1, 1, am5.getApplicationAttemptId(), 0 * GB); + checkPendingResource(rm1, 1, am6.getApplicationAttemptId(), 5 * GB); + + rm1.close(); + } + + @Test + public void testQueuesWithoutAccessUsingPartitionedNodes() throws Exception { + /** + * Test case: given the following queue structure: + * + * <pre> + * root + * / \ + * a b + * (x) + * </pre> + * + * Only a can access label=x. There are two nodes in the cluster: n1 has x + * and n2 has no label. + * + * With user-limit-factor=5 on queue b, one application submitted to queue b + * that requests more containers than the cluster can hold should be able to + * use up all cluster resources. + */ + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(A, 50); + csConf.setAccessibleNodeLabels(A, toSet("x")); + csConf.setCapacityByLabel(A, "x", 100); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + csConf.setCapacity(B, 50); + csConf.setAccessibleNodeLabels(B, new HashSet<String>()); + csConf.setUserLimitFactor(B, 5); + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x")); + // Make x a non-exclusive node label + mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + 
+ rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty> + + // app1 -> b + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2); + + // app1 requests 50 * 1GB containers + am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>()); + + // NM1 and NM2 each do 50 heartbeats + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + + // How many cycles we waited before being allocated on the partitioned + // node, i.e. cycles after which NM1 still has no containers + int cycleWaited = 0; + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + if (schedulerNode1.getNumContainers() == 0) { + cycleWaited++; + } + } + // We wait 10 cycles before getting allocated on the partitioned node: + // NM2 can hold 10 containers in total; excluding the already allocated AM + // container, we wait 9 cycles to fill the non-partitioned node, and need to + // wait one more cycle before allocating to the partitioned node + Assert.assertEquals(10, cycleWaited); + + // Both NM1/NM2 launched 10 containers, cluster resource is exhausted + checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 10); + checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 10); + + rm1.close(); + } + + @Test + public void testAMContainerAllocationWillAlwaysBeExclusive() + throws Exception { + /** + * Test case: Submit one application without partition, trying to allocate on + * a node that has partition=x; it should fail to allocate since the AM + * container will always respect exclusivity for partitions + */ + + // set node -> label + 
mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + // Make x a non-exclusive node label + mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x + + // launch an app to queue b1 (label = y); nm1 (label = x) is the only node + // registered, so the AM container must not be launched on it + rm1.submitApp(1 * GB, "app", "user", null, "b1"); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + // Heartbeat many times; app1 should get nothing + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId()) + .getNumContainers()); + + rm1.close(); + } + + @Test + public void + testQueueMaxCapacitiesWillNotBeHonoredWhenNotRespectingExclusivity() + throws Exception { + /** + * Test case: given the following queue structure: + * + * <pre> + * root + * / \ + * a b + * (x) (x) + * </pre> + * + * a/b can access x, both of them have max-capacity-on-x = 50 + * + * When doing non-exclusive allocation, app in a (or b) can use 100% of x + * resource. 
+ */ + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a", + "b" }); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(A, 50); + csConf.setAccessibleNodeLabels(A, toSet("x")); + csConf.setCapacityByLabel(A, "x", 50); + csConf.setMaximumCapacityByLabel(A, "x", 50); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + csConf.setCapacity(B, 50); + csConf.setAccessibleNodeLabels(B, toSet("x")); + csConf.setCapacityByLabel(B, "x", 50); + csConf.setMaximumCapacityByLabel(B, "x", 50); + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x")); + // Make x a non-exclusive node label + mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty> + + // app1 -> a + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2); + + // app1 asks for 10 containers without specifying a partition + am1.allocate("*", 1 * GB, 10, new ArrayList<ContainerId>()); + + // NM1 does 50 heartbeats + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + // app1 gets 
all resource in partition=x + Assert.assertEquals(10, schedulerNode1.getNumContainers()); + + rm1.close(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 7da1c97..52d0bc1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -146,7 +146,7 @@ public class TestParentQueue { final Resource allocatedResource = Resources.createResource(allocation); if (queue instanceof ParentQueue) { ((ParentQueue)queue).allocateResource(clusterResource, - allocatedResource, null); + allocatedResource, RMNodeLabelsManager.NO_LABEL); } else { FiCaSchedulerApp app1 = getMockApplication(0, ""); ((LeafQueue)queue).allocateResource(clusterResource, app1, @@ -157,7 +157,7 @@ public class TestParentQueue { if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)).when(queue) .assignContainers(eq(clusterResource), eq(node), - any(ResourceLimits.class)); + any(ResourceLimits.class), any(SchedulingMode.class)); // Mock the node's resource availability Resource available = node.getAvailableResource(); @@ -168,7 +168,7 @@ public class TestParentQueue { return new CSAssignment(allocatedResource, type); } }).when(queue).assignContainers(eq(clusterResource), eq(node), - any(ResourceLimits.class)); + any(ResourceLimits.class), any(SchedulingMode.class)); } private float computeQueueAbsoluteUsedCapacity(CSQueue queue, @@ -228,11 +228,16 @@ public class TestParentQueue { LeafQueue a = (LeafQueue)queues.get(A); LeafQueue b = (LeafQueue)queues.get(B); + a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); + b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); + queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage() + .incPending(Resources.createResource(1 * GB)); + // Simulate B returning a container on node_0 stubQueueAllocation(a, clusterResource, node_0, 0*GB); stubQueueAllocation(b, clusterResource, node_0, 1*GB); root.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 1*GB, clusterResource); @@ -240,12 +245,12 @@ public class TestParentQueue { stubQueueAllocation(a, clusterResource, node_1, 2*GB); stubQueueAllocation(b, clusterResource, node_1, 1*GB); root.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -254,12 +259,12 @@ public class TestParentQueue { stubQueueAllocation(a, clusterResource, node_0, 1*GB); stubQueueAllocation(b, clusterResource, node_0, 2*GB); root.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -268,12 +273,12 @@ public class TestParentQueue { stubQueueAllocation(a, clusterResource, node_0, 
0*GB); stubQueueAllocation(b, clusterResource, node_0, 4*GB); root.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); @@ -282,12 +287,12 @@ public class TestParentQueue { stubQueueAllocation(a, clusterResource, node_1, 1*GB); stubQueueAllocation(b, clusterResource, node_1, 1*GB); root.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(a, b); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 4*GB, clusterResource); verifyQueueMetrics(b, 9*GB, clusterResource); } @@ -448,16 +453,27 @@ public class TestParentQueue { // Start testing CSQueue a = queues.get(A); + a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); CSQueue b = queues.get(B); + b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); CSQueue c = queues.get(C); + c.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); CSQueue 
d = queues.get(D); + d.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); CSQueue a1 = queues.get(A1); + a1.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); CSQueue a2 = queues.get(A2); + a2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); CSQueue b1 = queues.get(B1); + b1.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); CSQueue b2 = queues.get(B2); + b2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); CSQueue b3 = queues.get(B3); + b3.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); + queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage() + .incPending(Resources.createResource(1 * GB)); // Simulate C returning a container on node_0 stubQueueAllocation(a, clusterResource, node_0, 0*GB); @@ -465,7 +481,7 @@ public class TestParentQueue { stubQueueAllocation(c, clusterResource, node_0, 1*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); root.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 0*GB, clusterResource); verifyQueueMetrics(c, 1*GB, clusterResource); @@ -478,7 +494,7 @@ public class TestParentQueue { stubQueueAllocation(b2, clusterResource, node_1, 4*GB); stubQueueAllocation(c, clusterResource, node_1, 0*GB); root.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); verifyQueueMetrics(c, 1*GB, clusterResource); @@ -490,14 +506,14 @@ public class TestParentQueue { stubQueueAllocation(b3, clusterResource, node_0, 2*GB); stubQueueAllocation(c, clusterResource, node_0, 2*GB); root.assignContainers(clusterResource, node_0, - new 
ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(a, c, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 6*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -517,16 +533,16 @@ public class TestParentQueue { stubQueueAllocation(b1, clusterResource, node_2, 1*GB); stubQueueAllocation(c, clusterResource, node_2, 1*GB); root.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(a, a2, a1, b, c); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(a2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + 
any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); verifyQueueMetrics(c, 4*GB, clusterResource); @@ -622,12 +638,16 @@ public class TestParentQueue { // Start testing LeafQueue a = (LeafQueue)queues.get(A); LeafQueue b = (LeafQueue)queues.get(B); + a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); + b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); + queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage() + .incPending(Resources.createResource(1 * GB)); // Simulate B returning a container on node_0 stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); root.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 1*GB, clusterResource); @@ -636,12 +656,12 @@ public class TestParentQueue { stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL); stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); root.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, 
clusterResource); @@ -651,12 +671,12 @@ public class TestParentQueue { stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH); root.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -691,12 +711,19 @@ public class TestParentQueue { // Start testing LeafQueue b3 = (LeafQueue)queues.get(B3); LeafQueue b2 = (LeafQueue)queues.get(B2); + b2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); + b3.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); + queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage() + .incPending(Resources.createResource(1 * GB)); + + CSQueue b = queues.get(B); + b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); // Simulate B3 returning a container on node_0 stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); root.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verifyQueueMetrics(b2, 0*GB, clusterResource); verifyQueueMetrics(b3, 1*GB, clusterResource); @@ -705,12 +732,12 @@ public class TestParentQueue { stubQueueAllocation(b2, 
clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL); stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); root.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(b2, b3); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 2*GB, clusterResource); @@ -720,12 +747,12 @@ public class TestParentQueue { stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); root.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b3, b2); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 3*GB, clusterResource); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index e8a8243..47be618 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -48,10 +48,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -266,7 +266,7 @@ public class TestReservations { // Start testing... 
// Only AM a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -278,7 +278,7 @@ public class TestReservations { // Only 1 map - simulating reduce a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -290,7 +290,7 @@ public class TestReservations { // Only 1 map to other node - simulating reduce a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -305,7 +305,7 @@ public class TestReservations { // try to assign reducer (5G on node 0 and should reserve) a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -321,7 +321,7 @@ public class TestReservations { // assign reducer to node 2 a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(18 * GB, a.getUsedResources().getMemory()); 
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -338,7 +338,7 @@ public class TestReservations { // node_1 heartbeat and unreserves from node_0 in order to allocate // on node_1 a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(18 * GB, a.getUsedResources().getMemory()); assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -422,7 +422,7 @@ public class TestReservations { // Start testing... // Only AM a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -434,7 +434,7 @@ public class TestReservations { // Only 1 map - simulating reduce a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -446,7 +446,7 @@ public class TestReservations { // Only 1 map to other node - simulating reduce a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -461,7 +461,7 @@ public class TestReservations { // try to assign reducer (5G on node 0 and 
should reserve) a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -477,7 +477,7 @@ public class TestReservations { // assign reducer to node 2 a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(18 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -494,7 +494,7 @@ public class TestReservations { // node_1 heartbeat and won't unreserve from node_0, potentially stuck // if AM doesn't handle a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(18 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -570,7 +570,7 @@ public class TestReservations { // Start testing... 
// Only AM a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -581,7 +581,7 @@ public class TestReservations { // Only 1 map - simulating reduce a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -592,7 +592,7 @@ public class TestReservations { // Only 1 map to other node - simulating reduce a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -606,7 +606,7 @@ public class TestReservations { // try to assign reducer (5G on node 0 and should reserve) a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -621,7 +621,7 @@ public class TestReservations { // could allocate but told need to unreserve first a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(13 * GB, 
a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -823,7 +823,7 @@ public class TestReservations { // Start testing... // Only AM a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -834,7 +834,7 @@ public class TestReservations { // Only 1 map - simulating reduce a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -845,7 +845,7 @@ public class TestReservations { // Only 1 map to other node - simulating reduce a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -860,15 +860,16 @@ public class TestReservations { Resource capability = Resources.createResource(32 * GB, 0); boolean res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources.none()); + RMNodeLabelsManager.NO_LABEL, new ResourceLimits( + clusterResource), capability, Resources.none(), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertFalse(res); // now add in reservations and make sure it continues if config set // allocate to queue 
so that the potential new capacity is greater then // absoluteMaxCapacity a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -881,16 +882,17 @@ public class TestReservations { capability = Resources.createResource(5 * GB, 0); res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources - .createResource(5 * GB)); + RMNodeLabelsManager.NO_LABEL, new ResourceLimits( + clusterResource), capability, Resources.createResource(5 * GB), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertTrue(res); // tell to not check reservations res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources.none()); + RMNodeLabelsManager.NO_LABEL, new ResourceLimits( + clusterResource), capability, Resources.none(), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertFalse(res); refreshQueuesTurnOffReservationsContLook(a, csConf); @@ -899,15 +901,16 @@ public class TestReservations { // in since feature is off res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources.none()); + RMNodeLabelsManager.NO_LABEL, new ResourceLimits( + clusterResource), capability, Resources.none(), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertFalse(res); res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources - .createResource(5 * GB)); + RMNodeLabelsManager.NO_LABEL, new ResourceLimits( + clusterResource), capability, 
Resources.createResource(5 * GB), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertFalse(res); } @@ -1008,7 +1011,7 @@ public class TestReservations { // Start testing... // Only AM a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1019,7 +1022,7 @@ public class TestReservations { // Only 1 map - simulating reduce a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1030,7 +1033,7 @@ public class TestReservations { // Only 1 map to other node - simulating reduce a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1044,7 +1047,7 @@ public class TestReservations { // allocate to queue so that the potential new capacity is greater then // absoluteMaxCapacity a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, app_0.getCurrentReservation().getMemory()); @@ -1059,18 +1062,18 @@ public class TestReservations { // set limit so subtrace 
reservations it can continue Resource limit = Resources.createResource(12 * GB, 0); boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0, - true, null); + true, ""); assertTrue(res); // tell it not to check for reservations and should fail as already over // limit - res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, null); + res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, ""); assertFalse(res); refreshQueuesTurnOffReservationsContLook(a, csConf); // should now return false since feature off - res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, null); + res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, ""); assertFalse(res); } @@ -1143,7 +1146,7 @@ public class TestReservations { // Start testing... // Only AM a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1155,7 +1158,7 @@ public class TestReservations { // Only 1 map - simulating reduce a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1167,7 +1170,7 @@ public class TestReservations { // Only 1 map to other node - simulating reduce a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 
* GB, a.getMetrics().getReservedMB()); @@ -1183,7 +1186,7 @@ public class TestReservations { // some resource. Even with continous reservation looking, we don't allow // unreserve resource to reserve container. a.assignContainers(clusterResource, node_0, - new ResourceLimits(Resources.createResource(10 * GB))); + new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1199,7 +1202,7 @@ public class TestReservations { // used (8G) + required (5G). It will not reserved since it has to unreserve // some resource. Unfortunately, there's nothing to unreserve. a.assignContainers(clusterResource, node_2, - new ResourceLimits(Resources.createResource(10 * GB))); + new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1213,7 +1216,7 @@ public class TestReservations { // let it assign 5G to node_2 a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1226,7 +1229,7 @@ public class TestReservations { // reserve 8G node_0 a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(21 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(8 * GB, 
a.getMetrics().getReservedMB()); @@ -1241,7 +1244,7 @@ public class TestReservations { // continued to try due to having reservation above, // but hits queue limits so can't reserve anymore. a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(21 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(8 * GB, a.getMetrics().getReservedMB()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 62135b9..84abf4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -160,6 +160,7 @@ public class TestUtils { request.setCapability(capability); request.setRelaxLocality(relaxLocality); request.setPriority(priority); + request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); return request; } @@ -273,6 +274,7 @@ public class TestUtils { conf.setCapacity(B1, 100); conf.setMaximumCapacity(B1, 100); conf.setCapacityByLabel(B1, "y", 
100); + conf.setMaximumApplicationMasterResourcePerQueuePercent(B1, 1f); final String C1 = C + ".c1"; conf.setQueues(C, new String[] {"c1"});