Repository: hadoop
Updated Branches:
  refs/heads/branch-2 0c7caba08 -> e2917180e
YARN-4862. Handle duplicate completed containers in RMNodeImpl. Contributed by Rohith Sharma K S
(cherry picked from commit 352cbaa7a54a94bad2bed131d6a250c5b21a7701)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e2917180
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e2917180
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e2917180

Branch: refs/heads/branch-2
Commit: e2917180ee3e0a8a34d64d906b5f7e20fd20480f
Parents: 0c7caba
Author: Jason Lowe <[email protected]>
Authored: Thu Nov 3 14:03:56 2016 +0000
Committer: Jason Lowe <[email protected]>
Committed: Thu Nov 3 14:03:56 2016 +0000

----------------------------------------------------------------------
 .../resourcemanager/rmnode/RMNodeImpl.java | 32 ++++++++---
 .../scheduler/AbstractYarnScheduler.java | 21 ++++++--
 .../resourcemanager/TestRMNodeTransitions.java | 40 ++++++++++++++
 .../TestResourceTrackerService.java | 56 ++++++++++++++++++++
 4 files changed, 140 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2917180/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index c668198..9b0198a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -141,6 +141,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { private final Set<ContainerId> launchedContainers = new HashSet<ContainerId>(); + /* track completed container globally */ + private final Set<ContainerId> completedContainers = + new HashSet<ContainerId>(); + /* set of containers that need to be cleaned */ private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>( new ContainerIdComparator()); @@ -578,6 +582,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { response.addContainersToBeRemovedFromNM( new ArrayList<ContainerId>(this.containersToBeRemovedFromNM)); response.addAllContainersToSignal(this.containersToSignal); + this.completedContainers.removeAll(this.containersToBeRemovedFromNM); this.containersToClean.clear(); this.finishedApplications.clear(); this.containersToSignal.clear(); @@ -1287,6 +1292,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { return this.launchedContainers; } + @VisibleForTesting + public Set<ContainerId> getCompletedContainers() { + return this.completedContainers; + } + @Override public Set<String> getNodeLabels() { RMNodeLabelsManager nlm = context.getNodeLabelManager(); @@ -1329,7 +1339,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { // containers. 
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>(); - List<ContainerStatus> completedContainers = + List<ContainerStatus> newlyCompletedContainers = new ArrayList<ContainerStatus>(); int numRemoteRunningContainers = 0; for (ContainerStatus remoteContainer : containerStatuses) { @@ -1385,15 +1395,25 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } // Completed containers should also include the OPPORTUNISTIC containers // so that the AM gets properly notified. - completedContainers.add(remoteContainer); + if (completedContainers.add(containerId)) { + newlyCompletedContainers.add(remoteContainer); + } + } + } + + List<ContainerStatus> lostContainers = + findLostContainers(numRemoteRunningContainers, containerStatuses); + for (ContainerStatus remoteContainer : lostContainers) { + ContainerId containerId = remoteContainer.getContainerId(); + if (completedContainers.add(containerId)) { + newlyCompletedContainers.add(remoteContainer); } } - completedContainers.addAll(findLostContainers( - numRemoteRunningContainers, containerStatuses)); - if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) { + if (newlyLaunchedContainers.size() != 0 + || newlyCompletedContainers.size() != 0) { nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers, - completedContainers)); + newlyCompletedContainers)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2917180/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index df59556..5e6a726 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -909,16 +910,18 @@ public abstract class AbstractYarnScheduler * Process completed container list. 
* @param completedContainers Extracted list of completed containers * @param releasedResources Reference resource object for completed containers + * @param nodeId NodeId corresponding to the NodeManager * @return The total number of released containers */ protected int updateCompletedContainers(List<ContainerStatus> - completedContainers, Resource releasedResources) { + completedContainers, Resource releasedResources, NodeId nodeId) { int releasedContainers = 0; + List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>(); for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); RMContainer container = getRMContainer(containerId); - completedContainer(getRMContainer(containerId), + completedContainer(container, completedContainer, RMContainerEventType.FINISHED); if (container != null) { releasedContainers++; @@ -930,8 +933,19 @@ public abstract class AbstractYarnScheduler if (rrs != null) { Resources.addTo(releasedResources, rrs); } + } else { + // Add containers which are untracked by RM. + untrackedContainerIdList.add(containerId); } } + + // Acknowledge NM to remove RM-untracked-containers from NM context. 
+ if (!untrackedContainerIdList.isEmpty()) { + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId, + untrackedContainerIdList)); + } + return releasedContainers; } @@ -977,7 +991,7 @@ public abstract class AbstractYarnScheduler // Process completed containers Resource releasedResources = Resource.newInstance(0, 0); int releasedContainers = updateCompletedContainers(completedContainers, - releasedResources); + releasedResources, nm.getNodeID()); // If the node is decommissioning, send an update to have the total // resource equal to the used resource, so no available resource to @@ -1004,4 +1018,5 @@ public abstract class AbstractYarnScheduler " availableResource: " + node.getUnallocatedResource()); } } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2917180/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 6038b31..6055afb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; @@ -1065,4 +1066,43 @@ public class TestRMNodeTransitions { Assert.assertTrue("second container not running", node.getLaunchedContainers().contains(cid2)); } + + @Test + public void testForHandlingDuplicatedCompltedContainers() { + // Start the node + node.handle(new RMNodeStartedEvent(null, null, null)); + // Add info to the queue first + node.setNextHeartBeat(false); + + ContainerId completedContainerId1 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(BuilderUtils.newApplicationId(0, 0), 0), 0); + + RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null); + + ContainerStatus containerStatus1 = mock(ContainerStatus.class); + + doReturn(completedContainerId1).when(containerStatus1).getContainerId(); + doReturn(Collections.singletonList(containerStatus1)).when(statusEvent1) + .getContainers(); + + verify(scheduler, times(1)).handle(any(NodeUpdateSchedulerEvent.class)); + node.handle(statusEvent1); + verify(scheduler, times(1)).handle(any(NodeUpdateSchedulerEvent.class)); + Assert.assertEquals(1, node.getQueueSize()); + Assert.assertEquals(1, node.getCompletedContainers().size()); + + // test for duplicate entries + node.handle(statusEvent1); + Assert.assertEquals(1, node.getQueueSize()); + + // send clean up container event + node.handle(new RMNodeFinishedContainersPulledByAMEvent(node.getNodeID(), + Collections.singletonList(completedContainerId1))); + + 
NodeHeartbeatResponse hbrsp = + Records.newRecord(NodeHeartbeatResponse.class); + node.updateNodeHeartbeatResponseForCleanup(hbrsp); + Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size()); + Assert.assertEquals(0, node.getCompletedContainers().size()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2917180/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 298e673..5d57917 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.server.resourcemanager; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.File; import java.io.FileOutputStream; @@ -30,6 +32,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -1917,4 +1920,57 
@@ public class TestResourceTrackerService extends NodeLabelTestBase { DefaultMetricsSystem.shutdown(); } } + + @Test(timeout = 60000) + public void testNodeHeartBeatResponseForUnknownContainerCleanUp() + throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.init(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + rm.drainEvents(); + + // send 1st heartbeat + nm1.nodeHeartbeat(true); + + // Create 2 unknown containers tracked by NM + ApplicationId applicationId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId applicationAttemptId = BuilderUtils + .newApplicationAttemptId(applicationId, 1); + ContainerId cid1 = BuilderUtils.newContainerId(applicationAttemptId, 2); + ContainerId cid2 = BuilderUtils.newContainerId(applicationAttemptId, 3); + ArrayList<ContainerStatus> containerStats = + new ArrayList<ContainerStatus>(); + containerStats.add( + ContainerStatus.newInstance(cid1, ContainerState.COMPLETE, "", -1)); + containerStats.add( + ContainerStatus.newInstance(cid2, ContainerState.COMPLETE, "", -1)); + + Map<ApplicationId, List<ContainerStatus>> conts = + new HashMap<ApplicationId, List<ContainerStatus>>(); + conts.put(applicationAttemptId.getApplicationId(), containerStats); + + // add RMApp into context. 
+ RMApp app1 = mock(RMApp.class); + when(app1.getApplicationId()).thenReturn(applicationId); + rm.getRMContext().getRMApps().put(applicationId, app1); + + // Send unknown container status in heartbeat + nm1.nodeHeartbeat(conts, true); + rm.drainEvents(); + + int containersToBeRemovedFromNM = 0; + while (true) { + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + rm.drainEvents(); + containersToBeRemovedFromNM += + nodeHeartbeat.getContainersToBeRemovedFromNM().size(); + // asserting for 2 since two unknown containers status has been sent + if (containersToBeRemovedFromNM == 2) { + break; + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
