TEZ-113. Add unit tests for AMContainer. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a74b4367 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a74b4367 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a74b4367 Branch: refs/heads/master Commit: a74b43673dddf2f9d488f70b3bc337577b1c8dfa Parents: 2340884 Author: Siddharth Seth <[email protected]> Authored: Wed May 29 14:18:00 2013 -0700 Committer: Siddharth Seth <[email protected]> Committed: Wed May 29 14:18:00 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/tez/dag/app/AppContext.java | 2 +- .../tez/dag/app/TaskAttemptListenerImpTezDag.java | 4 +- .../dag/app/rm/container/AMContainerHelpers.java | 1 - .../tez/dag/app/rm/container/AMContainerImpl.java | 19 +- .../tez/dag/app/rm/container/TestAMContainer.java | 1008 +++++++++++++++ 5 files changed, 1024 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index 041de56..03e1059 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -53,7 +53,7 @@ public interface AppContext { long getStartTime(); - CharSequence getUser(); + String getUser(); DAG getDAG(); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index d974cd8..7600db4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -504,9 +504,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements return COMPLETION_RESPONSE_NO_WAIT; } - - - // TODO EVENTUALLY remove all mrv2 ids. + @Override public void unregisterTaskAttempt(TezTaskAttemptID attemptId) { attemptToContainerIdMap.remove(attemptId); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index 6a9ae2e..357eeda 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@ -172,7 +172,6 @@ public class AMContainerHelpers { Map<String, String> myEnv = new HashMap<String, String>(env.size()); myEnv.putAll(env); myEnv.putAll(vertexEnv); - // TODO TEZ-38 MRChildJVM2.setEnv should become a no-op // Set up the launch command List<String> commands = TezEngineChildJVM.getVMCommand( http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 1089013..51246dd 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -54,8 +54,6 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventContainerCompleted; import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer; import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent; -import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.events.ContainerLaunchedEvent; import org.apache.tez.dag.records.TezTaskAttemptID; @SuppressWarnings("rawtypes") @@ -156,7 +154,7 @@ public class AMContainerImpl implements AMContainer { .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtRunningTransition()) .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtRunningTransition()) .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtRunningTransition()) - + .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition()) .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition()) .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition()) @@ -168,11 +166,12 @@ public class AMContainerImpl implements AMContainer { .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition()) 
.addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition()) + // TODO This transition is wrong. Should be a noop / error. .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition()) .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition()) .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT)) .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtStoppingTransition()) - + .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtCompletedTransition()) .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition()) .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition()) @@ -236,7 +235,11 @@ public class AMContainerImpl implements AMContainer { public List<TezTaskAttemptID> getQueuedTaskAttempts() { readLock.lock(); try { - return Collections.singletonList(this.pendingAttempt); + if (pendingAttempt != null) { + return Collections.singletonList(this.pendingAttempt); + } else { + return Collections.emptyList(); + } } finally { readLock.unlock(); } @@ -261,6 +264,10 @@ public class AMContainerImpl implements AMContainer { readLock.unlock(); } } + + public boolean isInErrorState() { + return inError; + } @Override public void handle(AMContainerEvent event) { @@ -333,6 +340,8 @@ public class 
AMContainerImpl implements AMContainer { container.taskAttemptListener, event.getCredentials(), event.shouldProfile(), container.appContext); + // Registering now, so that in case of delayed NM response, the child + // task is not told to die since the TAL does not know about the container. container.registerWithTAListener(); container.sendStartRequestToNM(); LOG.info("Sending Launch Request for Container with id: " + http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java new file mode 100644 index 0000000..574a2c5 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -0,0 +1,1008 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app.rm.container; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.tez.common.TezTaskContext; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.ContainerHeartbeatHandler; +import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed; +import 
org.apache.tez.dag.app.dag.event.TaskAttemptEventType; +import org.apache.tez.dag.app.rm.AMSchedulerEventType; +import org.apache.tez.dag.app.rm.NMCommunicatorEventType; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.engine.common.security.JobTokenIdentifier; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class TestAMContainer { + + + @Test + // Assign before launch. + public void tetSingleSuccessfulTaskFlow() { + WrappedContainer wc = new WrappedContainer(); + + wc.verifyState(AMContainerState.ALLOCATED); + + // Launch request. + wc.launchContainer(); + wc.verifyState(AMContainerState.LAUNCHING); + // 1 Launch request. + wc.verifyCountAndGetOutgoingEvents(1); + + // Assign task. + wc.assignTaskAttempt(wc.taskAttemptID); + wc.verifyState(AMContainerState.LAUNCHING); + wc.verifyNoOutgoingEvents(); + assertEquals(wc.taskAttemptID, wc.amContainer.getQueuedTaskAttempts() + .get(0)); + + // Container Launched + wc.containerLaunched(); + wc.verifyState(AMContainerState.IDLE); + wc.verifyNoOutgoingEvents(); + assertEquals(wc.taskAttemptID, wc.amContainer.getQueuedTaskAttempts() + .get(0)); + assertNull(wc.amContainer.getRunningTaskAttempt()); + verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.chh).register(wc.containerID); + + // Pull TA + AMContainerTask pulledTask = wc.pullTaskToRun(); + wc.verifyState(AMContainerState.RUNNING); + wc.verifyNoOutgoingEvents(); + assertFalse(pulledTask.shouldDie()); + assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask() + .getTaskAttemptId()); + assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt()); + assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); + + wc.taskAttemptSucceeded(wc.taskAttemptID); + wc.verifyState(AMContainerState.IDLE); + 
wc.verifyNoOutgoingEvents(); + assertNull(wc.amContainer.getRunningTaskAttempt()); + verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID); + + wc.containerCompleted(); + wc.verifyState(AMContainerState.COMPLETED); + // 1 Scheduler completed event. + wc.verifyCountAndGetOutgoingEvents(1); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.chh).unregister(wc.containerID); + + assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size()); + assertFalse(wc.amContainer.isInErrorState()); + } + + @Test + // Assign after launch. + public void testSingleSuccessfulTaskFlow2() { + WrappedContainer wc = new WrappedContainer(); + + wc.verifyState(AMContainerState.ALLOCATED); + + // Launch request. + wc.launchContainer(); + wc.verifyState(AMContainerState.LAUNCHING); + // 1 Launch request. + wc.verifyCountAndGetOutgoingEvents(1); + + // Container Launched + wc.containerLaunched(); + wc.verifyState(AMContainerState.IDLE); + wc.verifyNoOutgoingEvents(); + assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); + verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.chh).register(wc.containerID); + + // Assign task. 
+ wc.assignTaskAttempt(wc.taskAttemptID); + wc.verifyState(AMContainerState.IDLE); + wc.verifyNoOutgoingEvents(); + assertEquals(wc.taskAttemptID, wc.amContainer.getQueuedTaskAttempts() + .get(0)); + assertNull(wc.amContainer.getRunningTaskAttempt()); + + // Pull TA + AMContainerTask pulledTask = wc.pullTaskToRun(); + wc.verifyState(AMContainerState.RUNNING); + wc.verifyNoOutgoingEvents(); + assertFalse(pulledTask.shouldDie()); + assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask() + .getTaskAttemptId()); + assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt()); + assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); + + wc.taskAttemptSucceeded(wc.taskAttemptID); + wc.verifyState(AMContainerState.IDLE); + wc.verifyNoOutgoingEvents(); + assertNull(wc.amContainer.getRunningTaskAttempt()); + verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID); + + wc.containerCompleted(); + wc.verifyState(AMContainerState.COMPLETED); + // 1 Scheduler completed event. + wc.verifyCountAndGetOutgoingEvents(1); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.chh).unregister(wc.containerID); + + assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size()); + assertFalse(wc.amContainer.isInErrorState()); + } + + @Test + public void testSingleSuccessfulTaskFlowStopRequest() { + WrappedContainer wc = new WrappedContainer(); + + wc.verifyState(AMContainerState.ALLOCATED); + + wc.launchContainer(); + wc.assignTaskAttempt(wc.taskAttemptID); + wc.containerLaunched(); + wc.pullTaskToRun(); + wc.taskAttemptSucceeded(wc.taskAttemptID); + + wc.stopRequest(); + wc.verifyState(AMContainerState.STOP_REQUESTED); + // Event to NM to stop the container. 
+ wc.verifyCountAndGetOutgoingEvents(1); + assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() == + NMCommunicatorEventType.CONTAINER_STOP_REQUEST); + + wc.nmStopSent(); + wc.verifyState(AMContainerState.STOPPING); + wc.verifyNoOutgoingEvents(); + + wc.containerCompleted(); + wc.verifyState(AMContainerState.COMPLETED); + // 1 Scheduler completed event. + wc.verifyCountAndGetOutgoingEvents(1); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.chh).unregister(wc.containerID); + + assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); + assertNull(wc.amContainer.getRunningTaskAttempt()); + assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size()); + assertFalse(wc.amContainer.isInErrorState()); + } + + @Test + public void testSingleSuccessfulTaskFlowFailedNMStopRequest() { + WrappedContainer wc = new WrappedContainer(); + + wc.verifyState(AMContainerState.ALLOCATED); + + wc.launchContainer(); + wc.assignTaskAttempt(wc.taskAttemptID); + wc.containerLaunched(); + wc.pullTaskToRun(); + wc.taskAttemptSucceeded(wc.taskAttemptID); + + wc.stopRequest(); + wc.verifyState(AMContainerState.STOP_REQUESTED); + // Event to NM to stop the container. + wc.verifyCountAndGetOutgoingEvents(1); + assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() == + NMCommunicatorEventType.CONTAINER_STOP_REQUEST); + + wc.nmStopFailed(); + wc.verifyState(AMContainerState.STOPPING); + // Event to ask a RM container release. + wc.verifyCountAndGetOutgoingEvents(1); + assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() == + AMSchedulerEventType.S_CONTAINER_DEALLOCATE); + + wc.containerCompleted(); + wc.verifyState(AMContainerState.COMPLETED); + // 1 Scheduler completed event. 
+ wc.verifyCountAndGetOutgoingEvents(1); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.chh).unregister(wc.containerID); + + assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); + assertNull(wc.amContainer.getRunningTaskAttempt()); + assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size()); + assertFalse(wc.amContainer.isInErrorState()); + } + + @SuppressWarnings("rawtypes") + @Test + public void testMultipleAllocationsAtIdle() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + wc.containerLaunched(); + wc.assignTaskAttempt(wc.taskAttemptID); + wc.verifyState(AMContainerState.IDLE); + + TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); + wc.assignTaskAttempt(taID2); + + wc.verifyState(AMContainerState.STOP_REQUESTED); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.chh).unregister(wc.containerID); + // 1 for NM stop request. 2 TERMINATING to TaskAttempt. + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + NMCommunicatorEventType.CONTAINER_STOP_REQUEST, + TaskAttemptEventType.TA_CONTAINER_TERMINATING, + TaskAttemptEventType.TA_CONTAINER_TERMINATING); + assertTrue(wc.amContainer.isInErrorState()); + + wc.nmStopSent(); + wc.containerCompleted(); + // 1 Inform scheduler. 2 TERMINATED to TaskAttempt. + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + AMSchedulerEventType.S_CONTAINER_COMPLETED); + + assertNull(wc.amContainer.getRunningTaskAttempt()); +// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly. +// assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly. 
+ } + + @SuppressWarnings("rawtypes") + @Test + public void testAllocationAtRunning() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + wc.containerLaunched(); + wc.assignTaskAttempt(wc.taskAttemptID); + wc.pullTaskToRun(); + wc.verifyState(AMContainerState.RUNNING); + + TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); + wc.assignTaskAttempt(taID2); + + wc.verifyState(AMContainerState.STOP_REQUESTED); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.chh).unregister(wc.containerID); + // 1 for NM stop request. 2 TERMINATING to TaskAttempt. + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + NMCommunicatorEventType.CONTAINER_STOP_REQUEST, + TaskAttemptEventType.TA_CONTAINER_TERMINATING, + TaskAttemptEventType.TA_CONTAINER_TERMINATING); + assertTrue(wc.amContainer.isInErrorState()); + + wc.nmStopSent(); + wc.containerCompleted(); + // 1 Inform scheduler. 2 TERMINATED to TaskAttempt. + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + AMSchedulerEventType.S_CONTAINER_COMPLETED); + +// assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset properly. +// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly. +// assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly. 
+ } + + @SuppressWarnings("rawtypes") + @Test + public void testMultipleAllocationsAtLaunching() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + wc.assignTaskAttempt(wc.taskAttemptID); + wc.pullTaskToRun(); + wc.verifyState(AMContainerState.LAUNCHING); + + TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); + wc.assignTaskAttempt(taID2); + + wc.verifyState(AMContainerState.STOP_REQUESTED); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.chh).unregister(wc.containerID); + // 1 for NM stop request. 2 TERMINATING to TaskAttempt. + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + NMCommunicatorEventType.CONTAINER_STOP_REQUEST, + TaskAttemptEventType.TA_CONTAINER_TERMINATING, + TaskAttemptEventType.TA_CONTAINER_TERMINATING); + assertTrue(wc.amContainer.isInErrorState()); + + wc.nmStopSent(); + wc.containerCompleted(); + // 1 Inform scheduler. 2 TERMINATED to TaskAttempt. + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + AMSchedulerEventType.S_CONTAINER_COMPLETED); + +// assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset properly. +// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly. +// assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly. 
+ } + + @SuppressWarnings("rawtypes") + @Test + public void testContainerTimedOutAtRunning() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + wc.containerLaunched(); + wc.assignTaskAttempt(wc.taskAttemptID); + wc.pullTaskToRun(); + wc.verifyState(AMContainerState.RUNNING); + + wc.containerTimedOut(); + wc.verifyState(AMContainerState.STOP_REQUESTED); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.chh).unregister(wc.containerID); + // 1 to TA, 1 for RM de-allocate. + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATING, + NMCommunicatorEventType.CONTAINER_STOP_REQUEST); + // TODO Should this be an RM DE-ALLOCATE instead ? + + wc.containerCompleted(); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + AMSchedulerEventType.S_CONTAINER_COMPLETED); + + assertFalse(wc.amContainer.isInErrorState()); + +// assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset properly. +// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly. +// assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly. 
+ } + + @SuppressWarnings("rawtypes") + @Test + public void testLaunchFailure() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + wc.assignTaskAttempt(wc.taskAttemptID); + wc.verifyState(AMContainerState.LAUNCHING); + wc.launchFailed(); + wc.verifyState(AMContainerState.STOPPING); + verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATING, + AMSchedulerEventType.S_CONTAINER_DEALLOCATE); + + wc.containerCompleted(); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + AMSchedulerEventType.S_CONTAINER_COMPLETED); + + // Valid transition. Container complete, but not with an error. + assertFalse(wc.amContainer.isInErrorState()); + } + + @SuppressWarnings("rawtypes") + @Test + public void testContainerCompletedAtAllocated() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + wc.verifyState(AMContainerState.ALLOCATED); + + wc.containerCompleted(); + wc.verifyState(AMContainerState.COMPLETED); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + AMSchedulerEventType.S_CONTAINER_COMPLETED); + + assertFalse(wc.amContainer.isInErrorState()); + } + + @Ignore + @SuppressWarnings("rawtypes") + @Test + // Verify that incoming NM launched events to COMPLETED containers are + // handled. 
+ public void testContainerCompletedAtLaunching() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + + + wc.assignTaskAttempt(wc.taskAttemptID); + + wc.containerCompleted(); + wc.verifyState(AMContainerState.COMPLETED); + verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + AMSchedulerEventType.S_CONTAINER_COMPLETED, + TaskAttemptEventType.TA_CONTAINER_TERMINATED); + // TODO Failing because of an extra diagnostic event. + + assertFalse(wc.amContainer.isInErrorState()); + + // Container launched generated by NM call. + wc.containerLaunched(); + wc.verifyNoOutgoingEvents(); + + assertFalse(wc.amContainer.isInErrorState()); + } + + @Ignore + @SuppressWarnings("rawtypes") + @Test + public void testContainerCompletedAtIdle() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + + wc.assignTaskAttempt(wc.taskAttemptID); + wc.containerLaunched(); + wc.verifyState(AMContainerState.IDLE); + + wc.containerCompleted(); + wc.verifyState(AMContainerState.COMPLETED); + verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.chh).register(wc.containerID); + verify(wc.chh).unregister(wc.containerID); + + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + AMSchedulerEventType.S_CONTAINER_COMPLETED, + TaskAttemptEventType.TA_CONTAINER_TERMINATED); + // TODO Failing because of two extra diagnostic event. + + assertFalse(wc.amContainer.isInErrorState()); + + // Pending pull request. (Ideally, container should be dead at this point + // and this event should not be generated. 
Network timeout on NM-RM heartbeat + // can cause it to be generated) + wc.pullTaskToRun(); + wc.verifyNoOutgoingEvents(); + + assertFalse(wc.amContainer.isInErrorState()); + } + + @Ignore + @SuppressWarnings("rawtypes") + @Test + public void testContainerCompletedAtRunning() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + + wc.assignTaskAttempt(wc.taskAttemptID); + wc.containerLaunched(); + wc.pullTaskToRun(); + wc.verifyState(AMContainerState.RUNNING); + + wc.containerCompleted(); + wc.verifyState(AMContainerState.COMPLETED); + verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.chh).register(wc.containerID); + verify(wc.chh).unregister(wc.containerID); + + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + AMSchedulerEventType.S_CONTAINER_COMPLETED, + TaskAttemptEventType.TA_CONTAINER_TERMINATED); + // TODO Failing because of two extra diagnostic event. + + assertFalse(wc.amContainer.isInErrorState()); + + // Pending task complete. (Ideally, container should be dead at this point + // and this event should not be generated. 
Network timeout on NM-RM heartbeat + // can cause it to be generated) + wc.taskAttemptSucceeded(wc.taskAttemptID); + wc.verifyNoOutgoingEvents(); + + assertFalse(wc.amContainer.isInErrorState()); + } + + @SuppressWarnings("rawtypes") + @Test + public void testTaskAssignedToCompletedContainer() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + wc.containerLaunched(); + wc.assignTaskAttempt(wc.taskAttemptID); + wc.pullTaskToRun(); + wc.taskAttemptSucceeded(wc.taskAttemptID); + + wc.containerCompleted(); + wc.verifyState(AMContainerState.COMPLETED); + + TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); + + wc.assignTaskAttempt(taID2); + + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED); + TaskAttemptEventContainerTerminated ctEvent = + (TaskAttemptEventContainerTerminated) outgoingEvents.get(0); + assertEquals(taID2, ctEvent.getTaskAttemptID()); + + // Allocation to a completed Container is considered an error. + // TODO Is this valid ? 
+ assertTrue(wc.amContainer.isInErrorState()); + } + + @Test + public void testTaskPullAtLaunching() { + WrappedContainer wc = new WrappedContainer(); + + wc.launchContainer(); + AMContainerTask pulledTask = wc.pullTaskToRun(); + wc.verifyState(AMContainerState.LAUNCHING); + wc.verifyNoOutgoingEvents(); + assertFalse(pulledTask.shouldDie()); + assertNull(pulledTask.getTask()); + } + + @SuppressWarnings("rawtypes") + @Test + public void testNodeFailedAtIdle() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + wc.containerLaunched(); + wc.assignTaskAttempt(wc.taskAttemptID); + wc.verifyState(AMContainerState.IDLE); + + wc.nodeFailed(); + // Expecting a complete event from the RM + wc.verifyState(AMContainerState.STOPPING); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_NODE_FAILED, + TaskAttemptEventType.TA_CONTAINER_TERMINATING, + AMSchedulerEventType.S_CONTAINER_DEALLOCATE); + + for (Event event : outgoingEvents) { + if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) { + TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event; + assertEquals("nodeFailed", nfEvent.getDiagnosticInfo()); + } + } + + wc.containerCompleted(); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + AMSchedulerEventType.S_CONTAINER_COMPLETED); + + assertFalse(wc.amContainer.isInErrorState()); + } + + @SuppressWarnings("rawtypes") + @Test + public void testNodeFailedAtIdleMultipleAttempts() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + wc.containerLaunched(); + wc.assignTaskAttempt(wc.taskAttemptID); + wc.pullTaskToRun(); + wc.taskAttemptSucceeded(wc.taskAttemptID); + wc.verifyState(AMContainerState.IDLE); + + TezTaskAttemptID taID2 = new 
TezTaskAttemptID(wc.taskID, 2); + wc.assignTaskAttempt(taID2); + wc.pullTaskToRun(); + wc.taskAttemptSucceeded(taID2); + wc.verifyState(AMContainerState.IDLE); + + wc.nodeFailed(); + // Expecting a complete event from the RM + wc.verifyState(AMContainerState.STOPPING); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_NODE_FAILED, + TaskAttemptEventType.TA_NODE_FAILED, + AMSchedulerEventType.S_CONTAINER_DEALLOCATE); + + for (Event event : outgoingEvents) { + if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) { + TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event; + assertEquals("nodeFailed", nfEvent.getDiagnosticInfo()); + } + } + + assertFalse(wc.amContainer.isInErrorState()); + + wc.containerCompleted(); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + AMSchedulerEventType.S_CONTAINER_COMPLETED); + + assertNull(wc.amContainer.getRunningTaskAttempt()); + assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); + assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); + } + + @SuppressWarnings("rawtypes") + @Test + public void testNodeFailedAtRunningMultipleAttempts() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + wc.containerLaunched(); + wc.assignTaskAttempt(wc.taskAttemptID); + wc.pullTaskToRun(); + wc.taskAttemptSucceeded(wc.taskAttemptID); + + TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); + wc.assignTaskAttempt(taID2); + wc.pullTaskToRun(); + wc.verifyState(AMContainerState.RUNNING); + + wc.nodeFailed(); + // Expecting a complete event from the RM + wc.verifyState(AMContainerState.STOPPING); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_NODE_FAILED, + TaskAttemptEventType.TA_NODE_FAILED, + 
TaskAttemptEventType.TA_CONTAINER_TERMINATING, + AMSchedulerEventType.S_CONTAINER_DEALLOCATE); + + for (Event event : outgoingEvents) { + if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) { + TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event; + assertEquals("nodeFailed", nfEvent.getDiagnosticInfo()); + } + } + + wc.containerCompleted(); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + AMSchedulerEventType.S_CONTAINER_COMPLETED); + + assertFalse(wc.amContainer.isInErrorState()); +// assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO. Set/Unset properly. +// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly. +// assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly. + } + + @SuppressWarnings("rawtypes") + @Test + public void testNodeFailedAtCompletedMultipleSuccessfulTAs() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + wc.containerLaunched(); + wc.assignTaskAttempt(wc.taskAttemptID); + wc.pullTaskToRun(); + wc.taskAttemptSucceeded(wc.taskAttemptID); + + TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); + wc.assignTaskAttempt(taID2); + wc.pullTaskToRun(); + wc.taskAttemptSucceeded(taID2); + wc.stopRequest(); + wc.nmStopSent(); + wc.containerCompleted(); + wc.verifyState(AMContainerState.COMPLETED); + + wc.nodeFailed(); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_NODE_FAILED, + TaskAttemptEventType.TA_NODE_FAILED); + + assertNull(wc.amContainer.getRunningTaskAttempt()); + assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); + assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); + } + + @SuppressWarnings("rawtypes") + @Test + public void 
testDuplicateCompletedEvents() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + wc.containerLaunched(); + wc.assignTaskAttempt(wc.taskAttemptID); + wc.pullTaskToRun(); + wc.taskAttemptSucceeded(wc.taskAttemptID); + + TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); + wc.assignTaskAttempt(taID2); + wc.pullTaskToRun(); + wc.taskAttemptSucceeded(taID2); + wc.stopRequest(); + wc.nmStopSent(); + wc.containerCompleted(); + wc.verifyState(AMContainerState.COMPLETED); + + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + AMSchedulerEventType.S_CONTAINER_COMPLETED); + + wc.containerCompleted(); + wc.verifyNoOutgoingEvents(); + } + + + // TODO Verify diagnostics in most of the tests. + + private static class WrappedContainer { + + long rmIdentifier = 2000; + ApplicationId applicationID; + ApplicationAttemptId appAttemptID; + ContainerId containerID; + NodeId nodeID; + String nodeHttpAddress; + Resource resource; + Priority priority; + Container container; + ContainerHeartbeatHandler chh; + TaskAttemptListener tal; + + @SuppressWarnings("rawtypes") + EventHandler eventHandler; + + AppContext appContext; + + TezDAGID dagID; + TezVertexID vertexID; + TezTaskID taskID; + TezTaskAttemptID taskAttemptID; + + TezTaskContext tezTaskContext; + + Token<JobTokenIdentifier> jobToken; + + public AMContainerImpl amContainer; + + @SuppressWarnings("unchecked") + public WrappedContainer() { + applicationID = BuilderUtils.newApplicationId(rmIdentifier, 1); + appAttemptID = BuilderUtils.newApplicationAttemptId(applicationID, 1); + containerID = BuilderUtils.newContainerId(appAttemptID, 1); + nodeID = BuilderUtils.newNodeId("host", 12500); + nodeHttpAddress = "host:12501"; + resource = BuilderUtils.newResource(1024, 1); + priority = BuilderUtils.newPriority(1); + container = BuilderUtils.newContainer(containerID, nodeID, + nodeHttpAddress, resource, priority, null, 
rmIdentifier); + chh = mock(ContainerHeartbeatHandler.class); + + InetSocketAddress addr = new InetSocketAddress("localhost", 0); + tal = mock(TaskAttemptListener.class); + doReturn(addr).when(tal).getAddress(); + + eventHandler = mock(EventHandler.class); + + appContext = mock(AppContext.class); + doReturn(new HashMap<ApplicationAccessType, String>()).when(appContext) + .getApplicationACLs(); + doReturn(eventHandler).when(appContext).getEventHandler(); + + dagID = new TezDAGID(applicationID, 1); + vertexID = new TezVertexID(dagID, 1); + taskID = new TezTaskID(vertexID, 1); + taskAttemptID = new TezTaskAttemptID(taskID, 1); + + tezTaskContext = mock(TezTaskContext.class); + doReturn(taskAttemptID).when(tezTaskContext).getTaskAttemptId(); + + + jobToken = (Token<JobTokenIdentifier>) mock(Token.class); + + amContainer = new AMContainerImpl(container, chh, tal, + appContext); + } + + /** + * Verifies no additional outgoing events generated by the last incoming + * event to the AMContainer. + */ + @SuppressWarnings("unchecked") + public void verifyNoOutgoingEvents() { + verify(eventHandler, never()).handle(any(Event.class)); + } + + /** + * Returns a list of outgoing events generated by the last incoming event to + * the AMContainer. + * @param invocations number of expected invocations. + * + * @return a list of outgoing events from the AMContainer. 
+ */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public List<Event> verifyCountAndGetOutgoingEvents(int invocations) { + ArgumentCaptor<Event> args = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(invocations)).handle(args.capture()); + return args.getAllValues(); + } + + public void launchContainer() { + reset(eventHandler); + amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID, + jobToken, new Credentials(), false, new TezConfiguration(), + new HashMap<String, LocalResource>(), new HashMap<String, String>(), + null)); + } + + public void assignTaskAttempt(TezTaskAttemptID taID) { + reset(eventHandler); + amContainer.handle(new AMContainerEventAssignTA(containerID, taID, + tezTaskContext)); + } + + public AMContainerTask pullTaskToRun() { + reset(eventHandler); + return amContainer.pullTaskContext(); + } + + public void containerLaunched() { + reset(eventHandler); + amContainer.handle(new AMContainerEventLaunched(containerID, 3000)); + } + + public void taskAttemptSucceeded(TezTaskAttemptID taID) { + reset(eventHandler); + amContainer.handle(new AMContainerEventTASucceeded(containerID, taID)); + } + + public void stopRequest() { + reset(eventHandler); + amContainer.handle(new AMContainerEvent(containerID, + AMContainerEventType.C_STOP_REQUEST)); + } + + public void nmStopSent() { + reset(eventHandler); + amContainer.handle(new AMContainerEvent(containerID, + AMContainerEventType.C_NM_STOP_SENT)); + } + + public void nmStopFailed() { + reset(eventHandler); + amContainer.handle(new AMContainerEvent(containerID, + AMContainerEventType.C_NM_STOP_FAILED)); + } + + public void containerCompleted() { + reset(eventHandler); + ContainerStatus cStatus = ContainerStatus.newInstance(containerID, + ContainerState.COMPLETE, "", 100); + amContainer.handle(new AMContainerEventCompleted(cStatus)); + } + + public void containerTimedOut() { + reset(eventHandler); + amContainer.handle(new AMContainerEvent(containerID, + 
AMContainerEventType.C_TIMED_OUT)); + } + + public void launchFailed() { + reset(eventHandler); + amContainer.handle(new AMContainerEventLaunchFailed(containerID, + "launchFailed")); + } + + public void nodeFailed() { + reset(eventHandler); + amContainer.handle(new AMContainerEventNodeFailed(containerID, + "nodeFailed")); + } + + public void verifyState(AMContainerState state) { + assertEquals( + "Expected state: " + state + ", but found: " + amContainer.getState(), + state, amContainer.getState()); + } + } + + @SuppressWarnings("rawtypes") + private void verifyUnOrderedOutgoingEventTypes(List<Event> events, + Enum<?>... expectedTypes) { + + List<Enum<?>> expectedTypeList = new LinkedList<Enum<?>>(); + for (Enum<?> expectedType : expectedTypes) { + expectedTypeList.add(expectedType); + } + List<Event> eventsCopy = new LinkedList<Event>(events); + + Iterator<Enum<?>> expectedTypeIterator = expectedTypeList.iterator(); + while (expectedTypeIterator.hasNext()) { + Enum<?> expectedType = expectedTypeIterator.next(); + Iterator<Event> iter = eventsCopy.iterator(); + while (iter.hasNext()) { + Event e = iter.next(); + if (e.getType() == expectedType) { + iter.remove(); + expectedTypeIterator.remove(); + break; + } + } + } + assertTrue("Did not find types : " + expectedTypeList + + " in outgoing event list", expectedTypeList.isEmpty()); + assertTrue("Found unexpected events: " + eventsCopy + + " in outgoing event list", eventsCopy.isEmpty()); + } +}
