TEZ-164. Fix handling of completed, running, pending tasks in
AMContainer. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/9f0d4c3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/9f0d4c3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/9f0d4c3c

Branch: refs/heads/master
Commit: 9f0d4c3cb4a8b4b3b6ea89e2c0cb0dc85bc91545
Parents: a74b436
Author: Siddharth Seth <[email protected]>
Authored: Wed May 29 16:35:40 2013 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Wed May 29 16:35:40 2013 -0700

----------------------------------------------------------------------
 .../tez/dag/app/rm/container/AMContainer.java      |    3 +-
 .../tez/dag/app/rm/container/AMContainerImpl.java  |  100 +++++++--------
 .../tez/dag/app/rm/container/TestAMContainer.java  |   93 ++++++-------
 3 files changed, 93 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9f0d4c3c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
index 3c00620..e62d033 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
@@ -30,8 +30,7 @@ public interface AMContainer extends 
EventHandler<AMContainerEvent>{
   public AMContainerState getState();
   public ContainerId getContainerId();
   public Container getContainer();
-  //TODO Rename - CompletedTaskAttempts, ideally means FAILED / KILLED as well.
-  public List<TezTaskAttemptID> getCompletedTaskAttempts();
+  public List<TezTaskAttemptID> getAllTaskAttempts();
   public TezTaskAttemptID getRunningTaskAttempt();
   public List<TezTaskAttemptID> getQueuedTaskAttempts();
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9f0d4c3c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 51246dd..58e9d50 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.dag.app.rm.container;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -48,13 +47,13 @@ import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
 import org.apache.tez.dag.app.rm.AMSchedulerEventContainerCompleted;
 import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer;
 import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
 import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+//import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
 
 @SuppressWarnings("rawtypes")
 public class AMContainerImpl implements AMContainer {
@@ -222,10 +221,19 @@ public class AMContainerImpl implements AMContainer {
   }
 
   @Override
-  public List<TezTaskAttemptID> getCompletedTaskAttempts() {
+  public List<TezTaskAttemptID> getAllTaskAttempts() {
     readLock.lock();
     try {
-      return new ArrayList<TezTaskAttemptID>(this.completedAttempts);
+      List<TezTaskAttemptID> allAttempts = new LinkedList<TezTaskAttemptID>();
+      allAttempts.addAll(this.completedAttempts);
+      allAttempts.addAll(this.failedAssignments);
+      if (this.pendingAttempt != null) {
+        allAttempts.add(this.pendingAttempt);
+      }
+      if (this.runningAttempt != null) {
+        allAttempts.add(this.runningAttempt);
+      }
+      return allAttempts;
     } finally {
       readLock.unlock();
     }
@@ -359,6 +367,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
       container.inError = true;
+      container.registerFailedAttempt(event.getTaskAttemptId());
       container.maybeSendNodeFailureForFailedAssignment(event
           .getTaskAttemptId());
       container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
@@ -378,7 +387,6 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       AMContainerEventCompleted event = (AMContainerEventCompleted)cEvent;
       container.sendCompletedToScheduler();
-      container.sendDiagUpdateOnContainerComplete(event);
       String diag = event.getContainerStatus().getDiagnostics();
       if (!(diag == null || diag.equals(""))) {
         LOG.info("Container " + container.getContainerId()
@@ -402,10 +410,11 @@ public class AMContainerImpl implements AMContainer {
     }
   }
 
-  protected static class NodeFailedAtAllocatedTransition implements
-      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+  protected static class NodeFailedAtAllocatedTransition extends
+      NodeFailedBaseTransition {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
+      super.transition(container, cEvent);
       container.sendCompletedToScheduler();
       container.deAllocate();
     }
@@ -414,6 +423,7 @@ public class AMContainerImpl implements AMContainer {
   protected static class ErrorTransition extends ErrorBaseTransition {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
+      super.transition(container, cEvent);
       container.sendCompletedToScheduler();
       container.deAllocate();
       LOG.info(
@@ -464,7 +474,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       if (container.pendingAttempt != null) {
         AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) 
cEvent;
-        container.sendTerminatingToTA(container.pendingAttempt,
+        container.sendTerminatingToTaskAttempt(container.pendingAttempt,
             event.getMessage());
       }
       container.unregisterFromTAListener();
@@ -481,9 +491,10 @@ public class AMContainerImpl implements AMContainer {
         String errorMessage = getMessage(container, event);
         container.sendTerminatedToTaskAttempt(container.pendingAttempt,
             errorMessage);
+        container.registerFailedAttempt(container.pendingAttempt);
+        container.pendingAttempt = null;
         LOG.warn(errorMessage);
       }
-      container.sendDiagUpdateOnContainerComplete(event);
       container.unregisterFromTAListener();
       container.sendCompletedToScheduler();
       String diag = event.getContainerStatus().getDiagnostics();
@@ -507,7 +518,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       if (container.pendingAttempt != null) {
-        container.sendTerminatingToTA(container.pendingAttempt,
+        container.sendTerminatingToTaskAttempt(container.pendingAttempt,
             getMessage(container, cEvent));
       }
       container.unregisterFromTAListener();
@@ -541,12 +552,14 @@ public class AMContainerImpl implements AMContainer {
       }
 
       if (container.pendingAttempt != null) {
+        // Will be null in COMPLETED state.
         container.sendNodeFailureToTA(container.pendingAttempt, errorMessage);
-        container.sendTerminatingToTA(container.pendingAttempt, "Node 
failure");
+        container.sendTerminatingToTaskAttempt(container.pendingAttempt, "Node 
failure");
       }
       if (container.runningAttempt != null) {
+        // Will be null in COMPLETED state.
         container.sendNodeFailureToTA(container.runningAttempt, errorMessage);
-        container.sendTerminatingToTA(container.runningAttempt, "Node 
failure");
+        container.sendTerminatingToTaskAttempt(container.runningAttempt, "Node 
failure");
       }
     }
   }
@@ -562,16 +575,17 @@ public class AMContainerImpl implements AMContainer {
   }
 
   protected static class ErrorAtLaunchingTransition
-      extends ErrorTransition {
+      extends ErrorBaseTransition {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       super.transition(container, cEvent);
       if (container.pendingAttempt != null) {
-        container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+        container.sendTerminatingToTaskAttempt(container.pendingAttempt, 
             "Container " + container.getContainerId() +
                 " hit an invalid transition - " + cEvent.getType() + " at " +
                 container.getState());
       }
+      container.sendStopRequestToNM();
       container.unregisterFromTAListener();
     }
   }
@@ -625,7 +639,6 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       super.transition(container, cEvent);
       container.unregisterFromContainerListener();
-      
container.sendDiagUpdateOnContainerComplete((AMContainerEventCompleted)cEvent);
     }
 
     @Override
@@ -707,6 +720,8 @@ public class AMContainerImpl implements AMContainer {
       container.sendTerminatedToTaskAttempt(container.runningAttempt,
           getMessage(container, event));
       container.unregisterAttemptFromListener(container.runningAttempt);
+      container.registerFailedAttempt(container.runningAttempt);
+      container.runningAttempt = null;
       super.transition(container, cEvent);
     }
   }
@@ -716,7 +731,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
 
       container.unregisterAttemptFromListener(container.runningAttempt);
-      container.sendTerminatingToTA(container.runningAttempt,
+      container.sendTerminatingToTaskAttempt(container.runningAttempt,
           " Container" + container.getContainerId() +
               " received a STOP_REQUEST");
       super.transition(container, cEvent);
@@ -749,7 +764,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       super.transition(container, cEvent);
       container.unregisterAttemptFromListener(container.runningAttempt);
-      container.sendTerminatedToTaskAttempt(container.runningAttempt,
+      container.sendTerminatingToTaskAttempt(container.runningAttempt,
           "Container " + container.getContainerId() +
               " hit an invalid transition - " + cEvent.getType() + " at " +
               container.getState());
@@ -766,8 +781,8 @@ public class AMContainerImpl implements AMContainer {
           " cannot be allocated to container: " + container.getContainerId() +
           " in " + container.getState() + " state";
       
container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
-      container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
-      container.registerFailedTAAssignment(event.getTaskAttemptId());
+      container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), 
errorMessage);
+      container.registerFailedAttempt(event.getTaskAttemptId());
     }
   }
   
@@ -788,14 +803,18 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
       String diag = event.getContainerStatus().getDiagnostics();
+      for (TezTaskAttemptID taId : container.failedAssignments) {
+        container.sendTerminatedToTaskAttempt(taId, diag);
+      }
       if (container.pendingAttempt != null) {
         container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag);
+        container.registerFailedAttempt(container.pendingAttempt);
+        container.pendingAttempt = null;
       }
       if (container.runningAttempt != null) {
         container.sendTerminatedToTaskAttempt(container.runningAttempt, diag);
-      }
-      for (TezTaskAttemptID taId : container.failedAssignments) {
-        container.sendTerminatedToTaskAttempt(taId, diag);
+        container.registerFailedAttempt(container.runningAttempt);
+        container.runningAttempt = null;
       }
       if (!(diag == null || diag.equals(""))) {
         LOG.info("Container " + container.getContainerId()
@@ -822,10 +841,9 @@ public class AMContainerImpl implements AMContainer {
   }
 
   protected static class ErrorAtNMStopRequestedTransition
-      extends ErrorAtStoppingTransition {
+      extends ErrorBaseTransition {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       super.transition(container, cEvent);
-      container.deAllocate();
     }
   }
 
@@ -833,15 +851,6 @@ public class AMContainerImpl implements AMContainer {
       extends ErrorBaseTransition {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       super.transition(container, cEvent);
-      if (container.pendingAttempt != null) {
-        container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
-      }
-      if (container.runningAttempt != null) {
-        container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
-      }
-      for (TezTaskAttemptID taId : container.failedAssignments) {
-        container.sendTerminatedToTaskAttempt(taId, null);
-      }
       container.sendCompletedToScheduler();
     }
   }
@@ -873,7 +882,7 @@ public class AMContainerImpl implements AMContainer {
       
container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
       container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
           errorMessage);
-      container.registerFailedTAAssignment(event.getTaskAttemptId());
+      container.registerFailedAttempt(event.getTaskAttemptId());
     }
   }
 
@@ -886,17 +895,16 @@ public class AMContainerImpl implements AMContainer {
         ". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() +
         ". Current state: " + this.getState();
     this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
-    this.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
-    this.sendTerminatingToTA(currentTaId, errorMessage);
-    this.registerFailedTAAssignment(event.getTaskAttemptId());
+    this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage);
+    this.sendTerminatingToTaskAttempt(currentTaId, errorMessage);
+    this.registerFailedAttempt(event.getTaskAttemptId());
     LOG.warn(errorMessage);
     this.sendStopRequestToNM();
     this.unregisterFromTAListener();
     this.unregisterFromContainerListener();
   }
 
-
-  protected void registerFailedTAAssignment(TezTaskAttemptID taId) {
+  protected void registerFailedAttempt(TezTaskAttemptID taId) {
     failedAssignments.add(taId);
   }
   
@@ -908,23 +916,13 @@ public class AMContainerImpl implements AMContainer {
     sendEvent(new AMSchedulerEventContainerCompleted(containerId));
   }
 
-  protected void sendDiagUpdateOnContainerComplete(
-      AMContainerEventCompleted cEvent) {
-    String diag = cEvent.getContainerStatus().getDiagnostics();
-    if (pendingAttempt != null) {
-      sendEvent(new TaskAttemptEventDiagnosticsUpdate(pendingAttempt, diag));
-    }
-    if (runningAttempt != null) {
-      sendEvent(new TaskAttemptEventDiagnosticsUpdate(runningAttempt, diag));
-    }
-  }
-
   protected void sendTerminatedToTaskAttempt(
       TezTaskAttemptID taId, String message) {
     sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
   }
 
-  protected void sendTerminatingToTA(TezTaskAttemptID taId, String message) {
+  protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId,
+      String message) {
     sendEvent(new TaskAttemptEventContainerTerminating(taId, message));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9f0d4c3c/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
 
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 574a2c5..4541a2e 100644
--- 
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -67,7 +67,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -113,7 +112,7 @@ public class TestAMContainer {
         .getTaskAttemptId());
     assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
-    
+
     wc.taskAttemptSucceeded(wc.taskAttemptID);
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
@@ -127,7 +126,7 @@ public class TestAMContainer {
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
-    assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size());
+    assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
     assertFalse(wc.amContainer.isInErrorState());
   }
   
@@ -183,7 +182,7 @@ public class TestAMContainer {
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
     
-    assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size());
+    assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
     assertFalse(wc.amContainer.isInErrorState());
   }
   
@@ -219,7 +218,7 @@ public class TestAMContainer {
 
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
     assertNull(wc.amContainer.getRunningTaskAttempt());
-    assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size());
+    assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
     assertFalse(wc.amContainer.isInErrorState());
   }
   
@@ -258,7 +257,7 @@ public class TestAMContainer {
 
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
     assertNull(wc.amContainer.getRunningTaskAttempt());
-    assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size());
+    assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
     assertFalse(wc.amContainer.isInErrorState());
   }
   
@@ -297,8 +296,8 @@ public class TestAMContainer {
         AMSchedulerEventType.S_CONTAINER_COMPLETED);
     
     assertNull(wc.amContainer.getRunningTaskAttempt());
-//    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. 
Set/Unset properly.
-//    assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // 
TODO. Set/Unset properly.
+    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+    assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
   }
   
   @SuppressWarnings("rawtypes")
@@ -306,16 +305,16 @@ public class TestAMContainer {
   public void testAllocationAtRunning() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
-    
+
     wc.launchContainer();
     wc.containerLaunched();
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.pullTaskToRun();
     wc.verifyState(AMContainerState.RUNNING);
-    
+
     TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
-    
+
     wc.verifyState(AMContainerState.STOP_REQUESTED);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
@@ -326,7 +325,7 @@ public class TestAMContainer {
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
         TaskAttemptEventType.TA_CONTAINER_TERMINATING);
     assertTrue(wc.amContainer.isInErrorState());
-    
+
     wc.nmStopSent();
     wc.containerCompleted();
     // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
@@ -335,10 +334,10 @@ public class TestAMContainer {
         TaskAttemptEventType.TA_CONTAINER_TERMINATED,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED,
         AMSchedulerEventType.S_CONTAINER_COMPLETED);
-    
-//  assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset 
properly.
-//  assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. 
Set/Unset properly.
-//  assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // 
TODO. Set/Unset properly.
+
+    assertNull(wc.amContainer.getRunningTaskAttempt());
+    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+    assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
   }
   
   @SuppressWarnings("rawtypes")
@@ -346,15 +345,15 @@ public class TestAMContainer {
   public void testMultipleAllocationsAtLaunching() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
-    
+
     wc.launchContainer();
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.pullTaskToRun();
     wc.verifyState(AMContainerState.LAUNCHING);
-    
+
     TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
-    
+
     wc.verifyState(AMContainerState.STOP_REQUESTED);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
@@ -365,7 +364,7 @@ public class TestAMContainer {
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
         TaskAttemptEventType.TA_CONTAINER_TERMINATING);
     assertTrue(wc.amContainer.isInErrorState());
-    
+
     wc.nmStopSent();
     wc.containerCompleted();
     // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
@@ -374,10 +373,10 @@ public class TestAMContainer {
         TaskAttemptEventType.TA_CONTAINER_TERMINATED,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED,
         AMSchedulerEventType.S_CONTAINER_COMPLETED);
-    
-//  assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset 
properly.
-//  assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. 
Set/Unset properly.
-//  assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // 
TODO. Set/Unset properly.
+
+    assertNull(wc.amContainer.getRunningTaskAttempt());
+    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+    assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
   }
 
   @SuppressWarnings("rawtypes")
@@ -385,13 +384,13 @@ public class TestAMContainer {
   public void testContainerTimedOutAtRunning() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
-    
+
     wc.launchContainer();
     wc.containerLaunched();
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.pullTaskToRun();
     wc.verifyState(AMContainerState.RUNNING);
-    
+
     wc.containerTimedOut();
     wc.verifyState(AMContainerState.STOP_REQUESTED);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -402,18 +401,18 @@ public class TestAMContainer {
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
         NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
     // TODO Should this be an RM DE-ALLOCATE instead ?
-    
+
     wc.containerCompleted();
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED,
         AMSchedulerEventType.S_CONTAINER_COMPLETED);
-    
+
     assertFalse(wc.amContainer.isInErrorState());
-    
-//  assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset 
properly.
-//  assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. 
Set/Unset properly.
-//  assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size()); // 
TODO. Set/Unset properly.
+
+    assertNull(wc.amContainer.getRunningTaskAttempt());
+    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+    assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
   }
 
   @SuppressWarnings("rawtypes")
@@ -461,7 +460,6 @@ public class TestAMContainer {
     assertFalse(wc.amContainer.isInErrorState());
   }
 
-  @Ignore
   @SuppressWarnings("rawtypes")
   @Test
   // Verify that incoming NM launched events to COMPLETED containers are
@@ -484,7 +482,6 @@ public class TestAMContainer {
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         AMSchedulerEventType.S_CONTAINER_COMPLETED,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED);
-    // TODO Failing because of an extra diagnostic event.
     
     assertFalse(wc.amContainer.isInErrorState());
     
@@ -495,7 +492,6 @@ public class TestAMContainer {
     assertFalse(wc.amContainer.isInErrorState());
   }
 
-  @Ignore
   @SuppressWarnings("rawtypes")
   @Test
   public void testContainerCompletedAtIdle() {
@@ -519,7 +515,6 @@ public class TestAMContainer {
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         AMSchedulerEventType.S_CONTAINER_COMPLETED,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED);
-    // TODO Failing because of two extra diagnostic event.
     
     assertFalse(wc.amContainer.isInErrorState());
     
@@ -531,8 +526,7 @@ public class TestAMContainer {
     
     assertFalse(wc.amContainer.isInErrorState());
   }
-  
-  @Ignore
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testContainerCompletedAtRunning() {
@@ -557,7 +551,6 @@ public class TestAMContainer {
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         AMSchedulerEventType.S_CONTAINER_COMPLETED,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED);
-    // TODO Failing because of two extra diagnostic event.
     
     assertFalse(wc.amContainer.isInErrorState());
     
@@ -693,7 +686,7 @@ public class TestAMContainer {
     
     assertNull(wc.amContainer.getRunningTaskAttempt());
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
-    assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size());
+    assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
   }
   
   @SuppressWarnings("rawtypes")
@@ -701,18 +694,18 @@ public class TestAMContainer {
   public void testNodeFailedAtRunningMultipleAttempts() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
-    
+
     wc.launchContainer();
     wc.containerLaunched();
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.pullTaskToRun();
     wc.taskAttemptSucceeded(wc.taskAttemptID);
-    
+
     TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
     wc.pullTaskToRun();
     wc.verifyState(AMContainerState.RUNNING);
-    
+
     wc.nodeFailed();
     // Expecting a complete event from the RM
     wc.verifyState(AMContainerState.STOPPING);
@@ -722,14 +715,14 @@ public class TestAMContainer {
         TaskAttemptEventType.TA_NODE_FAILED,
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
         AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
-    
+
     for (Event event : outgoingEvents) {
       if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
         TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) 
event;
         assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
       }
     }
-    
+
     wc.containerCompleted();
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -737,11 +730,11 @@ public class TestAMContainer {
         AMSchedulerEventType.S_CONTAINER_COMPLETED);
 
     assertFalse(wc.amContainer.isInErrorState());
-//    assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO. Set/Unset 
properly.
-//    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. 
Set/Unset properly.
-//    assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // 
TODO. Set/Unset properly.
+    assertNull(wc.amContainer.getRunningTaskAttempt());
+    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+    assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
   }
-  
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testNodeFailedAtCompletedMultipleSuccessfulTAs() {
@@ -771,7 +764,7 @@ public class TestAMContainer {
     
     assertNull(wc.amContainer.getRunningTaskAttempt());
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
-    assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size());
+    assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
   }
 
   @SuppressWarnings("rawtypes")

Reply via email to