YARN-1372. Ensure all completed containers are reported to the AMs across RM 
restart. Contributed by Anubhav Dhoot


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0a641496
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0a641496
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0a641496

Branch: refs/heads/HDFS-6581
Commit: 0a641496c706fc175e7bf66d69ebf71c7d078e84
Parents: 376233c
Author: Jian He <jia...@apache.org>
Authored: Mon Sep 22 10:30:53 2014 -0700
Committer: Jian He <jia...@apache.org>
Committed: Mon Sep 22 10:30:53 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../protocolrecords/NodeHeartbeatResponse.java  |   5 +
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    |  67 ++++++
 .../yarn_server_common_service_protos.proto     |   1 +
 .../yarn/server/nodemanager/NodeManager.java    |   2 +-
 .../nodemanager/NodeStatusUpdaterImpl.java      |  80 ++++---
 .../nodemanager/TestNodeManagerResync.java      |  24 +-
 .../nodemanager/TestNodeStatusUpdater.java      | 234 ++++++++++++++-----
 .../resourcemanager/ResourceTrackerService.java |   7 +-
 .../server/resourcemanager/rmapp/RMAppImpl.java |  12 +-
 .../rmapp/attempt/RMAppAttempt.java             |  23 +-
 .../rmapp/attempt/RMAppAttemptImpl.java         | 133 +++++++++--
 .../RMAppAttemptContainerFinishedEvent.java     |   8 +-
 .../rmcontainer/RMContainerImpl.java            |  13 +-
 .../resourcemanager/rmnode/RMNodeEventType.java |   3 +
 ...RMNodeFinishedContainersPulledByAMEvent.java |  41 ++++
 .../resourcemanager/rmnode/RMNodeImpl.java      |  48 +++-
 .../TestResourceTrackerService.java             |   8 +-
 .../applicationsmanager/TestAMRestart.java      |  24 +-
 .../attempt/TestRMAppAttemptTransitions.java    | 143 ++++++++++--
 .../security/TestAMRMTokens.java                |   2 +-
 21 files changed, 701 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5c9cafd..7e5d0f9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -238,6 +238,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2001. Added a time threshold for RM to wait before starting container
     allocations after restart/failover. (Jian He via vinodkv)
 
