http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 51b567b..8694efb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CyclicBarrier; +import com.google.common.collect.ImmutableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -78,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestK import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -196,6 +199,7 @@ public class TestLeafQueue { cs.setRMContext(spyRMContext); cs.init(csConf); + cs.setResourceCalculator(rC); cs.start(); when(spyRMContext.getScheduler()).thenReturn(cs); @@ -268,6 +272,12 @@ public class TestLeafQueue { any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), any(RMContainer.class), any(ContainerStatus.class), any(RMContainerEventType.class), any(CSQueue.class), anyBoolean()); + + // Stub out parent queue's accept and apply. + doReturn(true).when(parent).accept(any(Resource.class), + any(ResourceCommitRequest.class)); + doNothing().when(parent).apply(any(Resource.class), + any(ResourceCommitRequest.class)); return queue; } @@ -339,6 +349,12 @@ public class TestLeafQueue { FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0); + final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); @@ -353,8 +369,10 @@ public class TestLeafQueue { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals( (int)(node_0.getTotalResource().getMemorySize() * a.getCapacity()) - (1*GB), a.getMetrics().getAvailableMB()); @@ -526,6 +544,12 @@ public class TestLeafQueue { FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0); + final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); @@ -544,8 +568,10 @@ public class TestLeafQueue { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(1*GB, a.getUsedResources().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -555,8 +581,10 @@ public class TestLeafQueue { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(2*GB, a.getUsedResources().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -564,8 +592,10 @@ public class TestLeafQueue { assertEquals(2*GB, a.getMetrics().getAllocatedMB()); // Can't allocate 3rd due to user-limit - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(2*GB, a.getUsedResources().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -574,8 +604,10 @@ public class TestLeafQueue { // Bump up user-limit-factor, now allocate should work a.setUserLimitFactor(10); - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(3*GB, a.getUsedResources().getMemorySize()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -583,8 +615,10 @@ public class TestLeafQueue { assertEquals(3*GB, a.getMetrics().getAllocatedMB()); // One more should work, for app_1, due to user-limit-factor - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(4*GB, a.getUsedResources().getMemorySize()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -594,8 +628,10 @@ public class TestLeafQueue { // Test max-capacity // Now - no more allocs since we are at max-cap a.setMaxCapacity(0.5f); - a.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(4*GB, a.getUsedResources().getMemorySize()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -687,6 +723,13 @@ public class TestLeafQueue { assign.getResource().getMemorySize() > 0); } + private void applyCSAssignment(Resource clusterResource, CSAssignment assign, + LeafQueue q, final Map<NodeId, FiCaSchedulerNode> nodes, + final Map<ApplicationAttemptId, FiCaSchedulerApp> apps) + throws IOException { + TestUtils.applyResourceCommitRequest(clusterResource, assign, nodes, apps); + } + @Test public void testDRFUserLimits() throws Exception { setUpWithDominantResourceCalculator(); @@ -723,6 +766,12 @@ public class TestLeafQueue { FiCaSchedulerNode node1 = TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8 * GB, 100); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node0.getNodeID(), + node0, node1.getNodeID(), node1); + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app0.getApplicationAttemptId(), app0, app2.getApplicationAttemptId(), + app2); + int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (8 * GB), numNodes * 100); @@ -758,12 +807,14 @@ public class TestLeafQueue { b.assignContainers(clusterResource, node0, new ResourceLimits( clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); LOG.info(assign.toString()); + applyCSAssignment(clusterResource, assign, b, nodes, apps); } while (assign.getResource().getMemorySize() > 0 && assign.getAssignmentInformation().getNumReservations() == 0); do { assign = b.assignContainers(clusterResource, node1, new ResourceLimits( clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assign, b, nodes, apps); } while (assign.getResource().getMemorySize() > 0 && assign.getAssignmentInformation().getNumReservations() == 0); //LOG.info("user_0: " + queueUser0.getUsed()); @@ -847,6 +898,12 @@ public class TestLeafQueue { TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, priority, recordFactory))); + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1); + /** * Start testing... */ @@ -859,8 +916,10 @@ public class TestLeafQueue { assertEquals(2, a.getActiveUsersManager().getNumActiveUsers()); // 1 container to user_0 - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(3*GB, a.getUsedResources().getMemorySize()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -868,8 +927,10 @@ public class TestLeafQueue { // Allocate one container to app_1. Even if app_0 // submit earlier, it cannot get this container assigned since user_0 // exceeded user-limit already. - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(4*GB, a.getUsedResources().getMemorySize()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -877,8 +938,10 @@ public class TestLeafQueue { // Allocate one container to app_0, before allocating this container, // user-limit = ceil((4 + 1) / 2) = 3G. app_0's used resource (3G) <= // user-limit. - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(7*GB, a.getUsedResources().getMemorySize()); assertEquals(6*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -890,7 +953,7 @@ public class TestLeafQueue { } @Test - public void testComputeUserLimitAndSetHeadroom(){ + public void testComputeUserLimitAndSetHeadroom() throws IOException { LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B)); qb.setMaxCapacity(1.0f); // Users @@ -903,6 +966,9 @@ public class TestLeafQueue { String host_1 = "127.0.0.2"; FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1); + final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); when(csContext.getNumClusterNodes()).thenReturn(numNodes); @@ -925,6 +991,8 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, qb, qb.getActiveUsersManager(), spyRMContext); + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = new HashMap<>(); + apps.put(app_0.getApplicationAttemptId(), app_0); qb.submitApplicationAttempt(app_0, user_0); Priority u0Priority = TestUtils.createMockPriority(1); SchedulerRequestKey u0SchedKey = toSchedulerKey(u0Priority); @@ -935,8 +1003,10 @@ public class TestLeafQueue { assertEquals("There should only be 1 active user!", 1, qb.getActiveUsersManager().getNumActiveUsers()); //get headroom - qb.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + qb.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); @@ -949,14 +1019,17 @@ public class TestLeafQueue { FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_1, qb, qb.getActiveUsersManager(), spyRMContext); + apps.put(app_2.getApplicationAttemptId(), app_2); Priority u1Priority = TestUtils.createMockPriority(2); SchedulerRequestKey u1SchedKey = toSchedulerKey(u1Priority); app_2.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, u1Priority, recordFactory))); qb.submitApplicationAttempt(app_2, user_1); - qb.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + qb.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); @@ -984,11 +1057,13 @@ public class TestLeafQueue { FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, qb, qb.getActiveUsersManager(), spyRMContext); + apps.put(app_1.getApplicationAttemptId(), app_1); final ApplicationAttemptId appAttemptId_3 = TestUtils.getMockApplicationAttemptId(3, 0); FiCaSchedulerApp app_3 = new FiCaSchedulerApp(appAttemptId_3, user_1, qb, qb.getActiveUsersManager(), spyRMContext); + apps.put(app_3.getApplicationAttemptId(), app_3); app_1.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, u0Priority, recordFactory))); @@ -997,10 +1072,14 @@ public class TestLeafQueue { u1Priority, recordFactory))); qb.submitApplicationAttempt(app_1, user_0); qb.submitApplicationAttempt(app_3, user_1); - qb.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - qb.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + qb.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); + applyCSAssignment(clusterResource, + qb.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(4*GB, qb.getUsedResources().getMemorySize()); @@ -1013,12 +1092,15 @@ public class TestLeafQueue { FiCaSchedulerApp app_4 = new FiCaSchedulerApp(appAttemptId_4, user_0, qb, qb.getActiveUsersManager(), spyRMContext); + apps.put(app_4.getApplicationAttemptId(), app_4); qb.submitApplicationAttempt(app_4, user_0); app_4.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true, u0Priority, recordFactory))); - qb.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + qb.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, @@ -1078,6 +1160,12 @@ public class TestLeafQueue { FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 16*GB); + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1, app_2.getApplicationAttemptId(), app_2); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1); + final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (16*GB), 1); when(csContext.getNumClusterNodes()).thenReturn(numNodes); @@ -1088,8 +1176,10 @@ public class TestLeafQueue { TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory))); - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(1*GB, a.getUsedResources().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1105,8 +1195,10 @@ public class TestLeafQueue { TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, priority, recordFactory))); - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(2*GB, a.getUsedResources().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1165,6 +1257,12 @@ public class TestLeafQueue { FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); String host_1 = "127.0.0.2"; FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1, app_2.getApplicationAttemptId(), app_2); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1); final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); @@ -1194,8 +1292,10 @@ public class TestLeafQueue { 1, a.getActiveUsersManager().getNumActiveUsers()); // 1 container to user_0 - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(2*GB, a.getUsedResources().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1206,8 +1306,10 @@ public class TestLeafQueue { // the application is not yet active // Again one to user_0 since he hasn't exceeded user limit yet - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(3*GB, a.getUsedResources().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1223,8 +1325,10 @@ public class TestLeafQueue { // No more to user_0 since he is already over user-limit // and no more containers to queue since it's already at max-cap - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(3*GB, a.getUsedResources().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1237,8 +1341,10 @@ public class TestLeafQueue { TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true, priority, recordFactory))); assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(0*GB, app_2.getHeadroom().getMemorySize()); // hit queue max-cap } @@ -1283,10 +1389,17 @@ public class TestLeafQueue { new FiCaSchedulerApp(appAttemptId_3, user_2, a, a.getActiveUsersManager(), spyRMContext); a.submitApplicationAttempt(app_3, user_2); + + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1, app_2.getApplicationAttemptId(), app_2, + app_3.getApplicationAttemptId(), app_3); // Setup some nodes String host_0 = "127.0.0.1"; FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0); final int numNodes = 1; Resource clusterResource = @@ -1308,24 +1421,30 @@ public class TestLeafQueue { */ // Only 1 container - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(1*GB, a.getUsedResources().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(2*GB, a.getUsedResources().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); // Can't allocate 3rd due to user-limit a.setUserLimit(25); - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(2*GB, a.getUsedResources().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1343,8 +1462,10 @@ public class TestLeafQueue { // Now allocations should goto app_2 since // user_0 is at limit inspite of high user-limit-factor a.setUserLimitFactor(10); - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(5*GB, a.getUsedResources().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1353,8 +1474,10 @@ public class TestLeafQueue { // Now allocations should goto app_0 since // user_0 is at user-limit not above it - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(6*GB, a.getUsedResources().getMemorySize()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1364,8 +1487,10 @@ public class TestLeafQueue { // Test max-capacity // Now - no more allocs since we are at max-cap a.setMaxCapacity(0.5f); - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(6*GB, a.getUsedResources().getMemorySize()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1376,8 +1501,10 @@ public class TestLeafQueue { // Now, allocations should goto app_3 since it's under user-limit a.setMaxCapacity(1.0f); a.setUserLimitFactor(1); - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(7*GB, a.getUsedResources().getMemorySize()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1385,8 +1512,10 @@ public class TestLeafQueue { assertEquals(1*GB, app_3.getCurrentConsumption().getMemorySize()); // Now we should assign to app_3 again since user_2 is under user-limit - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(8*GB, a.getUsedResources().getMemorySize()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1466,6 +1595,12 @@ public class TestLeafQueue { // Setup some nodes String host_0 = "127.0.0.1"; FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); + + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0); final int numNodes = 2; Resource clusterResource = @@ -1485,8 +1620,10 @@ public class TestLeafQueue { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(1*GB, a.getUsedResources().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1496,8 +1633,10 @@ public class TestLeafQueue { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(2*GB, a.getUsedResources().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1505,8 +1644,10 @@ public class TestLeafQueue { assertEquals(2*GB, a.getMetrics().getAllocatedMB()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(6*GB, a.getUsedResources().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1522,8 +1663,10 @@ public class TestLeafQueue { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(5*GB, a.getUsedResources().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1539,8 +1682,10 @@ public class TestLeafQueue { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(4*GB, a.getUsedResources().getMemorySize()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1587,6 +1732,12 @@ public class TestLeafQueue { when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); + + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1); final int numNodes = 3; Resource clusterResource = @@ -1613,23 +1764,29 @@ public class TestLeafQueue { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(1*GB, a.getUsedResources().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(2*GB, a.getUsedResources().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(6*GB, a.getUsedResources().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1643,8 +1800,10 @@ public class TestLeafQueue { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(5*GB, a.getUsedResources().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1653,8 +1812,10 @@ public class TestLeafQueue { assertEquals(1, app_1.getReReservations(toSchedulerKey(priority))); // Re-reserve - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(5*GB, a.getUsedResources().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1663,8 +1824,10 @@ public class TestLeafQueue { assertEquals(2, app_1.getReReservations(toSchedulerKey(priority))); // Try to schedule on node_1 now, should *move* the reservation - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(9*GB, a.getUsedResources().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1681,8 +1844,10 @@ public class TestLeafQueue { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals(4*GB, a.getUsedResources().getMemorySize()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -1735,6 +1900,15 @@ public class TestLeafQueue { String rack_2 = "rack_2"; FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); + String host_3 = "127.0.0.4"; // on rack_1 + FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); + + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2, + node_3.getNodeID(), node_3); + final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); @@ -1767,6 +1941,7 @@ public class TestLeafQueue { // Start with off switch, shouldn't allocate due to delay scheduling assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey)); assertEquals(3, app_0.getTotalRequiredResources(schedulerKey)); @@ -1775,6 +1950,7 @@ public class TestLeafQueue { // Another off switch, shouldn't allocate due to delay scheduling assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey)); assertEquals(3, app_0.getTotalRequiredResources(schedulerKey)); @@ -1783,6 +1959,7 @@ public class TestLeafQueue { // Another off switch, shouldn't allocate due to delay scheduling assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey)); assertEquals(3, app_0.getTotalRequiredResources(schedulerKey)); @@ -1792,6 +1969,7 @@ public class TestLeafQueue { // since missedOpportunities=3 and reqdContainers=3 assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.OFF_SWITCH); // should NOT reset assertEquals(4, app_0.getSchedulingOpportunities(schedulerKey)); @@ -1800,6 +1978,7 @@ public class TestLeafQueue { // NODE_LOCAL - node_0 assignment = a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); // should reset assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); @@ -1808,6 +1987,7 @@ public class TestLeafQueue { // NODE_LOCAL - node_1 assignment = a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); // should reset assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); @@ -1828,9 +2008,6 @@ public class TestLeafQueue { app_0.updateResourceRequests(app_0_requests_0); assertEquals(4, app_0.getTotalRequiredResources(schedulerKey)); - String host_3 = "127.0.0.4"; // on rack_1 - FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); - // Rack-delay doReturn(true).when(a).getRackLocalityFullReset(); doReturn(1).when(a).getNodeLocalityDelay(); @@ -1838,12 +2015,14 @@ public class TestLeafQueue { // Shouldn't assign RACK_LOCAL yet assignment = a.assignContainers(clusterResource, node_3, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey)); assertEquals(4, app_0.getTotalRequiredResources(schedulerKey)); // Should assign RACK_LOCAL now assignment = a.assignContainers(clusterResource, node_3, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.RACK_LOCAL); // should reset assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); @@ -1852,6 +2031,7 @@ public class TestLeafQueue { // Shouldn't assign RACK_LOCAL because schedulingOpportunities should have gotten reset. assignment = a.assignContainers(clusterResource, node_3, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey)); assertEquals(3, app_0.getTotalRequiredResources(schedulerKey)); @@ -1861,6 +2041,7 @@ public class TestLeafQueue { // Should assign RACK_LOCAL now assignment = a.assignContainers(clusterResource, node_3, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.RACK_LOCAL); // should NOT reset assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey)); @@ -1869,6 +2050,7 @@ public class TestLeafQueue { // Another RACK_LOCAL since schedulingOpportunities not reset assignment = a.assignContainers(clusterResource, node_3, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.RACK_LOCAL); // should NOT reset assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey)); @@ -1894,12 +2076,14 @@ public class TestLeafQueue { assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits( clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); assertEquals(i+1, app_0.getSchedulingOpportunities(schedulerKey)); } // delay should be capped at numNodes so next one should allocate assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.OFF_SWITCH); assertEquals(numNodes+1, app_0.getSchedulingOpportunities(schedulerKey)); } @@ -1933,6 +2117,11 @@ public class TestLeafQueue { String rack_2 = "rack_2"; FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2); + final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); @@ -1981,6 +2170,7 @@ public class TestLeafQueue { // thus, no P2 either! CSAssignment assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey1)); assertEquals(2, app_0.getTotalRequiredResources(schedulerKey1)); @@ -1991,6 +2181,7 @@ public class TestLeafQueue { // thus, no P2 either! assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey1)); assertEquals(2, app_0.getTotalRequiredResources(schedulerKey1)); @@ -2000,6 +2191,7 @@ public class TestLeafQueue { // Another off-switch, shouldn't allocate OFF_SWITCH P1 assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.OFF_SWITCH); assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey1)); assertEquals(1, app_0.getTotalRequiredResources(schedulerKey1)); @@ -2009,6 +2201,7 @@ public class TestLeafQueue { // Now, DATA_LOCAL for P1 assignment = a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey1)); assertEquals(0, app_0.getTotalRequiredResources(schedulerKey1)); @@ -2018,6 +2211,7 @@ public class TestLeafQueue { // Now, OFF_SWITCH for P2 assignment = a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.OFF_SWITCH); assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey1)); assertEquals(0, app_0.getTotalRequiredResources(schedulerKey1)); @@ -2054,6 +2248,12 @@ public class TestLeafQueue { String host_1_0 = "127.0.0.3"; String rack_1 = "rack_1"; FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB); + + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0_0.getNodeID(), + node_0_0, node_0_1.getNodeID(), node_0_1, node_1_0.getNodeID(), + node_1_0); final int numNodes = 3; Resource clusterResource = Resources.createResource( @@ -2093,6 +2293,7 @@ public class TestLeafQueue { // NODE_LOCAL - node_0_1 CSAssignment assignment = a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); // should reset @@ -2102,6 +2303,7 @@ public class TestLeafQueue { // required(ANY) == 0 assignment = a.assignContainers(clusterResource, node_1_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); // Still zero // since #req=0 @@ -2119,6 +2321,7 @@ public class TestLeafQueue { // required(rack_1) == 0 assignment = a.assignContainers(clusterResource, node_0_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey)); assertEquals(1, app_0.getTotalRequiredResources(schedulerKey)); @@ -2126,6 +2329,7 @@ public class TestLeafQueue { // NODE_LOCAL - node_1 assignment = a.assignContainers(clusterResource, node_1_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); // should reset assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); @@ -2319,18 +2523,25 @@ public class TestLeafQueue { mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + // Setup some nodes and racks String host_0_0 = "127.0.0.1"; String rack_0 = "rack_0"; String host_0_1 = "127.0.0.2"; FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB); - - + String host_1_0 = "127.0.0.3"; String rack_1 = "rack_1"; FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB); String host_1_1 = "127.0.0.4"; FiCaSchedulerNode node_1_1 = TestUtils.getMockNode(host_1_1, rack_1, 0, 8*GB); + + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0_1.getNodeID(), + node_0_1, node_1_0.getNodeID(), node_1_0, node_1_1.getNodeID(), + node_1_1); final int numNodes = 4; Resource clusterResource = Resources.createResource( @@ -2380,6 +2591,7 @@ public class TestLeafQueue { CSAssignment assignment = a.assignContainers(clusterResource, node_0_1, new ResourceLimits( clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); // should be 0 assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); @@ -2403,6 +2615,7 @@ public class TestLeafQueue { // Shouldn't allocate since RR(rack_1) = relax: false assignment = a.assignContainers(clusterResource, node_1_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); // should be 0 assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); @@ -2434,6 +2647,7 @@ public class TestLeafQueue { // Shouldn't allocate since node_1_1 is blacklisted assignment = a.assignContainers(clusterResource, node_1_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); // should be 0 assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); @@ -2461,8 +2675,10 @@ public class TestLeafQueue { // node_1_1 // Shouldn't allocate since rack_1 is blacklisted - assignment = a.assignContainers(clusterResource, node_1_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + assignment = a.assignContainers(clusterResource, node_1_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); // should be 0 assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); @@ -2490,6 +2706,7 @@ public class TestLeafQueue { // Now, should allocate since RR(rack_1) = relax: true assignment = a.assignContainers(clusterResource, node_1_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); assertEquals(1, app_0.getTotalRequiredResources(schedulerKey)); @@ -2520,6 +2737,7 @@ public class TestLeafQueue { assignment = a.assignContainers(clusterResource, node_1_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); assertEquals(0, app_0.getTotalRequiredResources(schedulerKey)); @@ -2590,6 +2808,12 @@ public class TestLeafQueue { FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8 * GB); + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0); + final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (8 * GB), numNodes * 16); @@ -2603,91 +2827,111 @@ public class TestLeafQueue { recordFactory))); try { - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); } catch (NullPointerException e) { Assert.fail("NPE when allocating container on node but " + "forget to set off-switch request should be handled"); } } - - @Test + + @Test public void testFifoAssignment() throws Exception { - LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); - + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>()); String host_0_0 = "127.0.0.1"; String rack_0 = "rack_0"; - FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB); - + FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, + 16 * GB); + final int numNodes = 4; - Resource clusterResource = Resources.createResource( - numNodes * (16*GB), numNodes * 16); + Resource clusterResource = Resources.createResource(numNodes * (16 * GB), + numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); String user_0 = "user_0"; - - final ApplicationAttemptId appAttemptId_0 = - TestUtils.getMockApplicationAttemptId(0, 0); - FiCaSchedulerApp app_0 = - spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), spyRMContext, - Priority.newInstance(3), false)); + + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3), + false)); a.submitApplicationAttempt(app_0, user_0); - - final ApplicationAttemptId appAttemptId_1 = - TestUtils.getMockApplicationAttemptId(1, 0); - FiCaSchedulerApp app_1 = - spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), spyRMContext, - Priority.newInstance(5), false)); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5), + false)); a.submitApplicationAttempt(app_1, user_0); - + + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0_0.getNodeID(), + node_0_0); + Priority priority = TestUtils.createMockPriority(1); List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>(); List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>(); - + app_0_requests_0.clear(); - app_0_requests_0.add( - TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, - true, priority, recordFactory)); + app_0_requests_0.add(TestUtils + .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, priority, + recordFactory)); app_0.updateResourceRequests(app_0_requests_0); - + app_1_requests_0.clear(); - app_1_requests_0.add( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, - true, priority, recordFactory)); + app_1_requests_0.add(TestUtils + .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority, + recordFactory)); app_1.updateResourceRequests(app_1_requests_0); // app_1 will get containers as it has high priority - a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); - a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); + Assert.assertEquals(1 * GB, app_1.getCurrentConsumption().getMemorySize()); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); + Assert.assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); app_0_requests_0.clear(); - app_0_requests_0.add( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, - true, priority, recordFactory)); + app_0_requests_0.add(TestUtils + .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority, + recordFactory)); app_0.updateResourceRequests(app_0_requests_0); app_1_requests_0.clear(); - app_1_requests_0.add( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, - true, priority, recordFactory)); + app_1_requests_0.add(TestUtils + .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority, + recordFactory)); app_1.updateResourceRequests(app_1_requests_0); //app_1 will still get assigned first as priority is more. - a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize()); - Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); + Assert.assertEquals(2 * GB, app_1.getCurrentConsumption().getMemorySize()); + Assert.assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); //and only then will app_2 - a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); -} + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); + Assert.assertEquals(3 * GB, app_0.getCurrentConsumption().getMemorySize()); + } + @Test public void testConcurrentAccess() throws Exception { YarnConfiguration conf = new YarnConfiguration(); @@ -2792,6 +3036,12 @@ public class TestLeafQueue { mock(ActiveUsersManager.class), spyRMContext)); a.submitApplicationAttempt(app_1, user_0); + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0_0.getNodeID(), + node_0_0); + Priority priority = TestUtils.createMockPriority(1); List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>(); List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>(); @@ -2809,9 +3059,15 @@ public class TestLeafQueue { app_1.updateResourceRequests(app_1_requests_0); // app_0 will get containers as its submitted first. - a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); - a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); app_0_requests_0.clear(); @@ -2828,12 +3084,18 @@ public class TestLeafQueue { //Since it already has more resources, app_0 will not get //assigned first, but app_1 will - a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize()); //and only then will app_0 - a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); } @@ -2874,6 +3136,12 @@ public class TestLeafQueue { String rack_2 = "rack_2"; FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2); + final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); @@ -2922,6 +3190,7 @@ public class TestLeafQueue { // Check app_0's scheduling opportunities increased and app_1 get allocated assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey)); assertEquals(3, app_0.getTotalRequiredResources(schedulerKey)); @@ -2965,6 +3234,12 @@ public class TestLeafQueue { FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 100*GB); + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0); + final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (100*GB), numNodes * 128); @@ -2983,9 +3258,10 @@ public class TestLeafQueue { // Start testing... // Assign 1st Container of 1GB - e.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); // With queue capacity set at 1% of 100GB and user-limit-factor set to 1.0, // all users (only user_0) queue 'e' should be able to consume 1GB. // The first container should be assigned to app_0 with no headroom left @@ -2996,9 +3272,10 @@ public class TestLeafQueue { clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); // Assign 2nd container of 1GB - e.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); // user_0 has no headroom due to user-limit-factor of 1.0. However capacity // scheduler will assign one container more than user-limit-factor. // This container also went to app_0. Still with no neadroom even though @@ -3009,9 +3286,10 @@ public class TestLeafQueue { clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); // Can't allocate 3rd container due to user-limit. Headroom still 0. - e.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( @@ -3025,9 +3303,10 @@ public class TestLeafQueue { assertEquals(3*GB, e.getTotalPendingResourcesConsideringUserLimit( clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); - e.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); // app_0 is now satisified, app_1 is still asking for 2GB. assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -3035,12 +3314,14 @@ public class TestLeafQueue { clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); // Get the last 2 containers for app_1, no more pending requests. - e.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - e.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); + applyCSAssignment(clusterResource, + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( @@ -3112,10 +3393,17 @@ public class TestLeafQueue { mock(ActiveUsersManager.class), spyRMContext); e.submitApplicationAttempt(app_3, user_1); + Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1, app_2.getApplicationAttemptId(), app_2, + app_3.getApplicationAttemptId(), app_3); + // Setup 1 node with 100GB of memory resources. String host_0 = "127.0.0.1"; FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 100*GB); + Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(), + node_0); final int numNodes = 1; Resource clusterResource = @@ -3156,9 +3444,10 @@ public class TestLeafQueue { assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize()); // Assign 1st Container of 1GB - e.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); // The first container was assigned to user_0's app_0. Queues total headroom // has 1GB left for user_1. assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit( @@ -3171,9 +3460,10 @@ public class TestLeafQueue { assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize()); // Assign 2nd container of 1GB - e.assignContainers(clusterResource, node_0, + applyCSAssignment(clusterResource, + e.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); // user_0 has no headroom due to user-limit-factor of 1.0. However capacity // scheduler will assign one container more than user-limit-factor. So, // this container went to user_0's app_1. so, headroom for queue 'e'e is @@ -3188,9 +3478,10 @@ public class TestLeafQueue { assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize()); // Assign 3rd container. - e.assignContainers(clusterResource, node_0, + applyCSAssignment(clusterResource, + e.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); // Container was allocated to user_1's app_2 since user_1, Now, no headroom // is left. assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( @@ -3203,9 +3494,10 @@ public class TestLeafQueue { assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize()); // Assign 4th container. - e.assignContainers(clusterResource, node_0, + applyCSAssignment(clusterResource, + e.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); // Allocated to user_1's app_2 since scheduler allocates 1 container // above user resource limit. Available headroom still 0. assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( @@ -3222,9 +3514,10 @@ public class TestLeafQueue { assertEquals(0*GB, app_3_consumption); // Attempt to assign 5th container. Will be a no-op. - e.assignContainers(clusterResource, node_0, + applyCSAssignment(clusterResource, + e.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); // Cannot allocate 5th container because both users are above their allowed // user resource limit. Values should be the same as previously. assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( @@ -3241,9 +3534,10 @@ public class TestLeafQueue { // factor is no longer the limiting factor. e.setUserLimitFactor(10.0f); - e.assignContainers(clusterResource, node_0, + applyCSAssignment(clusterResource, + e.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); // Next container goes to user_0's app_1, since it still wanted 1GB. assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit( clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); @@ -3254,9 +3548,10 @@ public class TestLeafQueue { assertEquals(2*GB, app_2.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize()); - e.assignContainers(clusterResource, node_0, + applyCSAssignment(clusterResource, + e.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); // Last container goes to user_1's app_3, since it still wanted 1GB. // user_0's apps: assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
