YARN-4740. AM may not receive the container complete msg when it restarts. 
Contributed by Jun Gong


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9cb0c963
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9cb0c963
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9cb0c963

Branch: refs/heads/HDFS-1312
Commit: 9cb0c963d21bcbefc56716d332a3bbdf090417c0
Parents: ce5b481
Author: Jian He <jia...@apache.org>
Authored: Fri Apr 8 11:19:36 2016 -0700
Committer: Jian He <jia...@apache.org>
Committed: Fri Apr 8 11:20:35 2016 -0700

----------------------------------------------------------------------
 .../rmapp/attempt/RMAppAttemptImpl.java         |  24 ++++-
 .../applicationsmanager/TestAMRestart.java      | 107 +++++++++++++++++++
 2 files changed, 126 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb0c963/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 8a9a55e..1e2a293 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -927,6 +927,20 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     this.justFinishedContainers = attempt.getJustFinishedContainersReference();
     this.finishedContainersSentToAM =
         attempt.getFinishedContainersSentToAMReference();
+    // container complete msg was moved from justFinishedContainers to
+    // finishedContainersSentToAM in ApplicationMasterService#allocate,
+    // if am crashed and not received this response, we should resend
+    // this msg again after am restart
+    if (!this.finishedContainersSentToAM.isEmpty()) {
+      for (NodeId nodeId : this.finishedContainersSentToAM.keySet()) {
+        List<ContainerStatus> containerStatuses =
+            this.finishedContainersSentToAM.get(nodeId);
+        this.justFinishedContainers.putIfAbsent(nodeId,
+            new ArrayList<ContainerStatus>());
+        this.justFinishedContainers.get(nodeId).addAll(containerStatuses);
+      }
+      this.finishedContainersSentToAM.clear();
+    }
   }
 
   private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
@@ -1845,13 +1859,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     } else {
       LOG.warn("No ContainerStatus in containerFinishedEvent");
     }
-    finishedContainersSentToAM.putIfAbsent(nodeId,
-      new ArrayList<ContainerStatus>());
-    appAttempt.finishedContainersSentToAM.get(nodeId).add(
-      containerFinishedEvent.getContainerStatus());
 
     if (!appAttempt.getSubmissionContext()
-      .getKeepContainersAcrossApplicationAttempts()) {
+        .getKeepContainersAcrossApplicationAttempts()) {
+      finishedContainersSentToAM.putIfAbsent(nodeId,
+          new ArrayList<ContainerStatus>());
+      appAttempt.finishedContainersSentToAM.get(nodeId).add(
+          containerFinishedEvent.getContainerStatus());
       appAttempt.sendFinishedContainersToNM();
     } else {
       appAttempt.sendFinishedAMContainerToNM(nodeId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb0c963/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 16f3f60..6cfd868 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -907,4 +907,111 @@ public class TestAMRestart {
     rm1.stop();
     rm2.stop();
   }
+
+  private boolean isContainerIdInContainerStatus(
+      List<ContainerStatus> containerStatuses, ContainerId containerId) {
+    for (ContainerStatus status : containerStatuses) {
+      if (status.getContainerId().equals(containerId)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Test(timeout = 30000)
+  public void testAMRestartNotLostContainerCompleteMsg() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    RMApp app1 =
+        rm1.submitApp(200, "name", "user",
+            new HashMap<ApplicationAccessType, String>(), false, "default", -1,
+            null, "MAPREDUCE", false, true);
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    allocateContainers(nm1, am1, 1);
+
+    nm1.nodeHeartbeat(
+        am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+
+    // container complete
+    nm1.nodeHeartbeat(
+        am1.getApplicationAttemptId(), 2, ContainerState.COMPLETE);
+    rm1.waitForState(nm1, containerId2, RMContainerState.COMPLETED);
+
+    // make sure allocate() get complete container,
+    // before this msg pass to AM, AM may crash
+    while (true) {
+      AllocateResponse response = am1.allocate(
+          new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+      List<ContainerStatus> containerStatuses =
+          response.getCompletedContainersStatuses();
+      if (isContainerIdInContainerStatus(
+          containerStatuses, containerId2) == false) {
+        Thread.sleep(100);
+        continue;
+      }
+
+      // is containerId still in justFinishedContainer?
+      containerStatuses =
+          app1.getCurrentAppAttempt().getJustFinishedContainers();
+      if (isContainerIdInContainerStatus(containerStatuses,
+          containerId2)) {
+        Assert.fail();
+      }
+      break;
+    }
+
+    // fail the AM by sending CONTAINER_FINISHED event without registering.
+    nm1.nodeHeartbeat(
+        am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am1.waitForState(RMAppAttemptState.FAILED);
+
+    // wait for app to start a new attempt.
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    // assert this is a new AM.
+    ApplicationAttemptId newAttemptId =
+        app1.getCurrentAppAttempt().getAppAttemptId();
+    Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
+
+    // launch the new AM
+    RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+    nm1.nodeHeartbeat(true);
+    MockAM am2 = rm1.sendAMLaunched(attempt2.getAppAttemptId());
+    am2.registerAppAttempt();
+    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+    // whether new AM could get container complete msg
+    AllocateResponse allocateResponse = am2.allocate(
+        new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+    List<ContainerStatus> containerStatuses =
+        allocateResponse.getCompletedContainersStatuses();
+    if (isContainerIdInContainerStatus(containerStatuses,
+        containerId2) == false) {
+      Assert.fail();
+    }
+    containerStatuses = attempt2.getJustFinishedContainers();
+    if (isContainerIdInContainerStatus(containerStatuses, containerId2)) {
+      Assert.fail();
+    }
+
+    // the second allocate should not get container complete msg
+    allocateResponse = am2.allocate(
+        new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+    containerStatuses =
+        allocateResponse.getCompletedContainersStatuses();
+    if (isContainerIdInContainerStatus(containerStatuses, containerId2)) {
+      Assert.fail();
+    }
+
+    rm1.stop();
+  }
 }

Reply via email to