http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java new file mode 100644 index 0000000..9854a15 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class TestCapacitySchedulerAsyncScheduling { + private final int GB = 1024; + + private YarnConfiguration conf; + + RMNodeLabelsManager mgr; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + } + + @Test(timeout = 300000) + public void testSingleThreadAsyncContainerAllocation() throws Exception { + testAsyncContainerAllocation(1); + } + + @Test(timeout = 300000) + public void testTwoThreadsAsyncContainerAllocation() throws Exception { + testAsyncContainerAllocation(2); + } + + @Test(timeout = 300000) + public void testThreeThreadsAsyncContainerAllocation() throws Exception { + testAsyncContainerAllocation(3); + } + + public void testAsyncContainerAllocation(int numThreads) throws Exception { + conf.setInt( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, + numThreads); + conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + + ".scheduling-interval-ms", 100); + + 
final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + + // inject node label manager + MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + List<MockNM> nms = new ArrayList<>(); + // Add 10 nodes to the cluster, in the cluster we have 200 GB resource + for (int i = 0; i < 10; i++) { + nms.add(rm.registerNode("h-" + i + ":1234", 20 * GB)); + } + + List<MockAM> ams = new ArrayList<>(); + // Add 3 applications to the cluster, one app per queue. + // The i-th app (1-based) asks for (20 * i) containers of 1 GB each, so in total + // we expect 123 GB allocated (3 GB for the AMs plus 120 GB of containers) + int totalAsked = 3 * GB; // 3 AMs + + for (int i = 0; i < 3; i++) { + RMApp rmApp = rm.submitApp(1024, "app", "user", null, false, + Character.toString((char) (i % 34 + 97)), 1, null, null, false); + MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm); + am.registerAppAttempt(); + ams.add(am); + } + + for (int i = 0; i < 3; i++) { + ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<>()); + totalAsked += 20 * (i + 1) * GB; + } + + // Wait for at most 15000 ms + int waitTime = 15000; // ms + while (waitTime > 0) { + if (rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB() + == totalAsked) { + break; + } + Thread.sleep(50); + waitTime -= 50; + } + + Assert.assertEquals( + rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB(), + totalAsked); + + // Wait for another 2 sec to make sure we will not allocate more than + // required + waitTime = 2000; // ms + while (waitTime > 0) { + Assert.assertEquals( + rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB(), + totalAsked); + waitTime -= 50; + Thread.sleep(50); + } + + rm.close(); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 7f4fc2c..40e5d2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -110,15 +111,16 @@ public class TestChildQueueOrder { return application; } - private void stubQueueAllocation(final CSQueue queue, - final Resource clusterResource, final FiCaSchedulerNode 
node, + private void stubQueueAllocation(final CSQueue queue, + final Resource clusterResource, final FiCaSchedulerNode node, final int allocation) { - stubQueueAllocation(queue, clusterResource, node, allocation, + stubQueueAllocation(queue, clusterResource, node, allocation, NodeType.NODE_LOCAL); } - private void stubQueueAllocation(final CSQueue queue, - final Resource clusterResource, final FiCaSchedulerNode node, + @SuppressWarnings("unchecked") + private void stubQueueAllocation(final CSQueue queue, + final Resource clusterResource, final FiCaSchedulerNode node, final int allocation, final NodeType type) { // Simulate the queue allocation @@ -145,7 +147,7 @@ public class TestChildQueueOrder { if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)). when(queue) - .assignContainers(eq(clusterResource), eq(node), + .assignContainers(eq(clusterResource), any(PlacementSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); // Mock the node's resource availability @@ -157,7 +159,7 @@ public class TestChildQueueOrder { return new CSAssignment(allocatedResource, type); } }). 
- when(queue).assignContainers(eq(clusterResource), eq(node), + when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); doNothing().when(node).releaseContainer(any(Container.class)); } @@ -214,6 +216,7 @@ public class TestChildQueueOrder { } @Test + @SuppressWarnings("unchecked") public void testSortedQueues() throws Exception { // Setup queue configs setupSortedQueues(csConf); @@ -418,10 +421,10 @@ public class TestChildQueueOrder { clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(d,b); allocationOrder.verify(d).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), any(ResourceLimits.class), + any(PlacementSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), any(ResourceLimits.class), + any(PlacementSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index e2b4952..555e0fd 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -81,7 +81,7 @@ public class TestContainerAllocation { mgr.init(conf); } - @Test(timeout = 3000000) + @Test(timeout = 60000) public void testExcessReservationThanNodeManagerCapacity() throws Exception { @SuppressWarnings("resource") MockRM rm = new MockRM(conf); @@ -598,4 +598,47 @@ public class TestContainerAllocation { rm1.close(); } + + @Test(timeout = 60000) + public void testAssignMultipleOffswitchContainers() throws Exception { + MockRM rm1 = new MockRM(); + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 80 * GB); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>()); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + // Do node heartbeats once + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + FiCaSchedulerApp schedulerApp1 = + cs.getApplicationAttempt(am1.getApplicationAttemptId()); + + // App1 will get one container allocated (plus AM container) + Assert.assertEquals(2, schedulerApp1.getLiveContainers().size()); + + // Set assign multiple off-switch containers to 3 + CapacitySchedulerConfiguration newCSConf = new CapacitySchedulerConfiguration(); + newCSConf.setInt( + CapacitySchedulerConfiguration.OFFSWITCH_PER_HEARTBEAT_LIMIT, 3); + + cs.reinitialize(newCSConf, rm1.getRMContext()); + + // Do node 
heartbeats once + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + // App1 will get 3 new containers allocated (plus 2 previously allocated + // containers) + Assert.assertEquals(5, schedulerApp1.getLiveContainers().size()); + + rm1.close(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index 2614630..0696f57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -59,9 +59,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica .FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; 
+import org.junit.Ignore; import org.junit.Test; public class TestContainerResizing { @@ -97,13 +100,14 @@ public class TestContainerResizing { } @Override - public synchronized void allocateContainersToNode(FiCaSchedulerNode node) { + public CSAssignment allocateContainersToNode( + PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) { try { Thread.sleep(1000); } catch(InterruptedException e) { LOG.debug("Thread interrupted."); } - super.allocateContainersToNode(node); + return super.allocateContainersToNode(ps, withNodeHeartbeat); } } @@ -452,7 +456,7 @@ public class TestContainerResizing { ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); sentRMContainerLaunched(rm1, containerId1); - // am1 asks to change its AM container from 1GB to 3GB + // am1 asks to change its AM container from 1GB to 7GB am1.sendContainerResizingRequest(Arrays.asList( UpdateContainerRequest .newInstance(0, containerId1, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org