http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index 73d9e5c..641ef64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -34,11 +34,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -64,8 +68,11 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistrib import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; @@ -75,13 +82,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; /** @@ -91,8 +102,10 @@ public class TestOpportunisticContainerAllocatorAMService { private static final int GB = 1024; - @Test(timeout = 60000) - public void testNodeRemovalDuringAllocate() throws Exception { + private MockRM rm; + + @Before + public void createAndStartRM() { CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); YarnConfiguration conf = new YarnConfiguration(csConf); @@ -102,8 +115,445 @@ public class TestOpportunisticContainerAllocatorAMService { YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); conf.setInt( YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100); - MockRM rm = new MockRM(conf); + rm = new MockRM(conf); rm.start(); + } + + @After + public void stopRM() { + if (rm != null) { + rm.stop(); + } + } + + @Test(timeout = 600000) + public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception { + HashMap<NodeId, MockNM> nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h1:4321", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + MockNM nm3 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm3.getNodeId(), nm3); + MockNM nm4 = new MockNM("h2:4321", 4096, rm.getResourceTrackerService()); + nodes.put(nm4.getNodeId(), nm4); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + ResourceScheduler scheduler = rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId()); + RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId()); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + + ((RMNodeImpl) rmNode1) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + ((RMNodeImpl) rmNode2) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + ((RMNodeImpl) rmNode3) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + ((RMNodeImpl) rmNode4) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + + OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler) + .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); + // Send add and update node events to AM Service. + amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); + amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); + amservice.handle(new NodeAddedSchedulerEvent(rmNode3)); + amservice.handle(new NodeAddedSchedulerEvent(rmNode4)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode3)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode4)); + // All nodes 1 - 4 will be applicable for scheduling. + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + + Thread.sleep(1000); + + QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue() + .getMetrics(); + + // Verify Metrics + verifyMetrics(metrics, 15360, 15, 1024, 1, 1); + + AllocateResponse allocateResponse = am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))), + null); + List<Container> allocatedContainers = allocateResponse + .getAllocatedContainers(); + Assert.assertEquals(2, allocatedContainers.size()); + Container container = allocatedContainers.get(0); + MockNM allocNode = nodes.get(container.getNodeId()); + MockNM sameHostDiffNode = null; + for (NodeId n : nodes.keySet()) { + if (n.getHost().equals(allocNode.getNodeId().getHost()) && + n.getPort() != allocNode.getNodeId().getPort()) { + sameHostDiffNode = nodes.get(n); + } + } + + // Verify Metrics After OPP allocation (Nothing should change) + verifyMetrics(metrics, 15360, 15, 1024, 1, 1); + + am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(0, + container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED))); + // Node on same host should not result in allocation + sameHostDiffNode.nodeHeartbeat(true); + Thread.sleep(200); + allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); + Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); + + // Verify Metrics After OPP allocation (Nothing should change again) + verifyMetrics(metrics, 15360, 15, 1024, 1, 1); + + // Send Promotion req again... this should result in update error + allocateResponse = am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(0, + container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED))); + Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); + Assert.assertEquals(1, allocateResponse.getUpdateErrors().size()); + Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", + allocateResponse.getUpdateErrors().get(0).getReason()); + Assert.assertEquals(container.getId(), + allocateResponse.getUpdateErrors().get(0) + .getUpdateContainerRequest().getContainerId()); + + // Send Promotion req again with incorrect version... + // this should also result in update error + allocateResponse = am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(1, + container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED))); + + Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); + Assert.assertEquals(1, allocateResponse.getUpdateErrors().size()); + Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|1|0", + allocateResponse.getUpdateErrors().get(0).getReason()); + Assert.assertEquals(container.getId(), + allocateResponse.getUpdateErrors().get(0) + .getUpdateContainerRequest().getContainerId()); + + // Ensure after correct node heartbeats, we should get the allocation + allocNode.nodeHeartbeat(true); + Thread.sleep(200); + allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + Container uc = + allocateResponse.getUpdatedContainers().get(0).getContainer(); + Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType()); + Assert.assertEquals(uc.getId(), container.getId()); + Assert.assertEquals(uc.getVersion(), container.getVersion() + 1); + + // Verify Metrics After OPP allocation : + // Allocated cores+mem should have increased, available should decrease + verifyMetrics(metrics, 14336, 14, 2048, 2, 2); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + Thread.sleep(200); + + // Verify that the container is still in ACQUIRED state wrt the RM. + RMContainer rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt( + uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId()); + Assert.assertEquals(RMContainerState.ACQUIRED, rmContainer.getState()); + + // Now demote the container back.. + allocateResponse = am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(uc.getVersion(), + uc.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, + null, ExecutionType.OPPORTUNISTIC))); + // This should happen in the same heartbeat.. + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + uc = allocateResponse.getUpdatedContainers().get(0).getContainer(); + Assert.assertEquals(ExecutionType.OPPORTUNISTIC, uc.getExecutionType()); + Assert.assertEquals(uc.getId(), container.getId()); + Assert.assertEquals(uc.getVersion(), container.getVersion() + 2); + + // Verify Metrics After OPP allocation : + // Everything should have reverted to what it was + verifyMetrics(metrics, 15360, 15, 1024, 1, 1); + } + + @Test(timeout = 60000) + public void testContainerPromoteAfterContainerStart() throws Exception { + HashMap<NodeId, MockNM> nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + nm1.registerNode(); + nm2.registerNode(); + + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + ResourceScheduler scheduler = rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + ((RMNodeImpl) rmNode1) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + ((RMNodeImpl) rmNode2) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + + OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler) + .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); + // Send add and update node events to AM Service. + amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); + amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + // All nodes 1 to 2 will be applicable for scheduling. + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + Thread.sleep(1000); + + QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue() + .getMetrics(); + + // Verify Metrics + verifyMetrics(metrics, 7168, 7, 1024, 1, 1); + + AllocateResponse allocateResponse = am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))), + null); + List<Container> allocatedContainers = allocateResponse + .getAllocatedContainers(); + Assert.assertEquals(2, allocatedContainers.size()); + Container container = allocatedContainers.get(0); + MockNM allocNode = nodes.get(container.getNodeId()); + + // Start Container in NM + allocNode.nodeHeartbeat(Arrays.asList( + ContainerStatus.newInstance(container.getId(), + ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), + true); + Thread.sleep(200); + + // Verify that container is actually running wrt the RM.. + RMContainer rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt( + container.getId().getApplicationAttemptId()).getRMContainer( + container.getId()); + Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + + // Verify Metrics After OPP allocation (Nothing should change) + verifyMetrics(metrics, 7168, 7, 1024, 1, 1); + + am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(0, + container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED))); + + // Verify Metrics After OPP allocation (Nothing should change again) + verifyMetrics(metrics, 7168, 7, 1024, 1, 1); + + // Send Promotion req again... this should result in update error + allocateResponse = am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(0, + container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED))); + Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); + Assert.assertEquals(1, allocateResponse.getUpdateErrors().size()); + Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", + allocateResponse.getUpdateErrors().get(0).getReason()); + Assert.assertEquals(container.getId(), + allocateResponse.getUpdateErrors().get(0) + .getUpdateContainerRequest().getContainerId()); + + // Start Container in NM + allocNode.nodeHeartbeat(Arrays.asList( + ContainerStatus.newInstance(container.getId(), + ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), + true); + Thread.sleep(200); + + allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + Container uc = + allocateResponse.getUpdatedContainers().get(0).getContainer(); + Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType()); + Assert.assertEquals(uc.getId(), container.getId()); + Assert.assertEquals(uc.getVersion(), container.getVersion() + 1); + + // Verify that the Container is still in RUNNING state wrt RM.. + rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt( + uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId()); + Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + + // Verify Metrics After OPP allocation : + // Allocated cores+mem should have increased, available should decrease + verifyMetrics(metrics, 6144, 6, 2048, 2, 2); + } + + @Test(timeout = 600000) + public void testContainerPromoteAfterContainerComplete() throws Exception { + HashMap<NodeId, MockNM> nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + nm1.registerNode(); + nm2.registerNode(); + + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + ResourceScheduler scheduler = rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + ((RMNodeImpl) rmNode1) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + ((RMNodeImpl) rmNode2) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + + OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler) + .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); + // Send add and update node events to AM Service. + amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); + amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + // All nodes 1 to 2 will be applicable for scheduling. + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + Thread.sleep(1000); + + QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue() + .getMetrics(); + + // Verify Metrics + verifyMetrics(metrics, 7168, 7, 1024, 1, 1); + + AllocateResponse allocateResponse = am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))), + null); + List<Container> allocatedContainers = allocateResponse + .getAllocatedContainers(); + Assert.assertEquals(2, allocatedContainers.size()); + Container container = allocatedContainers.get(0); + MockNM allocNode = nodes.get(container.getNodeId()); + + // Start Container in NM + allocNode.nodeHeartbeat(Arrays.asList( + ContainerStatus.newInstance(container.getId(), + ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), + true); + Thread.sleep(200); + + // Verify that container is actually running wrt the RM.. + RMContainer rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt( + container.getId().getApplicationAttemptId()).getRMContainer( + container.getId()); + Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + + // Container Completed in the NM + allocNode.nodeHeartbeat(Arrays.asList( + ContainerStatus.newInstance(container.getId(), + ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)), + true); + Thread.sleep(200); + + // Verify that container has been removed.. + rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt( + container.getId().getApplicationAttemptId()).getRMContainer( + container.getId()); + Assert.assertNull(rmContainer); + + // Verify Metrics After OPP allocation (Nothing should change) + verifyMetrics(metrics, 7168, 7, 1024, 1, 1); + + // Send Promotion req... this should result in update error + // Since the container doesn't exist anymore.. + allocateResponse = am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(0, + container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED))); + + Assert.assertEquals(1, + allocateResponse.getCompletedContainersStatuses().size()); + Assert.assertEquals(container.getId(), + allocateResponse.getCompletedContainersStatuses().get(0) + .getContainerId()); + Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); + Assert.assertEquals(1, allocateResponse.getUpdateErrors().size()); + Assert.assertEquals("INVALID_CONTAINER_ID", + allocateResponse.getUpdateErrors().get(0).getReason()); + Assert.assertEquals(container.getId(), + allocateResponse.getUpdateErrors().get(0) + .getUpdateContainerRequest().getContainerId()); + + // Verify Metrics After OPP allocation (Nothing should change again) + verifyMetrics(metrics, 7168, 7, 1024, 1, 1); + } + + private void verifyMetrics(QueueMetrics metrics, long availableMB, + int availableVirtualCores, long allocatedMB, + int allocatedVirtualCores, int allocatedContainers) { + Assert.assertEquals(availableMB, metrics.getAvailableMB()); + Assert.assertEquals(availableVirtualCores, metrics.getAvailableVirtualCores()); + Assert.assertEquals(allocatedMB, metrics.getAllocatedMB()); + Assert.assertEquals(allocatedVirtualCores, metrics.getAllocatedVirtualCores()); + Assert.assertEquals(allocatedContainers, metrics.getAllocatedContainers()); + } + + @Test(timeout = 60000) + public void testNodeRemovalDuringAllocate() throws Exception { MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); nm1.registerNode();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 497c6d0..786cc50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -100,6 +100,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -478,7 +479,7 @@ public class TestRMAppAttemptTransitions { assertEquals(expectedState, applicationAttempt.getAppAttemptState()); verify(scheduler, times(expectedAllocateCount)).allocate( any(ApplicationAttemptId.class), any(List.class), any(List.class), - any(List.class), any(List.class), any(List.class), any(List.class)); + any(List.class), any(List.class), any(ContainerUpdates.class)); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); @@ -499,7 +500,7 @@ public class TestRMAppAttemptTransitions { verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class)); verify(scheduler, times(2)).allocate(any(ApplicationAttemptId.class), any(List.class), any(List.class), any(List.class), any(List.class), - any(List.class), any(List.class)); + any(ContainerUpdates.class)); verify(nmTokenManager).clearNodeSetForAttempt( applicationAttempt.getAppAttemptId()); } @@ -526,7 +527,7 @@ public class TestRMAppAttemptTransitions { } /** - * {@link RMAppAttemptState#LAUNCH} + * {@link RMAppAttemptState#LAUNCHED} */ private void testAppAttemptLaunchedState(Container container) { assertEquals(RMAppAttemptState.LAUNCHED, @@ -649,8 +650,8 @@ public class TestRMAppAttemptTransitions { when(allocation.getContainers()). thenReturn(Collections.singletonList(container)); when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class), - any(List.class), any(List.class), any(List.class), any(List.class), - any(List.class))). + any(List.class), any(List.class), any(List.class), + any(ContainerUpdates.class))). thenReturn(allocation); RMContainer rmContainer = mock(RMContainerImpl.class); when(scheduler.getRMContainer(container.getId())). @@ -1129,8 +1130,9 @@ public class TestRMAppAttemptTransitions { when(allocation.getContainers()). thenReturn(Collections.singletonList(amContainer)); when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class), - any(List.class), any(List.class), any(List.class), any(List.class), - any(List.class))).thenReturn(allocation); + any(List.class), any(List.class), any(List.class), + any(ContainerUpdates.class))) + .thenReturn(allocation); RMContainer rmContainer = mock(RMContainerImpl.class); when(scheduler.getRMContainer(amContainer.getId())).thenReturn(rmContainer); @@ -1610,7 +1612,8 @@ public class TestRMAppAttemptTransitions { YarnScheduler mockScheduler = mock(YarnScheduler.class); when(mockScheduler.allocate(any(ApplicationAttemptId.class), any(List.class), any(List.class), any(List.class), any(List.class), - any(List.class), any(List.class))).thenAnswer(new Answer<Allocation>() { + any(ContainerUpdates.class))) + .thenAnswer(new Answer<Allocation>() { @SuppressWarnings("rawtypes") @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index e737a84..893f802 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Test; @@ -114,7 +115,8 @@ public class TestRMContainerImpl { YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO, true); when(rmContext.getYarnConfiguration()).thenReturn(conf); - RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, nodeId, "user", rmContext); assertEquals(RMContainerState.NEW, rmContainer.getState()); @@ -216,7 +218,8 @@ public class TestRMContainerImpl { when(rmContext.getYarnConfiguration()).thenReturn(conf); when(rmContext.getRMApps()).thenReturn(appMap); - RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, nodeId, "user", rmContext); assertEquals(RMContainerState.NEW, rmContainer.getState()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 27860a6..43bdc8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -114,6 +114,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event. ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -160,7 +161,9 @@ import com.google.common.collect.Sets; public class TestCapacityScheduler { private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); private final int GB = 1024; - + private final static ContainerUpdates NULL_UPDATE_REQUESTS = + new ContainerUpdates(); + private static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; private static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; private static final String A1 = A + ".a1"; @@ -738,12 +741,12 @@ public class TestCapacityScheduler { // Verify the blacklist can be updated independent of requesting containers cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), Collections.<ContainerId>emptyList(), - Collections.singletonList(host), null, null, null); + Collections.singletonList(host), null, NULL_UPDATE_REQUESTS); Assert.assertTrue(cs.getApplicationAttempt(appAttemptId) .isPlaceBlacklisted(host)); cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), Collections.<ContainerId>emptyList(), null, - Collections.singletonList(host), null, null); + Collections.singletonList(host), NULL_UPDATE_REQUESTS); Assert.assertFalse(cs.getApplicationAttempt(appAttemptId) .isPlaceBlacklisted(host)); rm.stop(); @@ -839,7 +842,7 @@ public class TestCapacityScheduler { cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1), Collections.<ContainerId>emptyList(), - null, null, null, null); + null, null, NULL_UPDATE_REQUESTS); //And this will result in container assignment for app1 CapacityScheduler.schedule(cs); @@ -856,7 +859,7 @@ public class TestCapacityScheduler { cs.allocate(appAttemptId2, Collections.<ResourceRequest>singletonList(r2), Collections.<ContainerId>emptyList(), - null, null, null, null); + null, null, NULL_UPDATE_REQUESTS); //In this case we do not perform container assignment because we want to //verify re-ordering based on the allocation alone @@ -2981,7 +2984,8 @@ public class TestCapacityScheduler { Allocation allocate = cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(), - Collections.<ContainerId> emptyList(), null, null, null, null); + Collections.<ContainerId> emptyList(), null, null, + NULL_UPDATE_REQUESTS); Assert.assertNotNull(attempt); @@ -2997,7 +3001,8 @@ public class TestCapacityScheduler { allocate = cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(), - Collections.<ContainerId> emptyList(), null, null, null, null); + Collections.<ContainerId> emptyList(), null, null, + NULL_UPDATE_REQUESTS); // All resources should be sent as headroom Assert.assertEquals(newResource, allocate.getResourceLimit()); @@ -3504,7 +3509,7 @@ public class TestCapacityScheduler { cs.allocate(appAttemptId3, Collections.<ResourceRequest>singletonList(y1Req), Collections.<ContainerId>emptyList(), - null, null, null, null); + null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); } assertEquals("Y1 Used Resource should be 4 GB", 4 * GB, @@ -3518,7 +3523,7 @@ public class TestCapacityScheduler { cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(x1Req), Collections.<ContainerId>emptyList(), - null, null, null, null); + null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); } assertEquals("X1 Used Resource should be 7 GB", 7 * GB, @@ -3531,7 +3536,7 @@ public class TestCapacityScheduler { cs.allocate(appAttemptId2, Collections.<ResourceRequest>singletonList(x2Req), Collections.<ContainerId>emptyList(), - null, null, null, null); + null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); assertEquals("X2 Used Resource should be 0", 0, cs.getQueue("x2").getUsedResources().getMemorySize()); @@ -3543,7 +3548,7 @@ public class TestCapacityScheduler { cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(x1Req), Collections.<ContainerId>emptyList(), - null, null, null, null); + null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); assertEquals("X1 Used Resource should be 7 GB", 7 * GB, cs.getQueue("x1").getUsedResources().getMemorySize()); @@ -3557,7 +3562,7 @@ public class TestCapacityScheduler { cs.allocate(appAttemptId3, Collections.<ResourceRequest>singletonList(y1Req), Collections.<ContainerId>emptyList(), - null, null, null, null); + null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); } assertEquals("P2 Used Resource should be 8 GB", 8 * GB, @@ -3616,7 +3621,7 @@ public class TestCapacityScheduler { //This will allocate for app1 cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1), Collections.<ContainerId>emptyList(), - null, null, null, null).getContainers().size(); + null, null, NULL_UPDATE_REQUESTS).getContainers().size(); CapacityScheduler.schedule(cs); ResourceRequest r2 = null; for (int i =0; i < 13; i++) { @@ -3625,7 +3630,7 @@ public class TestCapacityScheduler { cs.allocate(appAttemptId2, Collections.<ResourceRequest>singletonList(r2), Collections.<ContainerId>emptyList(), - null, null, null, null); + null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); } assertEquals("A Used Resource should be 2 GB", 2 * GB, @@ -3638,11 +3643,11 @@ public class TestCapacityScheduler { ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1), Collections.<ContainerId>emptyList(), - null, null, null, null).getContainers().size(); + null, null, NULL_UPDATE_REQUESTS).getContainers().size(); CapacityScheduler.schedule(cs); cs.allocate(appAttemptId2, Collections.<ResourceRequest>singletonList(r2), - Collections.<ContainerId>emptyList(), null, null, null, null); + Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); //Check blocked Resource assertEquals("A Used Resource should be 2 GB", 2 * GB, http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index a6ae0c2..cf91841 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preempti import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -280,7 +282,8 @@ public class TestChildQueueOrder { ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); Container container=TestUtils.getMockContainer(containerId, node_0.getNodeID(), Resources.createResource(1*GB), priority); - RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, node_0.getNodeID(), "user", rmContext); // Assign {1,2,3,4} 1GB containers respectively to queues http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index c9eb8b3..f9bf89d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -923,13 +923,15 @@ public class TestReservations { Container container = TestUtils.getMockContainer(containerId, node_1.getNodeID(), Resources.createResource(2*GB), priorityMap.getPriority()); - RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, node_1.getNodeID(), "user", rmContext); Container container_1 = TestUtils.getMockContainer(containerId, node_0.getNodeID(), Resources.createResource(1*GB), priorityMap.getPriority()); - RMContainer rmContainer_1 = new RMContainerImpl(container_1, appAttemptId, + RMContainer rmContainer_1 = new RMContainerImpl(container_1, + SchedulerRequestKey.extractFrom(container_1), appAttemptId, node_0.getNodeID(), "user", rmContext); // no reserved containers @@ -996,7 +998,8 @@ public class TestReservations { Container container = TestUtils.getMockContainer(containerId, node_1.getNodeID(), Resources.createResource(2*GB), priorityMap.getPriority()); - RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, node_1.getNodeID(), "user", rmContext); // nothing reserved http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 992b75d..8e8267f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -41,6 +41,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -71,6 +74,8 @@ public class FairSchedulerTestBase { public static final float TEST_RESERVATION_THRESHOLD = 0.09f; private static final int SLEEP_DURATION = 10; private static final int SLEEP_RETRIES = 1000; + final static ContainerUpdates NULL_UPDATE_REQUESTS = + new ContainerUpdates(); /** * The list of nodes added to the cluster using the {@link #addNode} method. @@ -181,7 +186,8 @@ public class FairSchedulerTestBase { resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); - scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null, null, null); + scheduler.allocate(id, ask, new ArrayList<ContainerId>(), + null, null, NULL_UPDATE_REQUESTS); return id; } @@ -207,7 +213,8 @@ public class FairSchedulerTestBase { resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); - scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null, null, null); + scheduler.allocate(id, ask, new ArrayList<ContainerId>(), + null, null, NULL_UPDATE_REQUESTS); return id; } @@ -229,7 +236,8 @@ public class FairSchedulerTestBase { ResourceRequest request, ApplicationAttemptId attId) { List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); ask.add(request); - scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null, null, null); + scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), + null, null, NULL_UPDATE_REQUESTS); } protected void createApplicationWithAMResource(ApplicationAttemptId attId, http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index 4cc99b2..8bb06e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -119,7 +119,8 @@ public class TestContinuousScheduling extends FairSchedulerTestBase { List<ResourceRequest> ask = new ArrayList<>(); ask.add(createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true)); scheduler.allocate( - appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null); + appAttemptId, ask, new ArrayList<ContainerId>(), + null, null, NULL_UPDATE_REQUESTS); FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); triggerSchedulingAttempt(); @@ -157,7 +158,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase { createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true); ask.add(request); scheduler.allocate(appAttemptId, ask, - new ArrayList<ContainerId>(), null, null, null, null); + new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); triggerSchedulingAttempt(); FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); @@ -169,7 +170,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase { ask.clear(); ask.add(request); scheduler.allocate(appAttemptId, ask, - new ArrayList<ContainerId>(), null, null, null, null); + new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); triggerSchedulingAttempt(); checkAppConsumption(app, Resources.createResource(2048,2)); @@ -335,7 +336,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase { ask1.add(request1); ask1.add(request2); scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null, - null, null); + NULL_UPDATE_REQUESTS); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 5aa1e2d..b1e412b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -93,6 +93,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -124,6 +127,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { private final int GB = 1024; private final static String ALLOC_FILE = new File(TEST_DIR, "test-queues").getAbsolutePath(); + private final static ContainerUpdates NULL_UPDATE_REQUESTS = + new ContainerUpdates(); @Before public void setUp() throws IOException { @@ -1257,7 +1262,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false)); scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null, - null, null, null); + null, NULL_UPDATE_REQUESTS); ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1); scheduler.update(); @@ -2111,7 +2116,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { ResourceRequest request1 = createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1, true); ask1.add(request1); - scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null, null, null); + scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), + null, null, NULL_UPDATE_REQUESTS); // Second ask, queue2 requests 1 large. List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>(); @@ -2121,7 +2127,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { ResourceRequest.ANY, 1, 1, false); ask2.add(request2); ask2.add(request3); - scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(), null, null, null, null); + scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(), + null, null, NULL_UPDATE_REQUESTS); // Third ask, queue2 requests 2 small (minReqSize). List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>(); @@ -2131,7 +2138,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { ResourceRequest.ANY, 2, 2, true); ask3.add(request4); ask3.add(request5); - scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(), null, null, null, null); + scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(), + null, null, NULL_UPDATE_REQUESTS); scheduler.update(); @@ -2665,7 +2673,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Complete container scheduler.allocate(attId, new ArrayList<ResourceRequest>(), - Arrays.asList(containerId), null, null, null, null); + Arrays.asList(containerId), null, null, NULL_UPDATE_REQUESTS); assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB()); assertEquals(4, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); @@ -2757,7 +2765,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true)); scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null, - null, null, null); + null, NULL_UPDATE_REQUESTS); // node 1 checks in scheduler.update(); @@ -3203,7 +3211,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { createResourceRequest(1024, node1.getHostName(), 1, 0, true), createResourceRequest(1024, "rack1", 1, 0, true), createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true)); - scheduler.allocate(attId1, update, new ArrayList<ContainerId>(), null, null, null, null); + scheduler.allocate(attId1, update, new ArrayList<ContainerId>(), + null, null, NULL_UPDATE_REQUESTS); // then node2 should get the container scheduler.handle(node2UpdateEvent); @@ -3250,7 +3259,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { anyRequest = createResourceRequest(1024, ResourceRequest.ANY, 1, 1, false); scheduler.allocate(attId, Arrays.asList(rackRequest, anyRequest), - new ArrayList<ContainerId>(), null, null, null, null); + new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); scheduler.handle(nodeUpdateEvent); assertEquals(0, app.getReservedContainers().size()); @@ -4275,7 +4284,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ask1.add(request1); scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, - null, null, null); + null, NULL_UPDATE_REQUESTS); String hostName = "127.0.0.1"; RMNode node1 = MockNodes.newNodeInfo(1, @@ -4351,11 +4360,11 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Verify the blacklist can be updated independent of requesting containers scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), Collections.<ContainerId>emptyList(), - Collections.singletonList(host), null, null, null); + Collections.singletonList(host), null, NULL_UPDATE_REQUESTS); assertTrue(app.isPlaceBlacklisted(host)); scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), Collections.<ContainerId>emptyList(), null, - Collections.singletonList(host), null, null); + Collections.singletonList(host), NULL_UPDATE_REQUESTS); assertFalse(scheduler.getSchedulerApp(appAttemptId) .isPlaceBlacklisted(host)); @@ -4365,7 +4374,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Verify a container does not actually get placed on the blacklisted host scheduler.allocate(appAttemptId, update, Collections.<ContainerId>emptyList(), - Collections.singletonList(host), null, null, null); + Collections.singletonList(host), null, NULL_UPDATE_REQUESTS); assertTrue(app.isPlaceBlacklisted(host)); scheduler.update(); scheduler.handle(updateEvent); @@ -4375,7 +4384,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Verify a container gets placed on the empty blacklist scheduler.allocate(appAttemptId, update, Collections.<ContainerId>emptyList(), null, - Collections.singletonList(host), null, null); + Collections.singletonList(host), NULL_UPDATE_REQUESTS); assertFalse(app.isPlaceBlacklisted(host)); createSchedulingRequest(GB, "root.default", "user", 1); scheduler.update(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index bfbc7bd..3f97b59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; @@ -119,7 +120,10 @@ public class TestFifoScheduler { private static Configuration conf; private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - + + private final static ContainerUpdates NULL_UPDATE_REQUESTS = + new ContainerUpdates(); + @Before public void setUp() throws Exception { conf = new Configuration(); @@ -274,7 +278,8 @@ public class TestFifoScheduler { ask.add(nodeLocal); ask.add(rackLocal); ask.add(any); - scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null); + scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), + null, null, NULL_UPDATE_REQUESTS); NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); @@ -368,7 +373,8 @@ public class TestFifoScheduler { ask.add(nodeLocal); ask.add(rackLocal); ask.add(any); - scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null); + scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), + null, null, NULL_UPDATE_REQUESTS); // Before the node update event, there are one local request Assert.assertEquals(1, nodeLocal.getNumContainers()); @@ -944,7 +950,7 @@ public class TestFifoScheduler { ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1, RMNodeLabelsManager.NO_LABEL)); fs.allocate(appAttemptId1, ask1, emptyId, - Collections.singletonList(host_1_0), null, null, null); + Collections.singletonList(host_1_0), null, NULL_UPDATE_REQUESTS); // Trigger container assignment fs.handle(new NodeUpdateSchedulerEvent(n3)); @@ -952,14 +958,16 @@ public class TestFifoScheduler { // Get the allocation for the application and verify no allocation on // blacklist node Allocation allocation1 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null); + fs.allocate(appAttemptId1, emptyAsk, emptyId, + null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation1", 0, allocation1.getContainers().size()); // verify host_1_1 can get allocated as not in blacklist fs.handle(new NodeUpdateSchedulerEvent(n4)); Allocation allocation2 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null); + fs.allocate(appAttemptId1, emptyAsk, emptyId, + null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation2", 1, allocation2.getContainers().size()); List<Container> containerList = allocation2.getContainers(); for (Container container : containerList) { @@ -974,29 +982,33 @@ public class TestFifoScheduler { ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); fs.allocate(appAttemptId1, ask2, emptyId, - Collections.singletonList("rack0"), null, null, null); + Collections.singletonList("rack0"), null, NULL_UPDATE_REQUESTS); // verify n1 is not qualified to be allocated fs.handle(new NodeUpdateSchedulerEvent(n1)); Allocation allocation3 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null); + fs.allocate(appAttemptId1, emptyAsk, emptyId, + null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation3", 0, allocation3.getContainers().size()); // verify n2 is not qualified to be allocated fs.handle(new NodeUpdateSchedulerEvent(n2)); Allocation allocation4 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null); + fs.allocate(appAttemptId1, emptyAsk, emptyId, + null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation4", 0, allocation4.getContainers().size()); // verify n3 is not qualified to be allocated fs.handle(new NodeUpdateSchedulerEvent(n3)); Allocation allocation5 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null); + fs.allocate(appAttemptId1, emptyAsk, emptyId, + null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation5", 0, allocation5.getContainers().size()); fs.handle(new NodeUpdateSchedulerEvent(n4)); Allocation allocation6 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null); + fs.allocate(appAttemptId1, emptyAsk, emptyId, + null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation6", 1, allocation6.getContainers().size()); containerList = allocation6.getContainers(); @@ -1055,25 +1067,29 @@ public class TestFifoScheduler { List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>(); ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); - fs.allocate(appAttemptId1, ask1, emptyId, null, null, null, null); + fs.allocate(appAttemptId1, ask1, emptyId, + null, null, NULL_UPDATE_REQUESTS); // Ask for a 2 GB container for app 2 List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>(); ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1)); - fs.allocate(appAttemptId2, ask2, emptyId, null, null, null, null); + fs.allocate(appAttemptId2, ask2, emptyId, + null, null, NULL_UPDATE_REQUESTS); // Trigger container assignment fs.handle(new NodeUpdateSchedulerEvent(n1)); // Get the allocation for the applications and verify headroom Allocation allocation1 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null); + fs.allocate(appAttemptId1, emptyAsk, emptyId, + null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("Allocation headroom", 1 * GB, allocation1 .getResourceLimit().getMemorySize()); Allocation allocation2 = - fs.allocate(appAttemptId2, emptyAsk, emptyId, null, null, null, null); + fs.allocate(appAttemptId2, emptyAsk, emptyId, + null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("Allocation headroom", 1 * GB, allocation2 .getResourceLimit().getMemorySize()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
