YARN-4740. AM may not receive the container complete msg when it restarts. Contributed by Jun Gong
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9cb0c963 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9cb0c963 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9cb0c963 Branch: refs/heads/HDFS-1312 Commit: 9cb0c963d21bcbefc56716d332a3bbdf090417c0 Parents: ce5b481 Author: Jian He <jia...@apache.org> Authored: Fri Apr 8 11:19:36 2016 -0700 Committer: Jian He <jia...@apache.org> Committed: Fri Apr 8 11:20:35 2016 -0700 ---------------------------------------------------------------------- .../rmapp/attempt/RMAppAttemptImpl.java | 24 ++++- .../applicationsmanager/TestAMRestart.java | 107 +++++++++++++++++++ 2 files changed, 126 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb0c963/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 8a9a55e..1e2a293 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -927,6 +927,20 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { this.justFinishedContainers = attempt.getJustFinishedContainersReference(); this.finishedContainersSentToAM = attempt.getFinishedContainersSentToAMReference(); + // container complete msg was moved from justFinishedContainers to + // finishedContainersSentToAM in ApplicationMasterService#allocate, + // if am crashed and not received this response, we should resend + // this msg again after am restart + if (!this.finishedContainersSentToAM.isEmpty()) { + for (NodeId nodeId : this.finishedContainersSentToAM.keySet()) { + List<ContainerStatus> containerStatuses = + this.finishedContainersSentToAM.get(nodeId); + this.justFinishedContainers.putIfAbsent(nodeId, + new ArrayList<ContainerStatus>()); + this.justFinishedContainers.get(nodeId).addAll(containerStatuses); + } + this.finishedContainersSentToAM.clear(); + } } private void recoverAppAttemptCredentials(Credentials appAttemptTokens, @@ -1845,13 +1859,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { } else { LOG.warn("No ContainerStatus in containerFinishedEvent"); } - finishedContainersSentToAM.putIfAbsent(nodeId, - new ArrayList<ContainerStatus>()); - appAttempt.finishedContainersSentToAM.get(nodeId).add( - containerFinishedEvent.getContainerStatus()); if (!appAttempt.getSubmissionContext() - .getKeepContainersAcrossApplicationAttempts()) { + .getKeepContainersAcrossApplicationAttempts()) { + finishedContainersSentToAM.putIfAbsent(nodeId, + new ArrayList<ContainerStatus>()); + appAttempt.finishedContainersSentToAM.get(nodeId).add( + containerFinishedEvent.getContainerStatus()); appAttempt.sendFinishedContainersToNM(); } else { appAttempt.sendFinishedAMContainerToNM(nodeId, http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb0c963/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 16f3f60..6cfd868 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -907,4 +907,111 @@ public class TestAMRestart { rm1.stop(); rm2.stop(); } + + private boolean isContainerIdInContainerStatus( + List<ContainerStatus> containerStatuses, ContainerId containerId) { + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(containerId)) { + return true; + } + } + return false; + } + + @Test(timeout = 30000) + public void testAMRestartNotLostContainerCompleteMsg() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + + MockRM rm1 = new MockRM(conf); + rm1.start(); + RMApp app1 = + rm1.submitApp(200, "name", "user", + new HashMap<ApplicationAccessType, String>(), false, "default", -1, + null, "MAPREDUCE", false, true); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService()); + nm1.registerNode(); + + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + allocateContainers(nm1, am1, 1); + + nm1.nodeHeartbeat( + am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + + // container complete + nm1.nodeHeartbeat( + am1.getApplicationAttemptId(), 2, ContainerState.COMPLETE); + rm1.waitForState(nm1, containerId2, RMContainerState.COMPLETED); + + // make sure allocate() get complete container, + // before this msg pass to AM, AM may crash + while (true) { + AllocateResponse response = am1.allocate( + new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()); + List<ContainerStatus> containerStatuses = + response.getCompletedContainersStatuses(); + if (isContainerIdInContainerStatus( + containerStatuses, containerId2) == false) { + Thread.sleep(100); + continue; + } + + // is containerId still in justFinishedContainer? + containerStatuses = + app1.getCurrentAppAttempt().getJustFinishedContainers(); + if (isContainerIdInContainerStatus(containerStatuses, + containerId2)) { + Assert.fail(); + } + break; + } + + // fail the AM by sending CONTAINER_FINISHED event without registering. + nm1.nodeHeartbeat( + am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am1.waitForState(RMAppAttemptState.FAILED); + + // wait for app to start a new attempt. + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + // assert this is a new AM. + ApplicationAttemptId newAttemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId())); + + // launch the new AM + RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); + nm1.nodeHeartbeat(true); + MockAM am2 = rm1.sendAMLaunched(attempt2.getAppAttemptId()); + am2.registerAppAttempt(); + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + // whether new AM could get container complete msg + AllocateResponse allocateResponse = am2.allocate( + new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()); + List<ContainerStatus> containerStatuses = + allocateResponse.getCompletedContainersStatuses(); + if (isContainerIdInContainerStatus(containerStatuses, + containerId2) == false) { + Assert.fail(); + } + containerStatuses = attempt2.getJustFinishedContainers(); + if (isContainerIdInContainerStatus(containerStatuses, containerId2)) { + Assert.fail(); + } + + // the second allocate should not get container complete msg + allocateResponse = am2.allocate( + new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()); + containerStatuses = + allocateResponse.getCompletedContainersStatuses(); + if (isContainerIdInContainerStatus(containerStatuses, containerId2)) { + Assert.fail(); + } + + rm1.stop(); + } }