TEZ-164. Fix handling of completed, running, pending tasks in AMContainer. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/9f0d4c3c Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/9f0d4c3c Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/9f0d4c3c Branch: refs/heads/master Commit: 9f0d4c3cb4a8b4b3b6ea89e2c0cb0dc85bc91545 Parents: a74b436 Author: Siddharth Seth <[email protected]> Authored: Wed May 29 16:35:40 2013 -0700 Committer: Siddharth Seth <[email protected]> Committed: Wed May 29 16:35:40 2013 -0700 ---------------------------------------------------------------------- .../tez/dag/app/rm/container/AMContainer.java | 3 +- .../tez/dag/app/rm/container/AMContainerImpl.java | 100 +++++++-------- .../tez/dag/app/rm/container/TestAMContainer.java | 93 ++++++------- 3 files changed, 93 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9f0d4c3c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java index 3c00620..e62d033 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java @@ -30,8 +30,7 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{ public AMContainerState getState(); public ContainerId getContainerId(); public Container getContainer(); - //TODO Rename - CompletedTaskAttempts, ideally means FAILED / KILLED as well. - public List<TezTaskAttemptID> getCompletedTaskAttempts(); + public List<TezTaskAttemptID> getAllTaskAttempts(); public TezTaskAttemptID getRunningTaskAttempt(); public List<TezTaskAttemptID> getQueuedTaskAttempts(); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9f0d4c3c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 51246dd..58e9d50 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -18,7 +18,6 @@ package org.apache.tez.dag.app.rm.container; -import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -48,13 +47,13 @@ import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.dag.event.DiagnosableEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed; import org.apache.tez.dag.app.rm.AMSchedulerEventContainerCompleted; import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer; import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent; import org.apache.tez.dag.records.TezTaskAttemptID; +//import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate; @SuppressWarnings("rawtypes") public class AMContainerImpl implements AMContainer { @@ -222,10 +221,19 @@ public class AMContainerImpl implements AMContainer { } @Override - public List<TezTaskAttemptID> getCompletedTaskAttempts() { + public List<TezTaskAttemptID> getAllTaskAttempts() { readLock.lock(); try { - return new ArrayList<TezTaskAttemptID>(this.completedAttempts); + List<TezTaskAttemptID> allAttempts = new LinkedList<TezTaskAttemptID>(); + allAttempts.addAll(this.completedAttempts); + allAttempts.addAll(this.failedAssignments); + if (this.pendingAttempt != null) { + allAttempts.add(this.pendingAttempt); + } + if (this.runningAttempt != null) { + allAttempts.add(this.runningAttempt); + } + return allAttempts; } finally { readLock.unlock(); } @@ -359,6 +367,7 @@ public class AMContainerImpl implements AMContainer { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent; container.inError = true; + container.registerFailedAttempt(event.getTaskAttemptId()); container.maybeSendNodeFailureForFailedAssignment(event .getTaskAttemptId()); container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), @@ -378,7 +387,6 @@ public class AMContainerImpl implements AMContainer { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { AMContainerEventCompleted event = (AMContainerEventCompleted)cEvent; container.sendCompletedToScheduler(); - container.sendDiagUpdateOnContainerComplete(event); String diag = event.getContainerStatus().getDiagnostics(); if (!(diag == null || diag.equals(""))) { LOG.info("Container " + container.getContainerId() @@ -402,10 +410,11 @@ public class AMContainerImpl implements AMContainer { } } - protected static class NodeFailedAtAllocatedTransition implements - SingleArcTransition<AMContainerImpl, AMContainerEvent> { + protected static class NodeFailedAtAllocatedTransition extends + NodeFailedBaseTransition { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { + super.transition(container, cEvent); container.sendCompletedToScheduler(); container.deAllocate(); } @@ -414,6 +423,7 @@ public class AMContainerImpl implements AMContainer { protected static class ErrorTransition extends ErrorBaseTransition { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { + super.transition(container, cEvent); container.sendCompletedToScheduler(); container.deAllocate(); LOG.info( @@ -464,7 +474,7 @@ public class AMContainerImpl implements AMContainer { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { if (container.pendingAttempt != null) { AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent; - container.sendTerminatingToTA(container.pendingAttempt, + container.sendTerminatingToTaskAttempt(container.pendingAttempt, event.getMessage()); } container.unregisterFromTAListener(); @@ -481,9 +491,10 @@ public class AMContainerImpl implements AMContainer { String errorMessage = getMessage(container, event); container.sendTerminatedToTaskAttempt(container.pendingAttempt, errorMessage); + container.registerFailedAttempt(container.pendingAttempt); + container.pendingAttempt = null; LOG.warn(errorMessage); } - container.sendDiagUpdateOnContainerComplete(event); container.unregisterFromTAListener(); container.sendCompletedToScheduler(); String diag = event.getContainerStatus().getDiagnostics(); @@ -507,7 +518,7 @@ public class AMContainerImpl implements AMContainer { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { if (container.pendingAttempt != null) { - container.sendTerminatingToTA(container.pendingAttempt, + container.sendTerminatingToTaskAttempt(container.pendingAttempt, getMessage(container, cEvent)); } container.unregisterFromTAListener(); @@ -541,12 +552,14 @@ public class AMContainerImpl implements AMContainer { } if (container.pendingAttempt != null) { + // Will be null in COMPLETED state. container.sendNodeFailureToTA(container.pendingAttempt, errorMessage); - container.sendTerminatingToTA(container.pendingAttempt, "Node failure"); + container.sendTerminatingToTaskAttempt(container.pendingAttempt, "Node failure"); } if (container.runningAttempt != null) { + // Will be null in COMPLETED state. container.sendNodeFailureToTA(container.runningAttempt, errorMessage); - container.sendTerminatingToTA(container.runningAttempt, "Node failure"); + container.sendTerminatingToTaskAttempt(container.runningAttempt, "Node failure"); } } } @@ -562,16 +575,17 @@ public class AMContainerImpl implements AMContainer { } protected static class ErrorAtLaunchingTransition - extends ErrorTransition { + extends ErrorBaseTransition { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); if (container.pendingAttempt != null) { - container.sendTerminatedToTaskAttempt(container.pendingAttempt, + container.sendTerminatingToTaskAttempt(container.pendingAttempt, "Container " + container.getContainerId() + " hit an invalid transition - " + cEvent.getType() + " at " + container.getState()); } + container.sendStopRequestToNM(); container.unregisterFromTAListener(); } } @@ -625,7 +639,6 @@ public class AMContainerImpl implements AMContainer { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); container.unregisterFromContainerListener(); - container.sendDiagUpdateOnContainerComplete((AMContainerEventCompleted)cEvent); } @Override @@ -707,6 +720,8 @@ public class AMContainerImpl implements AMContainer { container.sendTerminatedToTaskAttempt(container.runningAttempt, getMessage(container, event)); container.unregisterAttemptFromListener(container.runningAttempt); + container.registerFailedAttempt(container.runningAttempt); + container.runningAttempt = null; super.transition(container, cEvent); } } @@ -716,7 +731,7 @@ public class AMContainerImpl implements AMContainer { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { container.unregisterAttemptFromListener(container.runningAttempt); - container.sendTerminatingToTA(container.runningAttempt, + container.sendTerminatingToTaskAttempt(container.runningAttempt, " Container" + container.getContainerId() + " received a STOP_REQUEST"); super.transition(container, cEvent); @@ -749,7 +764,7 @@ public class AMContainerImpl implements AMContainer { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); container.unregisterAttemptFromListener(container.runningAttempt); - container.sendTerminatedToTaskAttempt(container.runningAttempt, + container.sendTerminatingToTaskAttempt(container.runningAttempt, "Container " + container.getContainerId() + " hit an invalid transition - " + cEvent.getType() + " at " + container.getState()); @@ -766,8 +781,8 @@ public class AMContainerImpl implements AMContainer { " cannot be allocated to container: " + container.getContainerId() + " in " + container.getState() + " state"; container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId()); - container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage); - container.registerFailedTAAssignment(event.getTaskAttemptId()); + container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage); + container.registerFailedAttempt(event.getTaskAttemptId()); } } @@ -788,14 +803,18 @@ public class AMContainerImpl implements AMContainer { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent; String diag = event.getContainerStatus().getDiagnostics(); + for (TezTaskAttemptID taId : container.failedAssignments) { + container.sendTerminatedToTaskAttempt(taId, diag); + } if (container.pendingAttempt != null) { container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag); + container.registerFailedAttempt(container.pendingAttempt); + container.pendingAttempt = null; } if (container.runningAttempt != null) { container.sendTerminatedToTaskAttempt(container.runningAttempt, diag); - } - for (TezTaskAttemptID taId : container.failedAssignments) { - container.sendTerminatedToTaskAttempt(taId, diag); + container.registerFailedAttempt(container.runningAttempt); + container.runningAttempt = null; } if (!(diag == null || diag.equals(""))) { LOG.info("Container " + container.getContainerId() @@ -822,10 +841,9 @@ public class AMContainerImpl implements AMContainer { } protected static class ErrorAtNMStopRequestedTransition - extends ErrorAtStoppingTransition { + extends ErrorBaseTransition { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); - container.deAllocate(); } } @@ -833,15 +851,6 @@ public class AMContainerImpl implements AMContainer { extends ErrorBaseTransition { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); - if (container.pendingAttempt != null) { - container.sendTerminatedToTaskAttempt(container.pendingAttempt, null); - } - if (container.runningAttempt != null) { - container.sendTerminatedToTaskAttempt(container.runningAttempt, null); - } - for (TezTaskAttemptID taId : container.failedAssignments) { - container.sendTerminatedToTaskAttempt(taId, null); - } container.sendCompletedToScheduler(); } } @@ -873,7 +882,7 @@ public class AMContainerImpl implements AMContainer { container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId()); container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), errorMessage); - container.registerFailedTAAssignment(event.getTaskAttemptId()); + container.registerFailedAttempt(event.getTaskAttemptId()); } } @@ -886,17 +895,16 @@ public class AMContainerImpl implements AMContainer { ". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() + ". Current state: " + this.getState(); this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId()); - this.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage); - this.sendTerminatingToTA(currentTaId, errorMessage); - this.registerFailedTAAssignment(event.getTaskAttemptId()); + this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage); + this.sendTerminatingToTaskAttempt(currentTaId, errorMessage); + this.registerFailedAttempt(event.getTaskAttemptId()); LOG.warn(errorMessage); this.sendStopRequestToNM(); this.unregisterFromTAListener(); this.unregisterFromContainerListener(); } - - protected void registerFailedTAAssignment(TezTaskAttemptID taId) { + protected void registerFailedAttempt(TezTaskAttemptID taId) { failedAssignments.add(taId); } @@ -908,23 +916,13 @@ public class AMContainerImpl implements AMContainer { sendEvent(new AMSchedulerEventContainerCompleted(containerId)); } - protected void sendDiagUpdateOnContainerComplete( - AMContainerEventCompleted cEvent) { - String diag = cEvent.getContainerStatus().getDiagnostics(); - if (pendingAttempt != null) { - sendEvent(new TaskAttemptEventDiagnosticsUpdate(pendingAttempt, diag)); - } - if (runningAttempt != null) { - sendEvent(new TaskAttemptEventDiagnosticsUpdate(runningAttempt, diag)); - } - } - protected void sendTerminatedToTaskAttempt( TezTaskAttemptID taId, String message) { sendEvent(new TaskAttemptEventContainerTerminated(taId, message)); } - protected void sendTerminatingToTA(TezTaskAttemptID taId, String message) { + protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId, + String message) { sendEvent(new TaskAttemptEventContainerTerminating(taId, message)); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9f0d4c3c/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index 574a2c5..4541a2e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -67,7 +67,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.engine.common.security.JobTokenIdentifier; -import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -113,7 +112,7 @@ public class TestAMContainer { .getTaskAttemptId()); assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt()); assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); - + wc.taskAttemptSucceeded(wc.taskAttemptID); wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); @@ -127,7 +126,7 @@ public class TestAMContainer { verify(wc.tal).unregisterRunningContainer(wc.containerID); verify(wc.chh).unregister(wc.containerID); - assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size()); + assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); assertFalse(wc.amContainer.isInErrorState()); } @@ -183,7 +182,7 @@ public class TestAMContainer { verify(wc.tal).unregisterRunningContainer(wc.containerID); verify(wc.chh).unregister(wc.containerID); - assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size()); + assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); assertFalse(wc.amContainer.isInErrorState()); } @@ -219,7 +218,7 @@ public class TestAMContainer { assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); assertNull(wc.amContainer.getRunningTaskAttempt()); - assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size()); + assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); assertFalse(wc.amContainer.isInErrorState()); } @@ -258,7 +257,7 @@ public class TestAMContainer { assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); assertNull(wc.amContainer.getRunningTaskAttempt()); - assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size()); + assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); assertFalse(wc.amContainer.isInErrorState()); } @@ -297,8 +296,8 @@ public class TestAMContainer { AMSchedulerEventType.S_CONTAINER_COMPLETED); assertNull(wc.amContainer.getRunningTaskAttempt()); -// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly. -// assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly. + assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); + assertEquals(2, wc.amContainer.getAllTaskAttempts().size()); } @SuppressWarnings("rawtypes") @@ -306,16 +305,16 @@ public class TestAMContainer { public void testAllocationAtRunning() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; - + wc.launchContainer(); wc.containerLaunched(); wc.assignTaskAttempt(wc.taskAttemptID); wc.pullTaskToRun(); wc.verifyState(AMContainerState.RUNNING); - + TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); wc.assignTaskAttempt(taID2); - + wc.verifyState(AMContainerState.STOP_REQUESTED); verify(wc.tal).unregisterRunningContainer(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -326,7 +325,7 @@ public class TestAMContainer { TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATING); assertTrue(wc.amContainer.isInErrorState()); - + wc.nmStopSent(); wc.containerCompleted(); // 1 Inform scheduler. 2 TERMINATED to TaskAttempt. @@ -335,10 +334,10 @@ public class TestAMContainer { TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, AMSchedulerEventType.S_CONTAINER_COMPLETED); - -// assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset properly. -// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly. -// assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly. + + assertNull(wc.amContainer.getRunningTaskAttempt()); + assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); + assertEquals(2, wc.amContainer.getAllTaskAttempts().size()); } @SuppressWarnings("rawtypes") @@ -346,15 +345,15 @@ public class TestAMContainer { public void testMultipleAllocationsAtLaunching() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; - + wc.launchContainer(); wc.assignTaskAttempt(wc.taskAttemptID); wc.pullTaskToRun(); wc.verifyState(AMContainerState.LAUNCHING); - + TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); wc.assignTaskAttempt(taID2); - + wc.verifyState(AMContainerState.STOP_REQUESTED); verify(wc.tal).unregisterRunningContainer(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -365,7 +364,7 @@ public class TestAMContainer { TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATING); assertTrue(wc.amContainer.isInErrorState()); - + wc.nmStopSent(); wc.containerCompleted(); // 1 Inform scheduler. 2 TERMINATED to TaskAttempt. @@ -374,10 +373,10 @@ public class TestAMContainer { TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, AMSchedulerEventType.S_CONTAINER_COMPLETED); - -// assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset properly. -// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly. -// assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly. + + assertNull(wc.amContainer.getRunningTaskAttempt()); + assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); + assertEquals(2, wc.amContainer.getAllTaskAttempts().size()); } @SuppressWarnings("rawtypes") @@ -385,13 +384,13 @@ public class TestAMContainer { public void testContainerTimedOutAtRunning() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; - + wc.launchContainer(); wc.containerLaunched(); wc.assignTaskAttempt(wc.taskAttemptID); wc.pullTaskToRun(); wc.verifyState(AMContainerState.RUNNING); - + wc.containerTimedOut(); wc.verifyState(AMContainerState.STOP_REQUESTED); verify(wc.tal).unregisterRunningContainer(wc.containerID); @@ -402,18 +401,18 @@ public class TestAMContainer { TaskAttemptEventType.TA_CONTAINER_TERMINATING, NMCommunicatorEventType.CONTAINER_STOP_REQUEST); // TODO Should this be an RM DE-ALLOCATE instead ? - + wc.containerCompleted(); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED, AMSchedulerEventType.S_CONTAINER_COMPLETED); - + assertFalse(wc.amContainer.isInErrorState()); - -// assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset properly. -// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly. -// assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly. + + assertNull(wc.amContainer.getRunningTaskAttempt()); + assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); + assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); } @SuppressWarnings("rawtypes") @@ -461,7 +460,6 @@ public class TestAMContainer { assertFalse(wc.amContainer.isInErrorState()); } - @Ignore @SuppressWarnings("rawtypes") @Test // Verify that incoming NM launched events to COMPLETED containers are @@ -484,7 +482,6 @@ public class TestAMContainer { verifyUnOrderedOutgoingEventTypes(outgoingEvents, AMSchedulerEventType.S_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_TERMINATED); - // TODO Failing because of an extra diagnostic event. assertFalse(wc.amContainer.isInErrorState()); @@ -495,7 +492,6 @@ public class TestAMContainer { assertFalse(wc.amContainer.isInErrorState()); } - @Ignore @SuppressWarnings("rawtypes") @Test public void testContainerCompletedAtIdle() { @@ -519,7 +515,6 @@ public class TestAMContainer { verifyUnOrderedOutgoingEventTypes(outgoingEvents, AMSchedulerEventType.S_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_TERMINATED); - // TODO Failing because of two extra diagnostic event. assertFalse(wc.amContainer.isInErrorState()); @@ -531,8 +526,7 @@ public class TestAMContainer { assertFalse(wc.amContainer.isInErrorState()); } - - @Ignore + @SuppressWarnings("rawtypes") @Test public void testContainerCompletedAtRunning() { @@ -557,7 +551,6 @@ public class TestAMContainer { verifyUnOrderedOutgoingEventTypes(outgoingEvents, AMSchedulerEventType.S_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_TERMINATED); - // TODO Failing because of two extra diagnostic event. assertFalse(wc.amContainer.isInErrorState()); @@ -693,7 +686,7 @@ public class TestAMContainer { assertNull(wc.amContainer.getRunningTaskAttempt()); assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); - assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); + assertEquals(2, wc.amContainer.getAllTaskAttempts().size()); } @SuppressWarnings("rawtypes") @@ -701,18 +694,18 @@ public class TestAMContainer { public void testNodeFailedAtRunningMultipleAttempts() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; - + wc.launchContainer(); wc.containerLaunched(); wc.assignTaskAttempt(wc.taskAttemptID); wc.pullTaskToRun(); wc.taskAttemptSucceeded(wc.taskAttemptID); - + TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); wc.assignTaskAttempt(taID2); wc.pullTaskToRun(); wc.verifyState(AMContainerState.RUNNING); - + wc.nodeFailed(); // Expecting a complete event from the RM wc.verifyState(AMContainerState.STOPPING); @@ -722,14 +715,14 @@ public class TestAMContainer { TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, AMSchedulerEventType.S_CONTAINER_DEALLOCATE); - + for (Event event : outgoingEvents) { if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) { TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event; assertEquals("nodeFailed", nfEvent.getDiagnosticInfo()); } } - + wc.containerCompleted(); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -737,11 +730,11 @@ public class TestAMContainer { AMSchedulerEventType.S_CONTAINER_COMPLETED); assertFalse(wc.amContainer.isInErrorState()); -// assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO. Set/Unset properly. -// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly. -// assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly. + assertNull(wc.amContainer.getRunningTaskAttempt()); + assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); + assertEquals(2, wc.amContainer.getAllTaskAttempts().size()); } - + @SuppressWarnings("rawtypes") @Test public void testNodeFailedAtCompletedMultipleSuccessfulTAs() { @@ -771,7 +764,7 @@ public class TestAMContainer { assertNull(wc.amContainer.getRunningTaskAttempt()); assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); - assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); + assertEquals(2, wc.amContainer.getAllTaskAttempts().size()); } @SuppressWarnings("rawtypes")
