http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index b8e6f43..15028f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -28,6 +28,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -35,6 +36,7 @@ import static org.mockito.Mockito.when;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -91,6 +93,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -151,6 +158,7 @@ public class TestRMAppAttemptTransitions {
   private NMTokenSecretManagerInRM nmTokenManager =
       spy(new NMTokenSecretManagerInRM(conf));
   private boolean transferStateFromPreviousAttempt = false;
+  private EventHandler<RMNodeEvent> rmnodeEventHandler;
 
   private final class TestApplicationAttemptEventDispatcher implements
       EventHandler<RMAppAttemptEvent> {
@@ -203,7 +211,7 @@ public class TestRMAppAttemptTransitions {
       applicationMasterLauncher.handle(event);
     }
   }
-  
+
   private static int appId = 1;
   
   private ApplicationSubmissionContext submissionContext = null;
@@ -268,6 +276,9 @@ public class TestRMAppAttemptTransitions {
     rmDispatcher.register(AMLauncherEventType.class, 
         new TestAMLauncherEventDispatcher());
 
+    rmnodeEventHandler = mock(RMNodeImpl.class);
+    rmDispatcher.register(RMNodeEventType.class, rmnodeEventHandler);
+
     rmDispatcher.init(conf);
     rmDispatcher.start();
     
@@ -575,6 +586,8 @@ public class TestRMAppAttemptTransitions {
     }
     assertEquals(finishedContainerCount, applicationAttempt
         .getJustFinishedContainers().size());
+    Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
+        .size());
     assertEquals(container, applicationAttempt.getMasterContainer());
     assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
@@ -704,7 +717,8 @@ public class TestRMAppAttemptTransitions {
     application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(),
         container.getNodeId()));
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-        applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class)));
+        applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class),
+        container.getNodeId()));
     // complete AM
     String diagnostics = "Successful";
     FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
@@ -752,10 +766,11 @@ public class TestRMAppAttemptTransitions {
     when(appResUsgRpt.getMemorySeconds()).thenReturn(223456L);
     when(appResUsgRpt.getVcoreSeconds()).thenReturn(75544L);
     sendAttemptUpdateSavedEvent(applicationAttempt);
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
         attemptId, 
         ContainerStatus.newInstance(
-            amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+            amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
 
     when(scheduler.getSchedulerAppInfo(eq(attemptId))).thenReturn(null);
 
@@ -857,8 +872,9 @@ public class TestRMAppAttemptTransitions {
             SchedulerUtils.LOST_CONTAINER);
     // send CONTAINER_FINISHED event at SCHEDULED state,
     // The state should be FINAL_SAVING with previous state SCHEDULED
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-        applicationAttempt.getAppAttemptId(), cs));
+        applicationAttempt.getAppAttemptId(), cs, anyNodeId));
     // createApplicationAttemptState will return previous state (SCHEDULED),
     // if the current state is FINAL_SAVING.
     assertEquals(YarnApplicationAttemptState.SCHEDULED,
@@ -904,8 +920,9 @@ public class TestRMAppAttemptTransitions {
     ContainerStatus cs =
         BuilderUtils.newContainerStatus(amContainer.getId(),
           ContainerState.COMPLETE, containerDiagMsg, exitCode);
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-      applicationAttempt.getAppAttemptId(), cs));
+      applicationAttempt.getAppAttemptId(), cs, anyNodeId));
     assertEquals(YarnApplicationAttemptState.ALLOCATED,
         applicationAttempt.createApplicationAttemptState());
     sendAttemptUpdateSavedEvent(applicationAttempt);
@@ -928,16 +945,17 @@ public class TestRMAppAttemptTransitions {
     ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(),
         ContainerState.COMPLETE, containerDiagMsg, exitCode);
     ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-        appAttemptId, cs));
+        appAttemptId, cs, anyNodeId));
 
     // ignored ContainerFinished and Expire at FinalSaving if we were supposed
     // to Failed state.
     assertEquals(RMAppAttemptState.FINAL_SAVING,
-      applicationAttempt.getAppAttemptState()); 
+      applicationAttempt.getAppAttemptState());
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
       applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