+    YARN-1372. Ensure all completed containers are reported to the AMs across
+    RM restart. (Anubhav Dhoot via jianhe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 38dfa58..9887acc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -30,6 +30,7 @@ public interface NodeHeartbeatResponse {
   NodeAction getNodeAction();
 
   List<ContainerId> getContainersToCleanup();
+  List<ContainerId> getFinishedContainersPulledByAM();
 
   List<ApplicationId> getApplicationsToCleanup();
 
@@ -43,6 +44,10 @@ public interface NodeHeartbeatResponse {
   void setNMTokenMasterKey(MasterKey secretKey);
 
   void addAllContainersToCleanup(List<ContainerId> containers);
+
+  // This tells NM to remove finished containers only after the AM
+  // has actually received it in a previous allocate response
+  void addFinishedContainersPulledByAM(List<ContainerId> containers);
   
   void addAllApplicationsToCleanup(List<ApplicationId> applications);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 775f95a..e9296f4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -46,6 +46,7 @@ public class NodeHeartbeatResponsePBImpl extends 
ProtoBase<NodeHeartbeatResponse
   boolean viaProto = false;
   
   private List<ContainerId> containersToCleanup = null;
+  private List<ContainerId> finishedContainersPulledByAM = null;
   private List<ApplicationId> applicationsToCleanup = null;
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
@@ -73,6 +74,9 @@ public class NodeHeartbeatResponsePBImpl extends 
ProtoBase<NodeHeartbeatResponse
     if (this.applicationsToCleanup != null) {
       addApplicationsToCleanupToProto();
     }
+    if (this.finishedContainersPulledByAM != null) {
+      addFinishedContainersPulledByAMToProto();
+    }
     if (this.containerTokenMasterKey != null) {
       builder.setContainerTokenMasterKey(
           convertToProtoFormat(this.containerTokenMasterKey));
@@ -199,6 +203,12 @@ public class NodeHeartbeatResponsePBImpl extends 
ProtoBase<NodeHeartbeatResponse
     return this.containersToCleanup;
   }
 
+  @Override
+  public List<ContainerId> getFinishedContainersPulledByAM() {
+    initFinishedContainersPulledByAM();
+    return this.finishedContainersPulledByAM;
+  }
+
   private void initContainersToCleanup() {
     if (this.containersToCleanup != null) {
       return;
@@ -212,6 +222,19 @@ public class NodeHeartbeatResponsePBImpl extends 
ProtoBase<NodeHeartbeatResponse
     }
   }
 
+  private void initFinishedContainersPulledByAM() {
+    if (this.finishedContainersPulledByAM != null) {
+      return;
+    }
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<ContainerIdProto> list = p.getFinishedContainersPulledByAmList();
+    this.finishedContainersPulledByAM = new ArrayList<ContainerId>();
+
+    for (ContainerIdProto c : list) {
+      this.finishedContainersPulledByAM.add(convertFromProtoFormat(c));
+    }
+  }
+
   @Override
   public void addAllContainersToCleanup(
       final List<ContainerId> containersToCleanup) {
@@ -221,6 +244,15 @@ public class NodeHeartbeatResponsePBImpl extends 
ProtoBase<NodeHeartbeatResponse
     this.containersToCleanup.addAll(containersToCleanup);
   }
 
+  @Override
+  public void addFinishedContainersPulledByAM(
+      final List<ContainerId> finishedContainersPulledByAM) {
+    if (finishedContainersPulledByAM == null)
+      return;
+    initFinishedContainersPulledByAM();
+    this.finishedContainersPulledByAM.addAll(finishedContainersPulledByAM);
+  }
+
   private void addContainersToCleanupToProto() {
     maybeInitBuilder();
     builder.clearContainersToCleanup();
@@ -256,6 +288,41 @@ public class NodeHeartbeatResponsePBImpl extends 
ProtoBase<NodeHeartbeatResponse
     builder.addAllContainersToCleanup(iterable);
   }
 
+  private void addFinishedContainersPulledByAMToProto() {
+    maybeInitBuilder();
+    builder.clearFinishedContainersPulledByAm();
+    if (finishedContainersPulledByAM == null)
+      return;
+    Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
+
+      @Override
+      public Iterator<ContainerIdProto> iterator() {
+        return new Iterator<ContainerIdProto>() {
+
+          Iterator<ContainerId> iter = finishedContainersPulledByAM.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public ContainerIdProto next() {
+            return convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+
+          }
+        };
+
+      }
+    };
+    builder.addAllFinishedContainersPulledByAm(iterable);
+  }
+
   @Override
   public List<ApplicationId> getApplicationsToCleanup() {
     initApplicationsToCleanup();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 29cd64e..600f54d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -58,6 +58,7 @@ message NodeHeartbeatResponseProto {
   repeated ApplicationIdProto applications_to_cleanup = 6;
   optional int64 nextHeartBeatInterval = 7;
   optional string diagnostics_message = 8;
+  repeated ContainerIdProto finished_containers_pulled_by_am = 9;
 }
 
 message NMContainerStatusProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index a479be2..43770c1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -311,7 +311,7 @@ public class NodeManager extends CompositeService
   public static class NMContext implements Context {
 
     private NodeId nodeId = null;
-    private final ConcurrentMap<ApplicationId, Application> applications =
+    protected final ConcurrentMap<ApplicationId, Application> applications =
         new ConcurrentHashMap<ApplicationId, Application>();
     protected final ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index b52b0fb..b9feacb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -104,11 +104,6 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
   // Duration for which to track recently stopped container.
   private long durationToTrackStoppedContainers;
 
-  // This is used to track the current completed containers when nodeheartBeat
-  // is called. These completed containers will be removed from NM context after
-  // nodeHeartBeat succeeds and the response from the nodeHeartBeat is
-  // processed.
-  private final Set<ContainerId> previousCompletedContainers;
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
 
@@ -125,7 +120,6 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
     this.metrics = metrics;
     this.recentlyStoppedContainers =
         new LinkedHashMap<ContainerId, Long>();
-    this.previousCompletedContainers = new HashSet<ContainerId>();
   }
 
   @Override
@@ -331,7 +325,7 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
     return appList;
   }
 
-  private NodeStatus getNodeStatus(int responseId) {
+  private NodeStatus getNodeStatus(int responseId) throws IOException {
 
     NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
     nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
@@ -352,11 +346,18 @@ public class NodeStatusUpdaterImpl extends 
AbstractService implements
 
   // Iterate through the NMContext and clone and get all the containers'
   // statuses. If it's a completed container, add into the
-  // recentlyStoppedContainers and previousCompletedContainers collections.
+  // recentlyStoppedContainers collections.
   @VisibleForTesting
-  protected List<ContainerStatus> getContainerStatuses() {
+  protected List<ContainerStatus> getContainerStatuses() throws IOException {
     List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
     for (Container container : this.context.getContainers().values()) {
+      ContainerId containerId = container.getContainerId();
+      ApplicationId applicationId = container.getContainerId()
+          .getApplicationAttemptId().getApplicationId();
+      if (!this.context.getApplications().containsKey(applicationId)) {
+        context.getContainers().remove(containerId);
+        continue;
+      }
       org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
           container.cloneAndGetContainerStatus();
       containerStatuses.add(containerStatus);
@@ -381,10 +382,17 @@ public class NodeStatusUpdaterImpl extends 
AbstractService implements
   }
 
   // These NMContainerStatus are sent on NM registration and used by YARN only.
-  private List<NMContainerStatus> getNMContainerStatuses() {
+  private List<NMContainerStatus> getNMContainerStatuses() throws IOException {
     List<NMContainerStatus> containerStatuses =
         new ArrayList<NMContainerStatus>();
     for (Container container : this.context.getContainers().values()) {
+      ContainerId containerId = container.getContainerId();
+      ApplicationId applicationId = container.getContainerId()
+          .getApplicationAttemptId().getApplicationId();
+      if (!this.context.getApplications().containsKey(applicationId)) {
+        context.getContainers().remove(containerId);
+        continue;
+      }
       NMContainerStatus status =
           container.getNMContainerStatus();
       containerStatuses.add(status);
@@ -402,26 +410,30 @@ public class NodeStatusUpdaterImpl extends 
AbstractService implements
 
   @Override
   public void addCompletedContainer(ContainerId containerId) {
-    synchronized (previousCompletedContainers) {
-      previousCompletedContainers.add(containerId);
-    }
     synchronized (recentlyStoppedContainers) {
       removeVeryOldStoppedContainersFromCache();
-      recentlyStoppedContainers.put(containerId,
-        System.currentTimeMillis() + durationToTrackStoppedContainers);
+      if (!recentlyStoppedContainers.containsKey(containerId)) {
+        recentlyStoppedContainers.put(containerId,
+            System.currentTimeMillis() + durationToTrackStoppedContainers);
+      }
     }
   }
 
-  private void removeCompletedContainersFromContext() {
-    synchronized (previousCompletedContainers) {
-      if (!previousCompletedContainers.isEmpty()) {
-        for (ContainerId containerId : previousCompletedContainers) {
-          this.context.getContainers().remove(containerId);
-        }
-        LOG.info("Removed completed containers from NM context: "
-            + previousCompletedContainers);
-        previousCompletedContainers.clear();
-      }
+  @VisibleForTesting
+  @Private
+  public void removeCompletedContainersFromContext(
+      List<ContainerId>containerIds) throws IOException {
+    Set<ContainerId> removedContainers = new HashSet<ContainerId>();
+
+    // If the AM has pulled the completedContainer it can be removed
+    for (ContainerId containerId : containerIds) {
+      context.getContainers().remove(containerId);
+      removedContainers.add(containerId);
+    }
+
+    if (!removedContainers.isEmpty()) {
+      LOG.info("Removed completed containers from NM context: " +
+          removedContainers);
     }
   }
 
@@ -454,7 +466,7 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
       return recentlyStoppedContainers.containsKey(containerId);
     }
   }
-  
+
   @Override
   public void clearFinishedContainersFromCache() {
     synchronized (recentlyStoppedContainers) {
@@ -472,11 +484,13 @@ public class NodeStatusUpdaterImpl extends 
AbstractService implements
       while (i.hasNext()) {
         ContainerId cid = i.next();
         if (recentlyStoppedContainers.get(cid) < currentTime) {
-          i.remove();
-          try {
-            context.getNMStateStore().removeContainer(cid);
-          } catch (IOException e) {
-            LOG.error("Unable to remove container " + cid + " in store", e);
+          if (!context.getContainers().containsKey(cid)) {
+            i.remove();
+            try {
+              context.getNMStateStore().removeContainer(cid);
+            } catch (IOException e) {
+              LOG.error("Unable to remove container " + cid + " in store", e);
+            }
           }
         } else {
           break;
@@ -542,7 +556,9 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
             // don't want to remove the completed containers before resync
             // because these completed containers will be reported back to RM
             // when NM re-registers with RM.
-            removeCompletedContainersFromContext();
+            // Only remove the cleanedup containers that are acked
+            removeCompletedContainersFromContext(response
+                  .getFinishedContainersPulledByAM());
 
             lastHeartBeatID = response.getResponseId();
             List<ContainerId> containersToCleanup = response

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
index acda2a9..85bafb3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
@@ -58,6 +58,7 @@ import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -247,6 +248,10 @@ public class TestNodeManagerResync {
                   // put the completed container into the context
                   getNMContext().getContainers().put(
                     testCompleteContainer.getContainerId(), container);
+                  getNMContext().getApplications().put(
+                      testCompleteContainer.getContainerId()
+                          .getApplicationAttemptId().getApplicationId(),
+                      mock(Application.class));
                 } else {
                   // second register contains the completed container info.
                   List<NMContainerStatus> statuses =
@@ -382,9 +387,17 @@ public class TestNodeManagerResync {
             if (containersShouldBePreserved) {
               Assert.assertFalse(containers.isEmpty());
               Assert.assertTrue(containers.containsKey(existingCid));
+              Assert.assertEquals(ContainerState.RUNNING,
+                  containers.get(existingCid)
+                  .cloneAndGetContainerStatus().getState());
             } else {
-              // ensure that containers are empty before restart nodeStatusUpdater
-              Assert.assertTrue(containers.isEmpty());
+              // ensure that containers are empty or are completed before
+              // restart nodeStatusUpdater
+              if (!containers.isEmpty()) {
+                Assert.assertEquals(ContainerState.COMPLETE,
+                    containers.get(existingCid)
+                        .cloneAndGetContainerStatus().getState());
+              }
             }
             super.rebootNodeStatusUpdaterAndRegisterWithRM();
           }
@@ -465,7 +478,12 @@ public class TestNodeManagerResync {
 
         try {
           // ensure that containers are empty before restart nodeStatusUpdater
-          Assert.assertTrue(containers.isEmpty());
+          if (!containers.isEmpty()) {
+            for (Container container: containers.values()) {
+              Assert.assertEquals(ContainerState.COMPLETE,
+                  container.cloneAndGetContainerStatus().getState());
+            }
+          }
           super.rebootNodeStatusUpdaterAndRegisterWithRM();
           // After this point new containers are free to be launched, except
           // containers from previous RM

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index f2a3a4a..8fb51a3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.RMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -180,7 +181,7 @@ public class TestNodeStatusUpdater {
       Map<ApplicationId, List<ContainerStatus>> map =
           new HashMap<ApplicationId, List<ContainerStatus>>();
       for (ContainerStatus cs : containers) {
-        ApplicationId applicationId = 
+        ApplicationId applicationId =
             cs.getContainerId().getApplicationAttemptId().getApplicationId();
         List<ContainerStatus> appContainers = map.get(applicationId);
         if (appContainers == null) {
@@ -205,10 +206,10 @@ public class TestNodeStatusUpdater {
       nodeStatus.setResponseId(heartBeatID++);
       Map<ApplicationId, List<ContainerStatus>> appToContainers =
           getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
-      
+
       ApplicationId appId1 = ApplicationId.newInstance(0, 1);
       ApplicationId appId2 = ApplicationId.newInstance(0, 2);
-      
+
       if (heartBeatID == 1) {
         Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
 
@@ -419,7 +420,7 @@ public class TestNodeStatusUpdater {
   }
 
   private class MyNodeManager extends NodeManager {
-    
+
     private MyNodeStatusUpdater3 nodeStatusUpdater;
     @Override
     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
@@ -433,7 +434,7 @@ public class TestNodeStatusUpdater {
       return this.nodeStatusUpdater;
     }
   }
-  
+
   private class MyNodeManager2 extends NodeManager {
     public boolean isStopped = false;
     private NodeStatusUpdater nodeStatusUpdater;
@@ -467,7 +468,7 @@ public class TestNodeStatusUpdater {
       syncBarrier.await(10000, TimeUnit.MILLISECONDS);
     }
   }
-  // 
+  //
   private class MyResourceTracker2 implements ResourceTracker {
     public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
     public NodeAction registerNodeAction = NodeAction.NORMAL;
@@ -478,7 +479,7 @@ public class TestNodeStatusUpdater {
     public RegisterNodeManagerResponse registerNodeManager(
         RegisterNodeManagerRequest request) throws YarnException,
         IOException {
-      
+
       RegisterNodeManagerResponse response = recordFactory
           .newRecordInstance(RegisterNodeManagerResponse.class);
       response.setNodeAction(registerNodeAction );
@@ -493,7 +494,7 @@ public class TestNodeStatusUpdater {
         throws YarnException, IOException {
       NodeStatus nodeStatus = request.getNodeStatus();
       nodeStatus.setResponseId(heartBeatID++);
-      
+
       NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
           newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
               null, null, null, 1000L);
@@ -501,7 +502,7 @@ public class TestNodeStatusUpdater {
       return nhResponse;
     }
   }
-  
+
   private class MyResourceTracker3 implements ResourceTracker {
     public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
     public NodeAction registerNodeAction = NodeAction.NORMAL;
@@ -513,7 +514,7 @@ public class TestNodeStatusUpdater {
     MyResourceTracker3(Context context) {
       this.context = context;
     }
-    
+
     @Override
     public RegisterNodeManagerResponse registerNodeManager(
         RegisterNodeManagerRequest request) throws YarnException,
@@ -564,6 +565,14 @@ public class TestNodeStatusUpdater {
     public NodeAction registerNodeAction = NodeAction.NORMAL;
     public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
     private Context context;
+    private final ContainerStatus containerStatus2 =
+        createContainerStatus(2, ContainerState.RUNNING);
+    private final ContainerStatus containerStatus3 =
+        createContainerStatus(3, ContainerState.COMPLETE);
+    private final ContainerStatus containerStatus4 =
+        createContainerStatus(4, ContainerState.RUNNING);
+    private final ContainerStatus containerStatus5 =
+        createContainerStatus(5, ContainerState.COMPLETE);
 
     public MyResourceTracker4(Context context) {
       this.context = context;
@@ -583,6 +592,8 @@ public class TestNodeStatusUpdater {
     @Override
     public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
         throws YarnException, IOException {
+      List<ContainerId> finishedContainersPulledByAM = new ArrayList
+          <ContainerId>();
       try {
         if (heartBeatID == 0) {
           Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
@@ -594,10 +605,6 @@ public class TestNodeStatusUpdater {
           Assert.assertEquals(statuses.size(), 2);
           Assert.assertEquals(context.getContainers().size(), 2);
 
-          ContainerStatus containerStatus2 =
-              createContainerStatus(2, ContainerState.RUNNING);
-          ContainerStatus containerStatus3 =
-              createContainerStatus(3, ContainerState.COMPLETE);
           boolean container2Exist = false, container3Exist = false;
           for (ContainerStatus status : statuses) {
             if (status.getContainerId().equals(
@@ -619,23 +626,14 @@ public class TestNodeStatusUpdater {
           // nodeStatusUpdaterRunnable, otherwise nm just shuts down and the
           // test passes.
           throw new YarnRuntimeException("Lost the heartbeat response");
-        } else if (heartBeatID == 2) {
+        } else if (heartBeatID == 2 || heartBeatID == 3) {
           List<ContainerStatus> statuses =
               request.getNodeStatus().getContainersStatuses();
           Assert.assertEquals(statuses.size(), 4);
           Assert.assertEquals(context.getContainers().size(), 4);
 
-          ContainerStatus containerStatus2 =
-              createContainerStatus(2, ContainerState.RUNNING);
-          ContainerStatus containerStatus3 =
-              createContainerStatus(3, ContainerState.COMPLETE);
-          ContainerStatus containerStatus4 =
-              createContainerStatus(4, ContainerState.RUNNING);
-          ContainerStatus containerStatus5 =
-              createContainerStatus(5, ContainerState.COMPLETE);
-
-          boolean container2Exist = false, container3Exist = false, 
container4Exist =
-              false, container5Exist = false;
+          boolean container2Exist = false, container3Exist = false,
+              container4Exist = false, container5Exist = false;
           for (ContainerStatus status : statuses) {
             if (status.getContainerId().equals(
               containerStatus2.getContainerId())) {
@@ -664,6 +662,24 @@ public class TestNodeStatusUpdater {
           }
           Assert.assertTrue(container2Exist && container3Exist
               && container4Exist && container5Exist);
+
+          if (heartBeatID == 3) {
+            finishedContainersPulledByAM.add(containerStatus3.getContainerId());
+          }
+        } else if (heartBeatID == 4) {
+          List<ContainerStatus> statuses =
+              request.getNodeStatus().getContainersStatuses();
+          Assert.assertEquals(statuses.size(), 3);
+          Assert.assertEquals(context.getContainers().size(), 3);
+
+          boolean container3Exist = false;
+          for (ContainerStatus status : statuses) {
+            if (status.getContainerId().equals(
+                containerStatus3.getContainerId())) {
+              container3Exist = true;
+            }
+          }
+          Assert.assertFalse(container3Exist);
         }
       } catch (AssertionError error) {
         error.printStackTrace();
@@ -676,6 +692,7 @@ public class TestNodeStatusUpdater {
       NodeHeartbeatResponse nhResponse =
           YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
             heartBeatNodeAction, null, null, null, null, 1000L);
+      nhResponse.addFinishedContainersPulledByAM(finishedContainersPulledByAM);
       return nhResponse;
     }
   }
@@ -686,7 +703,7 @@ public class TestNodeStatusUpdater {
     public RegisterNodeManagerResponse registerNodeManager(
         RegisterNodeManagerRequest request) throws YarnException,
         IOException {
-      
+
       RegisterNodeManagerResponse response = recordFactory
           .newRecordInstance(RegisterNodeManagerResponse.class);
       response.setNodeAction(registerNodeAction );
@@ -694,7 +711,7 @@ public class TestNodeStatusUpdater {
       response.setNMTokenMasterKey(createMasterKey());
       return response;
     }
-    
+
     @Override
     public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
         throws YarnException, IOException {
@@ -767,11 +784,11 @@ public class TestNodeStatusUpdater {
     lfs.delete(new Path(basedir.getPath()), true);
   }
 
-  @Test(timeout = 90000)                                                      
-  public void testRecentlyFinishedContainers() throws Exception {             
-    NodeManager nm = new NodeManager();                                       
-    YarnConfiguration conf = new YarnConfiguration();                         
-    conf.set(                                                                 
+  @Test(timeout = 90000)
+  public void testRecentlyFinishedContainers() throws Exception {
+    NodeManager nm = new NodeManager();
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(
         
NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
         "10000");                                                             
     nm.init(conf);                                                            
@@ -780,27 +797,112 @@ public class TestNodeStatusUpdater {
     ApplicationId appId = ApplicationId.newInstance(0, 0);                    
     ApplicationAttemptId appAttemptId =                                       
         ApplicationAttemptId.newInstance(appId, 0);                           
-    ContainerId cId = ContainerId.newInstance(appAttemptId, 0);               
-                                                                              
-                                                                              
+    ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
+    nm.getNMContext().getApplications().putIfAbsent(appId,
+        mock(Application.class));
+    nm.getNMContext().getContainers().putIfAbsent(cId, mock(Container.class));
+
     nodeStatusUpdater.addCompletedContainer(cId);
     Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));     
-                                                                              
+
+    nm.getNMContext().getContainers().remove(cId);
     long time1 = System.currentTimeMillis();                                  
     int waitInterval = 15;                                                    
     while (waitInterval-- > 0                                                 
         && nodeStatusUpdater.isContainerRecentlyStopped(cId)) {               
-      nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();          
+      nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
       Thread.sleep(1000);                                                     
     }                                                                         
-    long time2 = System.currentTimeMillis();                                  
+    long time2 = System.currentTimeMillis();
     // By this time the container will be removed from cache. need to verify.
-    Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));    
-    Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000);  
-  }                                                                           
-                                                                              
+    Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));
+    Assert.assertTrue((time2 - time1) >= 10000 && (time2 - time1) <= 250000);
+  }
 
+  @Test(timeout = 90000)
+  public void testRemovePreviousCompletedContainersFromContext() throws Exception {
+    NodeManager nm = new NodeManager();
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(
+        NodeStatusUpdaterImpl
+            .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
+        "10000");
+    nm.init(conf);
+    NodeStatusUpdaterImpl nodeStatusUpdater =
+        (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 0);
   
+    ContainerId cId = ContainerId.newInstance(appAttemptId, 1);
+    Token containerToken =
+        BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser",
+            BuilderUtils.newResource(1024, 1), 0, 123,
+            "password".getBytes(), 0);
+    Container anyCompletedContainer = new ContainerImpl(conf, null,
+        null, null, null, null,
+        BuilderUtils.newContainerTokenIdentifier(containerToken)) {
+
+      @Override
+      public ContainerState getCurrentState() {
+        return ContainerState.COMPLETE;
+      }
+    };
+
+    nm.getNMContext().getApplications().putIfAbsent(appId,
+        mock(Application.class));
+    nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
+    Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
+
+    List<ContainerId> ackedContainers = new ArrayList<ContainerId>();
+    ackedContainers.add(cId);
+
+    nodeStatusUpdater.removeCompletedContainersFromContext(ackedContainers);
+    Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().isEmpty());
+  }
+
+  @Test
+  public void testCleanedupApplicationContainerCleanup() throws IOException {
+    NodeManager nm = new NodeManager();
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(NodeStatusUpdaterImpl
+            .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
+        "1000000");
+    nm.init(conf);
+
+    NodeStatusUpdaterImpl nodeStatusUpdater =
+        (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 0);
+
+    ContainerId cId = ContainerId.newInstance(appAttemptId, 1);
+    Token containerToken =
+        BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser",
+            BuilderUtils.newResource(1024, 1), 0, 123,
+            "password".getBytes(), 0);
+    Container anyCompletedContainer = new ContainerImpl(conf, null,
+        null, null, null, null,
+        BuilderUtils.newContainerTokenIdentifier(containerToken)) {
+
+      @Override
+      public ContainerState getCurrentState() {
+        return ContainerState.COMPLETE;
+      }
+    };
+
+    nm.getNMContext().getApplications().putIfAbsent(appId,
+        mock(Application.class));
+    nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
+
+    Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
+
+    nm.getNMContext().getApplications().remove(appId);
+    nodeStatusUpdater.removeCompletedContainersFromContext(new ArrayList
+        <ContainerId>());
+    Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
+  }
+
   @Test
   public void testNMRegistration() throws InterruptedException {
     nm = new NodeManager() {
@@ -860,7 +962,7 @@ public class TestNodeStatusUpdater {
 
     nm.stop();
   }
-  
+
   @Test
   public void testStopReentrant() throws Exception {
     final AtomicInteger numCleanups = new AtomicInteger(0);
@@ -875,7 +977,7 @@ public class TestNodeStatusUpdater {
         myNodeStatusUpdater.resourceTracker = myResourceTracker2;
         return myNodeStatusUpdater;
       }
-      
+
       @Override
       protected ContainerManagerImpl createContainerManager(Context context,
           ContainerExecutor exec, DeletionService del,
@@ -897,7 +999,7 @@ public class TestNodeStatusUpdater {
     YarnConfiguration conf = createNMConfig();
     nm.init(conf);
     nm.start();
-    
+
     int waitCount = 0;
     while (heartBeatID < 1 && waitCount++ != 200) {
       Thread.sleep(500);
@@ -906,7 +1008,7 @@ public class TestNodeStatusUpdater {
 
     // Meanwhile call stop directly as the shutdown hook would
     nm.stop();
-    
+
     // NM takes a while to reach the STOPPED state.
     waitCount = 0;
     while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
@@ -1172,9 +1274,13 @@ public class TestNodeStatusUpdater {
     nm.start();
 
     int waitCount = 0;
-    while (heartBeatID <= 3 && waitCount++ != 20) {
+    while (heartBeatID <= 4 && waitCount++ != 20) {
       Thread.sleep(500);
     }
+    if (heartBeatID <= 4) {
+      Assert.fail("Failed to get all heartbeats in time, " +
+          "heartbeatID:" + heartBeatID);
+    }
     if(assertionFailedInThread.get()) {
       Assert.fail("ContainerStatus Backup failed");
     }
@@ -1182,7 +1288,7 @@ public class TestNodeStatusUpdater {
   }
 
   @Test(timeout = 200000)
-  public void testNodeStatusUpdaterRetryAndNMShutdown() 
+  public void testNodeStatusUpdaterRetryAndNMShutdown()
       throws Exception {
     final long connectionWaitSecs = 1000;
     final long connectionRetryIntervalMs = 1000;
@@ -1190,7 +1296,7 @@ public class TestNodeStatusUpdater {
     conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
         connectionWaitSecs);
     conf.setLong(YarnConfiguration
-        .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
+            .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
         connectionRetryIntervalMs);
     conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000);
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
@@ -1281,30 +1387,36 @@ public class TestNodeStatusUpdater {
       } else if (heartBeatID == 1) {
         ContainerStatus containerStatus2 =
             createContainerStatus(2, ContainerState.RUNNING);
-        Container container2 = getMockContainer(containerStatus2);
-        containers.put(containerStatus2.getContainerId(), container2);
+        putMockContainer(containerStatus2);
 
         ContainerStatus containerStatus3 =
             createContainerStatus(3, ContainerState.COMPLETE);
-        Container container3 = getMockContainer(containerStatus3);
-        containers.put(containerStatus3.getContainerId(), container3);
+        putMockContainer(containerStatus3);
         return containers;
       } else if (heartBeatID == 2) {
         ContainerStatus containerStatus4 =
             createContainerStatus(4, ContainerState.RUNNING);
-        Container container4 = getMockContainer(containerStatus4);
-        containers.put(containerStatus4.getContainerId(), container4);
+        putMockContainer(containerStatus4);
 
         ContainerStatus containerStatus5 =
             createContainerStatus(5, ContainerState.COMPLETE);
-        Container container5 = getMockContainer(containerStatus5);
-        containers.put(containerStatus5.getContainerId(), container5);
+        putMockContainer(containerStatus5);
+        return containers;
+      } else if (heartBeatID == 3 || heartBeatID == 4) {
         return containers;
       } else {
         containers.clear();
         return containers;
       }
     }
+
+    private void putMockContainer(ContainerStatus containerStatus) {
+      Container container = getMockContainer(containerStatus);
+      containers.put(containerStatus.getContainerId(), container);
+      applications.putIfAbsent(containerStatus.getContainerId()
+          .getApplicationAttemptId().getApplicationId(),
+          mock(Application.class));
+    }
   }
 
   public static ContainerStatus createContainerStatus(int id,
@@ -1345,7 +1457,7 @@ public class TestNodeStatusUpdater {
         throw e;
       }
     }
-    
+
     // the service should be stopped
     Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm
         .getServiceState());
@@ -1364,7 +1476,7 @@ public class TestNodeStatusUpdater {
     }
     conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
     conf.set(YarnConfiguration.NM_ADDRESS, localhostAddress + ":12345");
-    conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + 
":12346");  
+    conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346");
     conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
     conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
       remoteLogsDir.getAbsolutePath());
@@ -1372,7 +1484,7 @@ public class TestNodeStatusUpdater {
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
     return conf;
   }
-  
+
   private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) {
     return new NodeManager() {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 4798120..4222888 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -198,7 +198,7 @@ public class ResourceTrackerService extends AbstractService 
implements
    */
   @SuppressWarnings("unchecked")
   @VisibleForTesting
-  void handleNMContainerStatus(NMContainerStatus containerStatus) {
+  void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) {
     ApplicationAttemptId appAttemptId =
         containerStatus.getContainerId().getApplicationAttemptId();
     RMApp rmApp =
@@ -229,7 +229,8 @@ public class ResourceTrackerService extends AbstractService 
implements
             containerStatus.getContainerExitStatus());
       // sending master container finished event.
       RMAppAttemptContainerFinishedEvent evt =
-          new RMAppAttemptContainerFinishedEvent(appAttemptId, status);
+          new RMAppAttemptContainerFinishedEvent(appAttemptId, status,
+              nodeId);
       rmContext.getDispatcher().getEventHandler().handle(evt);
     }
   }
@@ -324,7 +325,7 @@ public class ResourceTrackerService extends AbstractService 
implements
         LOG.info("received container statuses on node manager register :"
             + request.getNMContainerStatuses());
         for (NMContainerStatus status : request.getNMContainerStatuses()) {
-          handleNMContainerStatus(status);
+          handleNMContainerStatus(status, nodeId);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index ff520be..0b8f321 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1181,7 +1181,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       int numberOfFailure = app.getNumFailedAppAttempts();
       if (!app.submissionContext.getUnmanagedAM()
           && numberOfFailure < app.maxAppAttempts) {
-        boolean transferStateFromPreviousAttempt = false;
+        boolean transferStateFromPreviousAttempt;
         RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
         transferStateFromPreviousAttempt =
             failedEvent.getTransferStateFromPreviousAttempt();
@@ -1191,11 +1191,11 @@ public class RMAppImpl implements RMApp, Recoverable {
         // Transfer the state from the previous attempt to the current attempt.
         // Note that the previous failed attempt may still be collecting the
         // container events from the scheduler and update its data structures
-        // before the new attempt is created.
-        if (transferStateFromPreviousAttempt) {
-          ((RMAppAttemptImpl) app.currentAttempt)
-            .transferStateFromPreviousAttempt(oldAttempt);
-        }
+        // before the new attempt is created. We always transferState for
+        // finished containers so that they can be acked to NM,
+        // but when pulling finished containers we will check this flag again.
+        ((RMAppAttemptImpl) app.currentAttempt)
+          .transferStateFromPreviousAttempt(oldAttempt);
         return initialState;
       } else {
         if (numberOfFailure >= app.maxAppAttempts) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index 943a5e5..cf8c2bb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.crypto.SecretKey;
 
@@ -31,6 +32,7 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -120,14 +122,29 @@ public interface RMAppAttempt extends 
EventHandler<RMAppAttemptEvent> {
   List<ContainerStatus> pullJustFinishedContainers();
 
   /**
-   * Return the list of last set of finished containers. This does not reset 
the
-   * finished containers.
-   * @return the list of just finished contianers, this does not reset the
+   * Returns a reference to the map from each node to its last set of
+   * finished containers. This does not reset the finished containers.
+   * @return the map of just finished containers, this does not reset the
    * finished containers.
    */
+  ConcurrentMap<NodeId, List<ContainerStatus>>
+      getJustFinishedContainersReference();
+
+  /**
+   * Return the list of last set of finished containers. This does not reset
+   * the finished containers.
+   * @return the list of just finished containers
+   */
   List<ContainerStatus> getJustFinishedContainers();
 
   /**
+   * The map of containers per Node that are already sent to the AM.
+   * @return map of per node list of finished container status sent to AM
+   */
+  ConcurrentMap<NodeId, List<ContainerStatus>>
+      getFinishedContainersSentToAMReference();
+
+  /**
    * The container on which the Application Master is running.
    * @return the {@link Container} on which the application master is running.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 7ca57ee..d75a871 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -24,9 +24,12 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -52,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
@@ -83,6 +87,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -129,9 +134,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
   private final ApplicationSubmissionContext submissionContext;
   private Token<AMRMTokenIdentifier> amrmToken = null;
   private SecretKey clientTokenMasterKey = null;
-  
-  private List<ContainerStatus> justFinishedContainers =
-    new ArrayList<ContainerStatus>();
+
+  private ConcurrentMap<NodeId, List<ContainerStatus>>
+      justFinishedContainers =
+      new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
+  // Tracks the previous finished containers that are waiting to be
+  // verified as received by the AM. If the AM sends the next allocate
+  // request it implicitly acks this list.
+  private ConcurrentMap<NodeId, List<ContainerStatus>>
+      finishedContainersSentToAM =
+      new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
   private Container masterContainer;
 
   private float progress = 0;
@@ -627,10 +639,28 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
     }
   }
 
+  @VisibleForTesting
   @Override
   public List<ContainerStatus> getJustFinishedContainers() {
     this.readLock.lock();
     try {
+      List<ContainerStatus> returnList = new ArrayList<ContainerStatus>();
+      for (Collection<ContainerStatus> containerStatusList :
+          justFinishedContainers.values()) {
+        returnList.addAll(containerStatusList);
+      }
+      return returnList;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  public ConcurrentMap<NodeId, List<ContainerStatus>>
+  getJustFinishedContainersReference
+      () {
+    this.readLock.lock();
+    try {
       return this.justFinishedContainers;
     } finally {
       this.readLock.unlock();
@@ -638,14 +668,67 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
   }
 
   @Override
+  public ConcurrentMap<NodeId, List<ContainerStatus>>
+  getFinishedContainersSentToAMReference() {
+    this.readLock.lock();
+    try {
+      return this.finishedContainersSentToAM;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
   public List<ContainerStatus> pullJustFinishedContainers() {
     this.writeLock.lock();
 
     try {
-      List<ContainerStatus> returnList = new ArrayList<ContainerStatus>(
-          this.justFinishedContainers.size());
-      returnList.addAll(this.justFinishedContainers);
-      this.justFinishedContainers.clear();
+      List<ContainerStatus> returnList = new ArrayList<ContainerStatus>();
+
+      // A new allocate means the AM received the previously sent
+      // finishedContainers. We can ack this to NM now
+      for (NodeId nodeId:finishedContainersSentToAM.keySet()) {
+
+        // Clear and get current values
+        List<ContainerStatus> currentSentContainers =
+            finishedContainersSentToAM
+            .put(nodeId, new ArrayList<ContainerStatus>());
+        List<ContainerId> containerIdList = new ArrayList<ContainerId>
+            (currentSentContainers.size());
+        for (ContainerStatus containerStatus:currentSentContainers) {
+          containerIdList.add(containerStatus.getContainerId());
+        }
+        eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(
+            nodeId, containerIdList));
+      }
+
+      // Mark every containerStatus as being sent to AM though we may return
+      // only the ones that belong to the current attempt
+      boolean keepContainersAcressAttempts = this.submissionContext
+          .getKeepContainersAcrossApplicationAttempts();
+      for (NodeId nodeId:justFinishedContainers.keySet()) {
+
+        // Clear and get current values
+        List<ContainerStatus> finishedContainers = justFinishedContainers.put
+            (nodeId, new ArrayList<ContainerStatus>());
+
+        if (keepContainersAcressAttempts) {
+          returnList.addAll(finishedContainers);
+        } else {
+          // Filter out containers from previous attempt
+          for (ContainerStatus containerStatus: finishedContainers) {
+            if (containerStatus.getContainerId().getApplicationAttemptId()
+                .equals(this.getAppAttemptId())) {
+              returnList.add(containerStatus);
+            }
+          }
+        }
+
+        finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList
+              <ContainerStatus>());
+        finishedContainersSentToAM.get(nodeId).addAll(finishedContainers);
+      }
+
       return returnList;
     } finally {
       this.writeLock.unlock();
@@ -732,7 +815,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
     }
     setMasterContainer(attemptState.getMasterContainer());
     recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
-      attemptState.getState());
+        attemptState.getState());
     this.recoveredFinalState = attemptState.getState();
     this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
     this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
@@ -744,7 +827,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
   }
 
   public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
-    this.justFinishedContainers = attempt.getJustFinishedContainers();
+    this.justFinishedContainers = attempt.getJustFinishedContainersReference();
+    this.finishedContainersSentToAM =
+        attempt.getFinishedContainersSentToAMReference();
   }
 
   private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
@@ -1507,6 +1592,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
       ContainerStatus containerStatus =
           containerFinishedEvent.getContainerStatus();
 
+      // Add all finished containers so that they can be acked to NM
+      addJustFinishedContainer(appAttempt, containerFinishedEvent);
+
       // Is this container the AmContainer? If the finished container is same 
as
       // the AMContainer, AppAttempt fails
       if (appAttempt.masterContainer != null
@@ -1519,12 +1607,18 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
         return RMAppAttemptState.FINAL_SAVING;
       }
 
-      // Normal container.Put it in completed containers list
-      appAttempt.justFinishedContainers.add(containerStatus);
       return this.currentState;
     }
   }
 
+  private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
+      RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
+    appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent
+        .getNodeId(), new ArrayList<ContainerStatus>());
+    appAttempt.justFinishedContainers.get(containerFinishedEvent
+            .getNodeId()).add(containerFinishedEvent.getContainerStatus());
+  }
+
   private static final class ContainerFinishedAtFinalStateTransition
       extends BaseTransition {
     @Override
@@ -1533,10 +1627,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       RMAppAttemptContainerFinishedEvent containerFinishedEvent =
           (RMAppAttemptContainerFinishedEvent) event;
       
-      ContainerStatus containerStatus =
-          containerFinishedEvent.getContainerStatus();
       // Normal container. Add it in completed containers list
-      appAttempt.justFinishedContainers.add(containerStatus);
+      addJustFinishedContainer(appAttempt, containerFinishedEvent);
     }
   }
 
@@ -1569,6 +1661,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       ContainerStatus containerStatus =
           containerFinishedEvent.getContainerStatus();
 
+      // Add all finished containers so that they can be acked to NM.
+      addJustFinishedContainer(appAttempt, containerFinishedEvent);
+
       // Is this container the ApplicationMaster container?
       if (appAttempt.masterContainer.getId().equals(
           containerStatus.getContainerId())) {
@@ -1576,8 +1671,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
             appAttempt, containerFinishedEvent);
         return RMAppAttemptState.FINISHED;
       }
-      // Normal container.
-      appAttempt.justFinishedContainers.add(containerStatus);
+
       return RMAppAttemptState.FINISHING;
     }
   }
@@ -1592,6 +1686,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       ContainerStatus containerStatus =
           containerFinishedEvent.getContainerStatus();
 
+      // Add all finished containers so that they can be acked to NM.
+      addJustFinishedContainer(appAttempt, containerFinishedEvent);
+
       // If this is the AM container, it means the AM container is finished,
       // but we are not yet acknowledged that the final state has been saved.
       // Thus, we still return FINAL_SAVING state here.
@@ -1611,8 +1708,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
             appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
         return;
       }
-      // Normal container.
-      appAttempt.justFinishedContainers.add(containerStatus);
     }
   }
 
@@ -1629,7 +1724,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
       appAttempt.updateInfoOnAMUnregister(amUnregisteredEvent);
       new FinalTransition(RMAppAttemptState.FINISHED).transition(appAttempt,
-        event);
+          event);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java
index 3660597..39c6f29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java
@@ -20,21 +20,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 
 public class RMAppAttemptContainerFinishedEvent extends RMAppAttemptEvent {
 
   private final ContainerStatus containerStatus;
+  private final NodeId nodeId;
 
   public RMAppAttemptContainerFinishedEvent(ApplicationAttemptId appAttemptId, 
-      ContainerStatus containerStatus) {
+      ContainerStatus containerStatus, NodeId nodeId) {
     super(appAttemptId, RMAppAttemptEventType.CONTAINER_FINISHED);
     this.containerStatus = containerStatus;
+    this.nodeId = nodeId;
   }
 
   public ContainerStatus getContainerStatus() {
     return this.containerStatus;
   }
 
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 885e864..479734a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -78,13 +78,13 @@ public class RMContainerImpl implements RMContainer {
         RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
 
     // Transitions from RESERVED state
-    .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, 
+    .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED,
         RMContainerEventType.RESERVED, new ContainerReservedTransition())
-    .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, 
+    .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED,
         RMContainerEventType.START, new ContainerStartedTransition())
-    .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, 
+    .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED,
         RMContainerEventType.KILL) // nothing to do
-    .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, 
+    .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED,
         RMContainerEventType.RELEASED) // nothing to do
        
 
@@ -100,7 +100,7 @@ public class RMContainerImpl implements RMContainer {
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
         RMContainerEventType.LAUNCHED, new LaunchedTransition())
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED,
-        RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState()) 
       
+        RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState())
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED,
         RMContainerEventType.RELEASED, new KillTransition())
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED,
@@ -495,7 +495,8 @@ public class RMContainerImpl implements RMContainer {
       updateAttemptMetrics(container);
 
       container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
-        container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
+        container.appAttemptId, finishedEvent.getRemoteContainerStatus(),
+          container.getAllocatedNode()));
 
       container.rmContext.getRMApplicationHistoryWriter().containerFinished(
         container);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
index c0096b9..b4d0b8b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
@@ -40,6 +40,9 @@ public enum RMNodeEventType {
   CONTAINER_ALLOCATED,
   CLEANUP_CONTAINER,
 
+  // Source: RMAppAttempt
+  FINISHED_CONTAINERS_PULLED_BY_AM,
+
   // Source: NMLivelinessMonitor
   EXPIRE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java
new file mode 100644
index 0000000..a4fb707
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+import java.util.List;
+
+// Happens after an implicit ack from AM that the container completion has
+// been notified successfully to the AM
+public class RMNodeFinishedContainersPulledByAMEvent extends RMNodeEvent {
+
+  private List<ContainerId> containers;
+
+  public RMNodeFinishedContainersPulledByAMEvent(NodeId nodeId,
+      List<ContainerId> containers) {
+    super(nodeId, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM);
+    this.containers = containers;
+  }
+
+  public List<ContainerId> getContainers() {
+    return this.containers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 1265958..f0ae826 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -112,6 +112,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
       new ContainerIdComparator());
 
+  /* set of containers that were notified to AM about their completion */
+  private final Set<ContainerId> finishedContainersPulledByAM =
+      new HashSet<ContainerId>();
+
   /* the list of applications that have finished and need to be purged */
   private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
 
@@ -135,7 +139,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
          new UpdateNodeResourceWhenUnusableTransition())
 
      //Transitions from RUNNING state
-     .addTransition(NodeState.RUNNING, 
+     .addTransition(NodeState.RUNNING,
          EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
          RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
      .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
@@ -152,29 +156,39 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+         new FinishedContainersPulledByAMTransition())
+     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
          RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
          RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
 
      //Transitions from REBOOTED state
      .addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
-         RMNodeEventType.RESOURCE_UPDATE, 
+         RMNodeEventType.RESOURCE_UPDATE,
          new UpdateNodeResourceWhenUnusableTransition())
          
      //Transitions from DECOMMISSIONED state
      .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
-         RMNodeEventType.RESOURCE_UPDATE, 
+         RMNodeEventType.RESOURCE_UPDATE,
          new UpdateNodeResourceWhenUnusableTransition())
-         
+     .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
+         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+         new FinishedContainersPulledByAMTransition())
+
      //Transitions from LOST state
      .addTransition(NodeState.LOST, NodeState.LOST,
-         RMNodeEventType.RESOURCE_UPDATE, 
+         RMNodeEventType.RESOURCE_UPDATE,
          new UpdateNodeResourceWhenUnusableTransition())
+     .addTransition(NodeState.LOST, NodeState.LOST,
+         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+         new FinishedContainersPulledByAMTransition())
 
      //Transitions from UNHEALTHY state
-     .addTransition(NodeState.UNHEALTHY, 
+     .addTransition(NodeState.UNHEALTHY,
          EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
-         RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition())
+         RMNodeEventType.STATUS_UPDATE,
+         new StatusUpdateWhenUnHealthyTransition())
      .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
          RMNodeEventType.DECOMMISSION,
          new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
@@ -192,7 +206,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
      .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
          RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
-         
+     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+         new FinishedContainersPulledByAMTransition())
+
      // create the topology tables
      .installTopology(); 
 
@@ -365,8 +382,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       response.addAllContainersToCleanup(
           new ArrayList<ContainerId>(this.containersToClean));
       response.addAllApplicationsToCleanup(this.finishedApplications);
+      response.addFinishedContainersPulledByAM(
+          new ArrayList<ContainerId>(this.finishedContainersPulledByAM));
       this.containersToClean.clear();
       this.finishedApplications.clear();
+      this.finishedContainersPulledByAM.clear();
     } finally {
       this.writeLock.unlock();
     }
@@ -652,6 +672,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
+  public static class FinishedContainersPulledByAMTransition implements
+      SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      rmNode.finishedContainersPulledByAM.addAll(((
+          RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
+    }
+  }
+
   public static class DeactivateNodeTransition
     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
@@ -726,7 +756,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           new ArrayList<ContainerStatus>();
       for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
         ContainerId containerId = remoteContainer.getContainerId();
-        
+
         // Don't bother with containers already scheduled for cleanup, or for
         // applications already killed. The scheduler doens't need to know any
         // more about this container

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 115f0b4..877a122 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -491,7 +491,7 @@ public class TestResourceTrackerService {
             ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
           ContainerState.COMPLETE, Resource.newInstance(1024, 1),
           "Dummy Completed", 0, Priority.newInstance(10), 1234);
-    rm.getResourceTrackerService().handleNMContainerStatus(report);
+    rm.getResourceTrackerService().handleNMContainerStatus(report, null);
     verify(handler, never()).handle((Event) any());
 
     // Case 1.2: Master container is null
@@ -502,7 +502,7 @@ public class TestResourceTrackerService {
           ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
           ContainerState.COMPLETE, Resource.newInstance(1024, 1),
           "Dummy Completed", 0, Priority.newInstance(10), 1234);
-    rm.getResourceTrackerService().handleNMContainerStatus(report);
+    rm.getResourceTrackerService().handleNMContainerStatus(report, null);
     verify(handler, never()).handle((Event)any());
 
     // Case 2: Managed AM
@@ -515,7 +515,7 @@ public class TestResourceTrackerService {
           ContainerState.COMPLETE, Resource.newInstance(1024, 1),
           "Dummy Completed", 0, Priority.newInstance(10), 1234);
     try {
-      rm.getResourceTrackerService().handleNMContainerStatus(report);
+      rm.getResourceTrackerService().handleNMContainerStatus(report, null);
     } catch (Exception e) {
       // expected - ignore
     }
@@ -530,7 +530,7 @@ public class TestResourceTrackerService {
       ContainerState.COMPLETE, Resource.newInstance(1024, 1),
       "Dummy Completed", 0, Priority.newInstance(10), 1234);
     try {
-      rm.getResourceTrackerService().handleNMContainerStatus(report);
+      rm.getResourceTrackerService().handleNMContainerStatus(report, null);
     } catch (Exception e) {
       // expected - ignore
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index fcb4e45..ba592fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -98,6 +98,9 @@ public class TestAMRestart {
       Thread.sleep(200);
     }
 
+    ContainerId amContainerId = ContainerId.newInstance(am1
+        .getApplicationAttemptId(), 1);
+
     // launch the 2nd container, for testing running container transferred.
     nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
     ContainerId containerId2 =
@@ -196,11 +199,15 @@ public class TestAMRestart {
     // completed containerId4 is also transferred to the new attempt.
     RMAppAttempt newAttempt =
         app1.getRMAppAttempt(am2.getApplicationAttemptId());
-    // 4 containers finished, acquired/allocated/reserved/completed.
-    waitForContainersToFinish(4, newAttempt);
+    // 4 containers finished, acquired/allocated/reserved/completed + AM
+    // container.
+    waitForContainersToFinish(5, newAttempt);
     boolean container3Exists = false, container4Exists = false, container5Exists =
-        false, container6Exists = false;
+        false, container6Exists = false, amContainerExists = false;
     for(ContainerStatus status :  newAttempt.getJustFinishedContainers()) {
+      if(status.getContainerId().equals(amContainerId)) {
+        amContainerExists = true;
+      }
       if(status.getContainerId().equals(containerId3)) {
         // containerId3 is the container ran by previous attempt but finished by the
         // new attempt.
@@ -220,8 +227,11 @@ public class TestAMRestart {
         container6Exists = true;
       }
     }
-    Assert.assertTrue(container3Exists && container4Exists && container5Exists
-        && container6Exists);
+    Assert.assertTrue(amContainerExists);
+    Assert.assertTrue(container3Exists);
+    Assert.assertTrue(container4Exists);
+    Assert.assertTrue(container5Exists);
+    Assert.assertTrue(container6Exists);
 
     // New SchedulerApplicationAttempt also has the containers info.
     rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
@@ -240,14 +250,14 @@ public class TestAMRestart {
     // all 4 normal containers finished.
     System.out.println("New attempt's just finished containers: "
         + newAttempt.getJustFinishedContainers());
-    waitForContainersToFinish(5, newAttempt);
+    waitForContainersToFinish(6, newAttempt);
     rm1.stop();
   }
 
   private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
       throws InterruptedException {
     int count = 0;
-    while (attempt.getJustFinishedContainers().size() != expectedNum
+    while (attempt.getJustFinishedContainers().size() < expectedNum
         && count < 500) {
       Thread.sleep(100);
       count++;

Reply via email to