Updated Branches: refs/heads/TEZ-1 4c89e15c6 -> d660948d7
TEZ-128. Add additional unit tests for TaskAttempt. Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d660948d Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d660948d Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d660948d Branch: refs/heads/TEZ-1 Commit: d660948d7eaa3536a3bcd97e16524aa626b4ad7c Parents: 4c89e15 Author: Siddharth Seth <[email protected]> Authored: Tue May 14 15:31:07 2013 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue May 14 15:31:07 2013 -0700 ---------------------------------------------------------------------- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 5 - .../tez/dag/app/dag/impl/TestTaskAttempt.java | 409 ++++++++++++--- .../apache/tez/dag/app/dag/impl/TestTaskImpl.java | 2 + 3 files changed, 325 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d660948d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 2564ffa..264b3ad 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -116,7 +116,6 @@ public class TaskAttemptImpl implements TaskAttempt, protected EventHandler eventHandler; private final TezTaskAttemptID attemptId; private final Clock clock; -// private final TaskAttemptListener taskAttemptListener; private final List<String> diagnostics = new ArrayList<String>(); private final Lock readLock; private final Lock writeLock; @@ -923,10 +922,6 @@ public class TaskAttemptImpl implements TaskAttempt, // Send out events to the Task - indicating TaskAttemptTermination(F/K) ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper .getTaskEventType())); - - if (event instanceof DiagnosableEvent) { - ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo()); - } } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d660948d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index a18bd3b..369bced 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -20,13 +20,18 @@ package org.apache.tez.dag.app.dag.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; import java.net.InetSocketAddress; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.fs.FileStatus; @@ -61,10 +66,14 @@ import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating; import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; +import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; +import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded; +import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -72,6 +81,7 @@ import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.engine.common.security.JobTokenIdentifier; import org.junit.Test; +import org.mockito.ArgumentCaptor; @SuppressWarnings({ "unchecked", "rawtypes" }) public class TestTaskAttempt { @@ -102,88 +112,96 @@ public class TestTaskAttempt { // testMRAppHistory(app); // } - // @Test - // // Verifies that the launch request is based on the hosts. - // // TODO Move to the client. - // // TODO Add a test that verifies that the LocationHint is used as it should - // be. - // public void testSingleRackRequest() throws Exception { - // TaskAttemptImpl.ScheduleTaskattemptTransition sta = - // new TaskAttemptImpl.ScheduleTaskattemptTransition(); - // - // EventHandler eventHandler = mock(EventHandler.class); - // String[] hosts = new String[3]; - // hosts[0] = "host1"; - // hosts[1] = "host2"; - // hosts[2] = "host3"; - // TaskSplitMetaInfo splitInfo = new TaskSplitMetaInfo(hosts, 0, - // 128 * 1024 * 1024l); - // - // TaskAttemptImpl mockTaskAttempt = createMapTaskAttemptImpl2ForTest( - // eventHandler, splitInfo); - // TaskAttemptEventSchedule mockTAEvent = - // mock(TaskAttemptEventSchedule.class); - // doReturn(false).when(mockTAEvent).isRescheduled(); - // - // sta.transition(mockTaskAttempt, mockTAEvent); - // - // ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); - // verify(eventHandler, times(2)).handle(arg.capture()); - // if (!(arg.getAllValues().get(1) instanceof - // AMSchedulerTALaunchRequestEvent)) { - // Assert.fail("Second Event not of type ContainerRequestEvent"); - // } - // AMSchedulerTALaunchRequestEvent tlrE = (AMSchedulerTALaunchRequestEvent) - // arg - // .getAllValues().get(1); - // String[] requestedRacks = tlrE.getRacks(); - // // Only a single occurrence of /DefaultRack - // assertEquals(1, requestedRacks.length); - // } + @Test + public void testLocalityRequest() { - // @Test - // // Tests that an attempt is made to resolve the localized hosts to racks. - // // TODO Move to the client. - // public void testHostResolveAttempt() throws Exception { - // TaskAttemptImpl.ScheduleTaskattemptTransition sta = - // new TaskAttemptImpl.ScheduleTaskattemptTransition(); - // - // EventHandler eventHandler = mock(EventHandler.class); - // String hosts[] = new String[] {"192.168.1.1", "host2", "host3"}; - // String resolved[] = new String[] {"host1", "host2", "host3"}; - // TaskSplitMetaInfo splitInfo = - // new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l); - // - // TaskAttemptImpl mockTaskAttempt = - // createMapTaskAttemptImpl2ForTest(eventHandler, splitInfo); - // TaskAttemptImpl spyTa = spy(mockTaskAttempt); - // when(spyTa.resolveHosts(hosts)).thenReturn(resolved); - // - // TaskAttemptEventSchedule mockTAEvent = - // mock(TaskAttemptEventSchedule.class); - // doReturn(false).when(mockTAEvent).isRescheduled(); - // - // sta.transition(spyTa, mockTAEvent); - // verify(spyTa).resolveHosts(hosts); - // ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); - // verify(eventHandler, times(2)).handle(arg.capture()); - // if (!(arg.getAllValues().get(1) instanceof - // AMSchedulerTALaunchRequestEvent)) { - // Assert.fail("Second Event not of type ContainerRequestEvent"); - // } - // Map<String, Boolean> expected = new HashMap<String, Boolean>(); - // expected.put("host1", true); - // expected.put("host2", true); - // expected.put("host3", true); - // AMSchedulerTALaunchRequestEvent cre = - // (AMSchedulerTALaunchRequestEvent) arg.getAllValues().get(1); - // String[] requestedHosts = cre.getHosts(); - // for (String h : requestedHosts) { - // expected.remove(h); - // } - // assertEquals(0, expected.size()); - // } - // + TaskAttemptImpl.ScheduleTaskattemptTransition sta = + new TaskAttemptImpl.ScheduleTaskattemptTransition(); + + EventHandler eventHandler = mock(EventHandler.class); + String[] hosts = new String[3]; + hosts[0] = "host1"; + hosts[1] = "host2"; + hosts[2] = "host3"; + TaskLocationHint locationHint = new TaskLocationHint(hosts, null); + + TezTaskID taskID = new TezTaskID( + new TezVertexID(new TezDAGID("1", 1, 1), 1), 1); + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + mock(TaskAttemptListener.class), 1, new TezConfiguration(), + mock(Token.class), new Credentials(), new SystemClock(), + mock(TaskHeartbeatHandler.class), mock(AppContext.class), + MAP_PROCESSOR_NAME, locationHint, BuilderUtils.newResource(1024, 1), + new HashMap<String, LocalResource>(), new HashMap<String, String>(), + "", false); + + TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class); + + sta.transition(taImpl, sEvent); + + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(1)).handle(arg.capture()); + if (!(arg.getAllValues().get(0) instanceof AMSchedulerEventTALaunchRequest)) { + fail("Second event not of type " + + AMSchedulerEventTALaunchRequest.class.getName()); + } + // TODO Move the Rack request check to the client after TEZ-125 is fixed. + AMSchedulerEventTALaunchRequest lre = (AMSchedulerEventTALaunchRequest) arg + .getAllValues().get(0); + String[] requestedRacks = lre.getRacks(); + assertEquals(1, requestedRacks.length); + assertEquals(3, lre.getHosts().length); + for (int i = 0; i < 3; i++) { + assertEquals("host" + (i + 1), lre.getHosts()[i]); + } + } + + + @Test + // Tests that an attempt is made to resolve the localized hosts to racks. + // TODO Move to the client post TEZ-125. + public void testHostResolveAttempt() throws Exception { + TaskAttemptImpl.ScheduleTaskattemptTransition sta = + new TaskAttemptImpl.ScheduleTaskattemptTransition(); + + EventHandler eventHandler = mock(EventHandler.class); + String hosts[] = new String[] { "192.168.1.1", "host2", "host3" }; + String resolved[] = new String[] { "host1", "host2", "host3" }; + TaskLocationHint locationHint = new TaskLocationHint(hosts, null); + + TezTaskID taskID = new TezTaskID( + new TezVertexID(new TezDAGID("1", 1, 1), 1), 1); + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + mock(TaskAttemptListener.class), 1, new TezConfiguration(), + mock(Token.class), new Credentials(), new SystemClock(), + mock(TaskHeartbeatHandler.class), mock(AppContext.class), + MAP_PROCESSOR_NAME, locationHint, BuilderUtils.newResource(1024, 1), + new HashMap<String, LocalResource>(), new HashMap<String, String>(), + "", false); + TaskAttemptImpl spyTa = spy(taImpl); + when(spyTa.resolveHosts(hosts)).thenReturn(resolved); + + TaskAttemptEventSchedule mockTAEvent = mock(TaskAttemptEventSchedule.class); + + sta.transition(spyTa, mockTAEvent); + verify(spyTa).resolveHosts(hosts); + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(1)).handle(arg.capture()); + if (!(arg.getAllValues().get(0) instanceof AMSchedulerEventTALaunchRequest)) { + fail("Second Event not of type ContainerRequestEvent"); + } + Map<String, Boolean> expected = new HashMap<String, Boolean>(); + expected.put("host1", true); + expected.put("host2", true); + expected.put("host3", true); + AMSchedulerEventTALaunchRequest cre = (AMSchedulerEventTALaunchRequest) arg + .getAllValues().get(0); + String[] requestedHosts = cre.getHosts(); + for (String h : requestedHosts) { + expected.remove(h); + } + assertEquals(0, expected.size()); + } // @Test // // Verifies accounting of slot_milli counters. Time spent in running tasks. @@ -325,10 +343,105 @@ public class TestTaskAttempt { // null)); assertFalse(eventHandler.internalError); } + + @Test + // Ensure ContainerTerminating and ContainerTerminated is handled correctly by + // the TaskAttempt + public void testContainerTerminationWhileRunning() throws Exception { + ApplicationId appId = BuilderUtils.newApplicationId(1, 2); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 0); + TezDAGID dagID = new TezDAGID(appId, 1); + TezVertexID vertexID = new TezVertexID(dagID, 1); + TezTaskID taskID = new TezTaskID(vertexID, 1); + TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0); - // TODO Add a similar test for TERMINATING. - // Ensure ContainerTerminated is handled correctly by the TaskAttempt + MockEventHandler eventHandler = spy(new MockEventHandler()); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + TezConfiguration tezConf = new TezConfiguration(); + tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + tezConf.setBoolean("fs.file.impl.disable.cache", true); + + TaskLocationHint locationHint = new TaskLocationHint( + new String[] { "127.0.0.1" }, null); + Resource resource = BuilderUtils.newResource(1024, 1); + Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + Map<String, String> environment = new HashMap<String, String>(); + String javaOpts = ""; + + NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + AppContext appCtx = mock(AppContext.class); + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + appCtx); + containers.addContainerIfNew(container); + + doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); + doReturn(containers).when(appCtx).getAllContainers(); + + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + taListener, 1, tezConf, mock(Token.class), new Credentials(), + new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx, + MAP_PROCESSOR_NAME, locationHint, resource, localResources, + environment, javaOpts, false); + + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null)); + // At state STARTING. + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, + null, -1)); + assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), + TaskAttemptState.RUNNING); + + int expectedEventsAtRunning = 3; + verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); + + taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID, + "Terminating")); + assertFalse( + "InternalError occurred trying to handle TA_CONTAINER_TERMINATING", + eventHandler.internalError); + + assertEquals("Task attempt is not in the FAILED state", taImpl.getState(), + TaskAttemptState.FAILED); + + assertEquals(1, taImpl.getDiagnostics().size()); + assertEquals("Terminating", taImpl.getDiagnostics().get(0)); + + int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3; + arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); + + verifyEventType( + arg.getAllValues().subList(expectedEventsAtRunning, + expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1); + verifyEventType( + arg.getAllValues().subList(expectedEventsAtRunning, + expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); + + taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, + "Terminated")); + int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0; + arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture()); + + assertEquals(2, taImpl.getDiagnostics().size()); + assertEquals("Terminated", taImpl.getDiagnostics().get(1)); + } + + @Test + // Ensure ContainerTerminated is handled correctly by the TaskAttempt public void testContainerTerminatedWhileRunning() throws Exception { ApplicationId appId = BuilderUtils.newApplicationId(1, 2); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( @@ -382,11 +495,14 @@ public class TestTaskAttempt { null, -1)); assertEquals("Task attempt is not in running state", taImpl.getState(), TaskAttemptState.RUNNING); - taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, null)); + taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "Terminated")); assertFalse( "InternalError occurred trying to handle TA_CONTAINER_TERMINATED", eventHandler.internalError); - // TODO Verify diagnostics + + assertEquals("Terminated", taImpl.getDiagnostics().get(0)); + + // TODO Ensure TA_TERMINATING after this is ingored. } @Test @@ -456,6 +572,98 @@ public class TestTaskAttempt { } @Test + // Ensure ContainerTerminating and ContainerTerminated is handled correctly by + // the TaskAttempt + public void testContainerTerminatedAfterSuccess() throws Exception { + ApplicationId appId = BuilderUtils.newApplicationId(1, 2); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 0); + TezDAGID dagID = new TezDAGID(appId, 1); + TezVertexID vertexID = new TezVertexID(dagID, 1); + TezTaskID taskID = new TezTaskID(vertexID, 1); + TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0); + + MockEventHandler eventHandler = spy(new MockEventHandler()); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + TezConfiguration tezConf = new TezConfiguration(); + tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + tezConf.setBoolean("fs.file.impl.disable.cache", true); + + TaskLocationHint locationHint = new TaskLocationHint( + new String[] { "127.0.0.1" }, null); + Resource resource = BuilderUtils.newResource(1024, 1); + Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + Map<String, String> environment = new HashMap<String, String>(); + String javaOpts = ""; + + NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + AppContext appCtx = mock(AppContext.class); + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + appCtx); + containers.addContainerIfNew(container); + + doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); + doReturn(containers).when(appCtx).getAllContainers(); + + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + taListener, 1, tezConf, mock(Token.class), new Credentials(), + new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx, + MAP_PROCESSOR_NAME, locationHint, resource, localResources, + environment, javaOpts, false); + + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null)); + // At state STARTING. + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, + null, -1)); + assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), + TaskAttemptState.RUNNING); + + int expectedEventsAtRunning = 3; + verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); + + taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); + + assertEquals("Task attempt is not in the SUCCEEDED state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + + assertEquals(0, taImpl.getDiagnostics().size()); + + int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3; + arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); + + verifyEventType( + arg.getAllValues().subList(expectedEventsAtRunning, + expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1); + verifyEventType( + arg.getAllValues().subList(expectedEventsAtRunning, + expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); + + taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, + "Terminated")); + int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0; + arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture()); + + // Verify that the diagnostic message included in the Terminated event is not + // captured - TA already succeeded. + assertEquals(0, taImpl.getDiagnostics().size()); + } + + + @Test // Verifies that multiple TooManyFetchFailures are handled correctly by the // TaskAttempt. public void testMultipleTooManyFetchFailures() throws Exception { @@ -467,7 +675,8 @@ public class TestTaskAttempt { TezTaskID taskID = new TezTaskID(vertexID, 1); TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0); - MockEventHandler eventHandler = new MockEventHandler(); + MockEventHandler mockEh = new MockEventHandler(); + MockEventHandler eventHandler = spy(mockEh); TaskAttemptListener taListener = mock(TaskAttemptListener.class); when(taListener.getAddress()).thenReturn( new InetSocketAddress("localhost", 0)); @@ -513,8 +722,20 @@ public class TestTaskAttempt { TaskAttemptEventType.TA_DONE)); assertEquals("Task attempt is not in succeeded state", taImpl.getState(), TaskAttemptState.SUCCEEDED); + + int expectedEventsTillSucceeded = 6; + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture()); + verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2); + taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES)); + int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 3; + verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(arg.capture()); + verifyEventType( + arg.getAllValues().subList(expectedEventsTillSucceeded, + expectedEventsAfterFetchFailure), TaskEventTAUpdate.class, 1); + assertEquals("Task attempt is not in FAILED state", taImpl.getState(), TaskAttemptState.FAILED); taImpl.handle(new TaskAttemptEvent(taskAttemptID, @@ -524,8 +745,24 @@ public class TestTaskAttempt { assertFalse( "InternalError occurred trying to handle TA_TOO_MANY_FETCH_FAILURES", eventHandler.internalError); + // No new events. + verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle( + arg.capture()); } + private void verifyEventType(List<Event> events, + Class<? extends Event> eventClass, int expectedOccurences) { + int count = 0; + for (Event e : events) { + if (eventClass.isInstance(e)) { + count++; + } + } + assertEquals( + "Mismatch in num occurences of event: " + eventClass.getCanonicalName(), + expectedOccurences, count); + } + public static class MockEventHandler implements EventHandler { public boolean internalError; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d660948d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 516ea5e..100f2aa 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -363,6 +363,8 @@ public class TestTaskImpl { assertTaskSucceededState(); } + + // TODO Add test to validate the correct commit attempt. @SuppressWarnings("rawtypes") private class MockTaskImpl extends TaskImpl {