-        amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+        amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
     applicationAttempt.handle(new RMAppAttemptEvent(
       applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
     assertEquals(RMAppAttemptState.FINAL_SAVING,
@@ -947,7 +965,7 @@ public class TestRMAppAttemptTransitions {
     sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.FAILED,
         applicationAttempt.getAppAttemptState());
-    assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
+    assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
     assertEquals(amContainer, applicationAttempt.getMasterContainer());
     assertEquals(0, application.getRanNodes().size());
     String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@@ -971,10 +989,11 @@ public class TestRMAppAttemptTransitions {
     // ignored ContainerFinished and Expire at FinalSaving if we were supposed
     // to Killed state.
     assertEquals(RMAppAttemptState.FINAL_SAVING,
-      applicationAttempt.getAppAttemptState()); 
+      applicationAttempt.getAppAttemptState());
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
       applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
-        amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+        amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
     applicationAttempt.handle(new RMAppAttemptEvent(
       applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
     assertEquals(RMAppAttemptState.FINAL_SAVING,
@@ -984,7 +1003,7 @@ public class TestRMAppAttemptTransitions {
     sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.KILLED,
         applicationAttempt.getAppAttemptState());
-    assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
+    assertEquals(1,applicationAttempt.getJustFinishedContainers().size());
     assertEquals(amContainer, applicationAttempt.getMasterContainer());
     assertEquals(0, application.getRanNodes().size());
     String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@@ -1144,13 +1163,14 @@ public class TestRMAppAttemptTransitions {
     unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
         diagnostics);
     // container must be AM container to move from FINISHING to FINISHED
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(
         new RMAppAttemptContainerFinishedEvent(
             applicationAttempt.getAppAttemptId(),
             BuilderUtils.newContainerStatus(
                 BuilderUtils.newContainerId(
                     applicationAttempt.getAppAttemptId(), 42),
-                ContainerState.COMPLETE, "", 0)));
+                ContainerState.COMPLETE, "", 0), anyNodeId));
     testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl,
         diagnostics);
   }
@@ -1165,13 +1185,14 @@ public class TestRMAppAttemptTransitions {
     String diagnostics = "Successful";
     unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
         diagnostics);
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(
         new RMAppAttemptContainerFinishedEvent(
             applicationAttempt.getAppAttemptId(),
             BuilderUtils.newContainerStatus(amContainer.getId(),
-                ContainerState.COMPLETE, "", 0)));
+                ContainerState.COMPLETE, "", 0), anyNodeId));
     testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
-        diagnostics, 0, false);
+        diagnostics, 1, false);
   }
 
   // While attempt is at FINAL_SAVING, Contaienr_Finished event may come before
@@ -1195,15 +1216,16 @@ public class TestRMAppAttemptTransitions {
     assertEquals(YarnApplicationAttemptState.RUNNING,
         applicationAttempt.createApplicationAttemptState());
     // Container_finished event comes before Attempt_Saved event.
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
       applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
-        amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+        amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
     assertEquals(RMAppAttemptState.FINAL_SAVING,
       applicationAttempt.getAppAttemptState());
     // send attempt_saved
     sendAttemptUpdateSavedEvent(applicationAttempt);
     testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
-      diagnostics, 0, false);
+      diagnostics, 1, false);
   }
 
   // While attempt is at FINAL_SAVING, Expire event may come before
@@ -1235,6 +1257,71 @@ public class TestRMAppAttemptTransitions {
       diagnostics, 0, false);
   }
 
