Repository: hadoop Updated Branches: refs/heads/branch-3.1 dd8479e80 -> 52af95fdc
Revert "YARN-8233. NPE in CapacityScheduler#tryCommit when handling allocate/reserve proposal whose allocatedOrReservedContainer is null. Contributed by Tao Yang." This reverts commit dd8479e80d3f0fe87a6edb099e7f617bff42106a. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52af95fd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52af95fd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52af95fd Branch: refs/heads/branch-3.1 Commit: 52af95fdcead53c4e419d62cf24b3e90e3911ab7 Parents: dd8479e Author: Akira Ajisaka <[email protected]> Authored: Wed Nov 7 11:33:31 2018 +0900 Committer: Akira Ajisaka <[email protected]> Committed: Wed Nov 7 11:33:31 2018 +0900 ---------------------------------------------------------------------- .../scheduler/capacity/CapacityScheduler.java | 86 +++++++------------- .../TestCapacitySchedulerAsyncScheduling.java | 83 ------------------- 2 files changed, 28 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/52af95fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index f8ad730..e0f99bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2627,11 +2627,7 @@ public class CapacityScheduler extends .getContainersToKill().isEmpty()) { list = new ArrayList<>(); for (RMContainer rmContainer : csAssignment.getContainersToKill()) { - SchedulerContainer schedulerContainer = - getSchedulerContainer(rmContainer, false); - if (schedulerContainer != null) { - list.add(schedulerContainer); - } + list.add(getSchedulerContainer(rmContainer, false)); } } @@ -2639,16 +2635,10 @@ public class CapacityScheduler extends if (null == list) { list = new ArrayList<>(); } - SchedulerContainer schedulerContainer = - getSchedulerContainer(csAssignment.getExcessReservation(), false); - if (schedulerContainer != null) { - list.add(schedulerContainer); - } + list.add( + getSchedulerContainer(csAssignment.getExcessReservation(), false)); } - if (list != null && list.isEmpty()) { - list = null; - } return list; } @@ -2733,15 +2723,11 @@ public class CapacityScheduler extends ((RMContainerImpl)rmContainer).setAllocationTags( new HashSet<>(schedulingRequest.getAllocationTags())); - SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> - schedulerContainer = getSchedulerContainer(rmContainer, true); - if (schedulerContainer == null) { - allocated = null; - } else { - allocated = new ContainerAllocationProposal<>(schedulerContainer, - null, null, NodeType.NODE_LOCAL, NodeType.NODE_LOCAL, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, resource); - } + allocated = new ContainerAllocationProposal<>( + getSchedulerContainer(rmContainer, true), + null, null, NodeType.NODE_LOCAL, NodeType.NODE_LOCAL, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, + resource); } if (null != allocated) { @@ -2771,27 +2757,16 @@ public class CapacityScheduler extends csAssignment.getAssignmentInformation().getAllocationDetails(); if (!allocations.isEmpty()) { RMContainer rmContainer = allocations.get(0).rmContainer; - SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> - schedulerContainer = getSchedulerContainer(rmContainer, true); - if (schedulerContainer == null) { - allocated = null; - // Decrease unconfirmed resource if app is alive - FiCaSchedulerApp app = getApplicationAttempt( - rmContainer.getApplicationAttemptId()); - if (app != null) { - app.decUnconfirmedRes(rmContainer.getAllocatedResource()); - } - } else { - allocated = new ContainerAllocationProposal<>(schedulerContainer, - getSchedulerContainersToRelease(csAssignment), - getSchedulerContainer( - csAssignment.getFulfilledReservedContainer(), false), - csAssignment.getType(), csAssignment.getRequestLocalityType(), - csAssignment.getSchedulingMode() != null ? - csAssignment.getSchedulingMode() : - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, - csAssignment.getResource()); - } + allocated = new ContainerAllocationProposal<>( + getSchedulerContainer(rmContainer, true), + getSchedulerContainersToRelease(csAssignment), + getSchedulerContainer(csAssignment.getFulfilledReservedContainer(), + false), csAssignment.getType(), + csAssignment.getRequestLocalityType(), + csAssignment.getSchedulingMode() != null ? + csAssignment.getSchedulingMode() : + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, + csAssignment.getResource()); } // Reserved something @@ -2799,21 +2774,16 @@ public class CapacityScheduler extends csAssignment.getAssignmentInformation().getReservationDetails(); if (!reservation.isEmpty()) { RMContainer rmContainer = reservation.get(0).rmContainer; - SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> - schedulerContainer = getSchedulerContainer(rmContainer, false); - if (schedulerContainer == null) { - reserved = null; - } else { - reserved = new ContainerAllocationProposal<>(schedulerContainer, - getSchedulerContainersToRelease(csAssignment), - getSchedulerContainer( - csAssignment.getFulfilledReservedContainer(), false), - csAssignment.getType(), csAssignment.getRequestLocalityType(), - csAssignment.getSchedulingMode() != null ? - csAssignment.getSchedulingMode() : - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, - csAssignment.getResource()); - } + reserved = new ContainerAllocationProposal<>( + getSchedulerContainer(rmContainer, false), + getSchedulerContainersToRelease(csAssignment), + getSchedulerContainer(csAssignment.getFulfilledReservedContainer(), + false), csAssignment.getType(), + csAssignment.getRequestLocalityType(), + csAssignment.getSchedulingMode() != null ? + csAssignment.getSchedulingMode() : + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, + csAssignment.getResource()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/52af95fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 67c504d..840d30d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -56,11 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerA import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -846,86 +843,6 @@ public class TestCapacitySchedulerAsyncScheduling { return new ResourceCommitRequest(allocateProposals, null, null); } - @Test(timeout = 30000) - public void testReturnNullWhenGetSchedulerContainer() throws Exception { - // disable async-scheduling for simulating complex scenario - Configuration disableAsyncConf = new Configuration(conf); - disableAsyncConf.setBoolean( - CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false); - - // init RM & NMs - final MockRM rm = new MockRM(disableAsyncConf); - rm.start(); - final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB); - final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 8 * GB); - rm.drainEvents(); - CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); - SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId()); - RMNode rmNode1 = cs.getNode(nm1.getNodeId()).getRMNode(); - SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId()); - - // launch app1-am on nm1 - RMApp app1 = rm.submitApp(1 * GB, "app1", "user", null, false, "default", - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); - - // app2 asks 1 * 1G container - am1.allocate(ImmutableList.of(ResourceRequest - .newInstance(Priority.newInstance(0), "*", - Resources.createResource(1 * GB), 1)), null); - RMContainer amContainer = cs.getRMContainer( - ContainerId.newContainerId(am1.getApplicationAttemptId(), 1)); - - // spy CapacityScheduler - final CapacityScheduler spyCs = Mockito.spy(cs); - // hook CapacityScheduler#submitResourceCommitRequest - List<CSAssignment> assignmentSnapshots = new ArrayList<>(); - Mockito.doAnswer(new Answer<Object>() { - public Boolean answer(InvocationOnMock invocation) throws Exception { - CSAssignment assignment = (CSAssignment) invocation.getArguments()[1]; - if (cs.getNode(nm1.getNodeId()) != null) { - // decommission nm1 for first allocation on nm1 - cs.getRMContext().getDispatcher().getEventHandler().handle( - new RMNodeEvent(nm1.getNodeId(), RMNodeEventType.DECOMMISSION)); - rm.drainEvents(); - Assert.assertEquals(NodeState.DECOMMISSIONED, rmNode1.getState()); - Assert.assertNull(cs.getNode(nm1.getNodeId())); - assignmentSnapshots.add(assignment); - } else { - // add am container on nm1 to containersToKill - // for second allocation on nm2 - assignment.setContainersToKill(ImmutableList.of(amContainer)); - } - // check no NPE in actual submit, before YARN-8233 will throw NPE - cs.submitResourceCommitRequest((Resource) invocation.getArguments()[0], - assignment); - return false; - } - }).when(spyCs).submitResourceCommitRequest(Mockito.any(Resource.class), - Mockito.any(CSAssignment.class)); - - // allocation on nm1, test return null when get scheduler container - CandidateNodeSet<FiCaSchedulerNode> candidateNodeSet = - new SimpleCandidateNodeSet(sn1); - spyCs.allocateContainersToNode(candidateNodeSet, false); - // make sure unconfirmed resource is decreased correctly - Assert.assertTrue(spyCs.getApplicationAttempt(am1.getApplicationAttemptId()) - .hasPendingResourceRequest(RMNodeLabelsManager.NO_LABEL, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); - - // allocation on nm2, - // test return null when get scheduler container to release - candidateNodeSet = - new SimpleCandidateNodeSet(sn2); - spyCs.allocateContainersToNode(candidateNodeSet, false); - // make sure unconfirmed resource is decreased correctly - Assert.assertTrue(spyCs.getApplicationAttempt(am1.getApplicationAttemptId()) - .hasPendingResourceRequest(RMNodeLabelsManager.NO_LABEL, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); - - rm.stop(); - } - private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) { if (nmHeartbeatThread != null) { nmHeartbeatThread.setShouldStop(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