+  @Test
+  public void testFinishedContainer() {
+    Container amContainer = allocateApplicationAttempt();
+    launchApplicationAttempt(amContainer);
+    runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
+
+    // Complete one container
+    ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
+        .getAppAttemptId(), 2);
+    Container container1 = mock(Container.class);
+    ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+    when(container1.getId()).thenReturn(
+        containerId1);
+    when(containerStatus1.getContainerId()).thenReturn(containerId1);
+    when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
+
+    application.handle(new RMAppRunningOnNodeEvent(application
+        .getApplicationId(),
+        container1.getNodeId()));
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+        applicationAttempt.getAppAttemptId(), containerStatus1,
+        container1.getNodeId()));
+
+    ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
+        ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
+
+    // Verify justFinishedContainers
+    Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers()
+        .size());
+    Assert.assertEquals(container1.getId(), applicationAttempt
+        .getJustFinishedContainers().get(0).getContainerId());
+    Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
+        .size());
+
+    // Verify finishedContainersSentToAM gets container after pull
+    List<ContainerStatus> containerStatuses = applicationAttempt
+        .pullJustFinishedContainers();
+    Assert.assertEquals(1, containerStatuses.size());
+    Mockito.verify(rmnodeEventHandler, never()).handle(Mockito
+        .any(RMNodeEvent.class));
+    Assert.assertTrue(applicationAttempt.getJustFinishedContainers().isEmpty());
+    Assert.assertEquals(1, getFinishedContainersSentToAM(applicationAttempt)
+        .size());
+
+    // Verify container is acked to NM via the RMNodeEvent after second pull
+    containerStatuses = applicationAttempt.pullJustFinishedContainers();
+    Assert.assertEquals(0, containerStatuses.size());
+    Mockito.verify(rmnodeEventHandler).handle(captor.capture());
+    Assert.assertEquals(container1.getId(), captor.getValue().getContainers()
+        .get(0));
+    Assert.assertTrue(applicationAttempt.getJustFinishedContainers().isEmpty());
+    Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
+        .size());
+  }
+
+  private static List<ContainerStatus> getFinishedContainersSentToAM(
+      RMAppAttempt applicationAttempt) {
+    List<ContainerStatus> containers = new ArrayList<ContainerStatus>();
+    for (List<ContainerStatus> containerStatuses: applicationAttempt
+        .getFinishedContainersSentToAMReference().values()) {
+      containers.addAll(containerStatuses);
+    }
+    return containers;
+  }
+
   // this is to test user can get client tokens only after the client token
   // master key is saved in the state store and also registered in
   // ClientTokenSecretManager
@@ -1281,8 +1368,9 @@ public class TestRMAppAttemptTransitions {
         ContainerStatus.newInstance(amContainer.getId(),
           ContainerState.COMPLETE, "some error", 123);
     ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-      appAttemptId, cs1));
+      appAttemptId, cs1, anyNodeId));
     assertEquals(YarnApplicationAttemptState.RUNNING,
         applicationAttempt.createApplicationAttemptState());
     sendAttemptUpdateSavedEvent(applicationAttempt);
@@ -1293,15 +1381,21 @@ public class TestRMAppAttemptTransitions {
     verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
 
     // failed attempt captured the container finished event.
-    assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
+    assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
     ContainerStatus cs2 =
         ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2),
           ContainerState.COMPLETE, "", 0);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-      appAttemptId, cs2));
-    assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
-    assertEquals(cs2.getContainerId(), applicationAttempt
-      .getJustFinishedContainers().get(0).getContainerId());
+      appAttemptId, cs2, anyNodeId));
+    assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
+    boolean found = false;
+    for (ContainerStatus containerStatus:applicationAttempt
+        .getJustFinishedContainers()) {
+      if (cs2.getContainerId().equals(containerStatus.getContainerId())) {
+        found = true;
+      }
+    }
+    assertTrue(found);
   }
 
 
@@ -1322,8 +1416,9 @@ public class TestRMAppAttemptTransitions {
         ContainerStatus.newInstance(amContainer.getId(),
           ContainerState.COMPLETE, "some error", 123);
     ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-      appAttemptId, cs1));
+      appAttemptId, cs1, anyNodeId));
     assertEquals(YarnApplicationAttemptState.RUNNING,
         applicationAttempt.createApplicationAttemptState());
     sendAttemptUpdateSavedEvent(applicationAttempt);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
index b3dc35f..a0d5d84 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
@@ -161,7 +161,7 @@ public class TestAMRMTokens {
           .getEventHandler()
           .handle(
               new RMAppAttemptContainerFinishedEvent(applicationAttemptId,
-                  containerStatus));
+                  containerStatus, nm1.getNodeId()));
 
       // Make sure the RMAppAttempt is at Finished State.
       // Both AMRMToken and ClientToAMToken have been removed.

Reply via email to