Repository: tez Updated Branches: refs/heads/branch-0.5 3e5991a5c -> 3b8a480c1
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java index 0b59e79..eba8119 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java @@ -63,6 +63,7 @@ import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; import org.apache.tez.dag.history.events.TaskFinishedEvent; import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -282,7 +283,8 @@ public class TestTaskRecovery { restoreFromTaskStartEvent(); TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.KILLED, "", new TezCounters())); + 0L, 0L, TaskAttemptState.KILLED, + TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters())); task.handle(new TaskEventRecoverTask(task.getTaskId())); // wait for the second task attempt is scheduled dispatcher.await(); @@ -303,7 +305,8 @@ public class TestTaskRecovery { restoreFromTaskStartEvent(); TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.FAILED, "", new TezCounters())); + 0L, 0L, TaskAttemptState.FAILED, + TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters())); task.handle(new TaskEventRecoverTask(task.getTaskId())); // wait for the second task attempt is scheduled dispatcher.await(); @@ -325,7 +328,7 @@ public class TestTaskRecovery { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); try { task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.SUCCEEDED, "", new TezCounters())); + 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters())); fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)"); } catch (TezUncheckedException e) { assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover")); @@ -367,8 +370,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, + "", new TezCounters())); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -399,8 +402,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.FAILED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.FAILED, null, + "", new TezCounters())); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -432,8 +435,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.KILLED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.KILLED, null, + "", new TezCounters())); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -467,8 +470,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, + "", new TezCounters())); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -509,8 +512,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, + "", new TezCounters())); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -521,8 +524,8 @@ public class TestTaskRecovery { // it is possible for TaskAttempt transit from SUCCEEDED to FAILURE due to output failure. recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.FAILED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.FAILED, null, + "", new TezCounters())); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -556,8 +559,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, + "", new TezCounters())); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -568,8 +571,8 @@ public class TestTaskRecovery { // it is possible for TaskAttempt transit from SUCCEEDED to KILLED due to node failure. recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.KILLED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.KILLED, null, + "", new TezCounters())); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -607,8 +610,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, + "", new TezCounters())); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -647,8 +650,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, + "", new TezCounters())); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -728,8 +731,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.KILLED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.KILLED, null, + "", new TezCounters())); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(TaskAttemptStateInternal.NEW, ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); @@ -770,7 +773,7 @@ public class TestTaskRecovery { task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, mock(ContainerId.class), mock(NodeId.class), "", "", "")); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.KILLED, "", null)); + 0, TaskAttemptState.KILLED, null, "", null)); } assertEquals(maxFailedAttempts, task.getAttempts().size()); assertEquals(0, task.failedAttempts); @@ -800,7 +803,7 @@ public class TestTaskRecovery { task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, mock(ContainerId.class), mock(NodeId.class), "", "", "")); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.FAILED, "", null)); + 0, TaskAttemptState.FAILED, null, "", null)); } assertEquals(maxFailedAttempts, task.getAttempts().size()); assertEquals(maxFailedAttempts, task.failedAttempts); @@ -830,7 +833,7 @@ public class TestTaskRecovery { task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, mock(ContainerId.class), mock(NodeId.class), "", "", "")); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.FAILED, "", null)); + 0, TaskAttemptState.FAILED, null, "", null)); } assertEquals(maxFailedAttempts - 1, task.getAttempts().size()); assertEquals(maxFailedAttempts - 1, task.failedAttempts); http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index e868100..f391508 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -57,6 +57,9 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -102,12 +105,15 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto; import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.ClusterInfo; +import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.RootInputInitializerManager; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.VertexState; @@ -116,6 +122,8 @@ import org.apache.tez.dag.app.dag.VertexTerminationCause; import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; @@ -134,10 +142,13 @@ import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo; import org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException.VMExceptionLocation; import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler; import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.app.rm.container.AMContainerMap; +import org.apache.tez.dag.app.rm.container.ContainerContextMatcher; import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -155,6 +166,7 @@ import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent; +import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.test.EdgeManagerForTest; import org.apache.tez.test.VertexManagerPluginForTest; @@ -2945,7 +2957,113 @@ public class TestVertexImpl { Assert.assertEquals(0, committer.commitCounter); Assert.assertEquals(1, committer.abortCounter); } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testVertexTaskAttemptProcessorFailure() { + initAllVertices(VertexState.INITED); + + VertexImpl v = vertices.get("vertex1"); + + startVertex(v); + TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next(); + ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2)); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + new ContainerContextMatcher(), appContext); + containers.addContainerIfNew(container); + doReturn(containers).when(appContext).getAllContainers(); + + ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null)); + Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); + + dispatcher.getEventHandler().handle( + new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent( + new TaskAttemptFailedEvent("Failed"), new EventMetaData( + EventProducerConsumerType.PROCESSOR, v.getName(), null, ta.getID()))))); + dispatcher.await(); + Assert.assertEquals(VertexState.RUNNING, v.getState()); + Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, ta.getTerminationCause()); + } + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testVertexTaskAttemptInputFailure() { + initAllVertices(VertexState.INITED); + + VertexImpl v = vertices.get("vertex1"); + + startVertex(v); + TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next(); + ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2)); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + new ContainerContextMatcher(), appContext); + containers.addContainerIfNew(container); + doReturn(containers).when(appContext).getAllContainers(); + + ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null)); + Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); + + dispatcher.getEventHandler().handle( + new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent( + new TaskAttemptFailedEvent("Failed"), new EventMetaData( + EventProducerConsumerType.INPUT, v.getName(), null, ta.getID()))))); + dispatcher.await(); + Assert.assertEquals(VertexState.RUNNING, v.getState()); + Assert.assertEquals(TaskAttemptTerminationCause.INPUT_READ_ERROR, ta.getTerminationCause()); + } + + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testVertexTaskAttemptOutputFailure() { + initAllVertices(VertexState.INITED); + + VertexImpl v = vertices.get("vertex1"); + + startVertex(v); + TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next(); + ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2)); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + new ContainerContextMatcher(), appContext); + containers.addContainerIfNew(container); + doReturn(containers).when(appContext).getAllContainers(); + + ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null)); + Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); + + dispatcher.getEventHandler().handle( + new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent( + new TaskAttemptFailedEvent("Failed"), new EventMetaData( + EventProducerConsumerType.OUTPUT, v.getName(), null, ta.getID()))))); + dispatcher.await(); + Assert.assertEquals(VertexState.RUNNING, v.getState()); + Assert.assertEquals(TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR, ta.getTerminationCause()); + } + @Test(timeout = 5000) public void testSourceVertexStartHandling() { LOG.info("Testing testSourceVertexStartHandling"); @@ -2962,21 +3080,6 @@ public class TestVertexImpl { } @Test(timeout = 5000) - public void testCounters() { - // FIXME need to test counters at vertex level - } - - @Test(timeout = 5000) - public void testDiagnostics() { - // FIXME need to test diagnostics in various cases - } - - @Test(timeout = 5000) - public void testTaskAttemptCompletionEvents() { - // FIXME need to test handling of task attempt events - } - - @Test(timeout = 5000) public void testSourceTaskAttemptCompletionEvents() { LOG.info("Testing testSourceTaskAttemptCompletionEvents"); initAllVertices(VertexState.INITED); http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java index 4ec1916..d2dece3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java @@ -49,6 +49,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted; import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; import org.junit.Assert; import org.junit.Before; @@ -179,6 +180,8 @@ public class TestTaskSchedulerEventHandler { Assert.assertEquals("Container preempted externally. Container preempted by RM.", completedEvent.getDiagnostics()); Assert.assertTrue(completedEvent.isPreempted()); + Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, + completedEvent.getTerminationCause()); Assert.assertFalse(completedEvent.isDiskFailed()); schedulerHandler.stop(); @@ -186,6 +189,31 @@ public class TestTaskSchedulerEventHandler { } @Test (timeout = 5000) + public void testContainerInternalPreempted() throws IOException { + Configuration conf = new Configuration(false); + schedulerHandler.init(conf); + schedulerHandler.start(); + + ContainerId mockCId = mock(ContainerId.class); + verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId)any()); + schedulerHandler.preemptContainer(mockCId); + verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId); + Assert.assertEquals(1, mockEventHandler.events.size()); + Event event = mockEventHandler.events.get(0); + Assert.assertEquals(AMContainerEventType.C_COMPLETED, event.getType()); + AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event; + Assert.assertEquals(mockCId, completedEvent.getContainerId()); + Assert.assertEquals("Container preempted internally", completedEvent.getDiagnostics()); + Assert.assertFalse(completedEvent.isPreempted()); + Assert.assertFalse(completedEvent.isDiskFailed()); + Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION, + completedEvent.getTerminationCause()); + + schedulerHandler.stop(); + schedulerHandler.close(); + } + + @Test (timeout = 5000) public void testContainerDiskFailed() throws IOException { Configuration conf = new Configuration(false); schedulerHandler.init(conf); @@ -211,6 +239,8 @@ public class TestTaskSchedulerEventHandler { completedEvent.getDiagnostics()); Assert.assertFalse(completedEvent.isPreempted()); Assert.assertTrue(completedEvent.isDiskFailed()); + Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR, + completedEvent.getTerminationCause()); schedulerHandler.stop(); schedulerHandler.close(); http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index c0be044..f273896 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -67,17 +67,21 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating; import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.rm.AMSchedulerEventType; import org.apache.tez.dag.app.rm.NMCommunicatorEventType; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEventHandler; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.impl.TaskSpec; +import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; @@ -87,9 +91,7 @@ import com.google.common.collect.Maps; public class TestAMContainer { - - - @Test + @Test (timeout=5000) // Assign before launch. public void tetSingleSuccessfulTaskFlow() { WrappedContainer wc = new WrappedContainer(); @@ -135,7 +137,7 @@ public class TestAMContainer { assertNull(wc.amContainer.getRunningTaskAttempt()); verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); @@ -146,7 +148,7 @@ public class TestAMContainer { assertFalse(wc.amContainer.isInErrorState()); } - @Test + @Test (timeout=5000) // Assign after launch. public void testSingleSuccessfulTaskFlow2() { WrappedContainer wc = new WrappedContainer(); @@ -191,7 +193,7 @@ public class TestAMContainer { assertNull(wc.amContainer.getRunningTaskAttempt()); verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); @@ -202,7 +204,7 @@ public class TestAMContainer { assertFalse(wc.amContainer.isInErrorState()); } - @Test + @Test (timeout=5000) public void testSingleSuccessfulTaskFlowStopRequest() { WrappedContainer wc = new WrappedContainer(); @@ -225,7 +227,7 @@ public class TestAMContainer { wc.verifyState(AMContainerState.STOPPING); wc.verifyNoOutgoingEvents(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); @@ -238,7 +240,7 @@ public class TestAMContainer { assertFalse(wc.amContainer.isInErrorState()); } - @Test + @Test (timeout=5000) public void testSingleSuccessfulTaskFlowFailedNMStopRequest() { WrappedContainer wc = new WrappedContainer(); @@ -264,7 +266,7 @@ public class TestAMContainer { assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() == AMSchedulerEventType.S_CONTAINER_DEALLOCATE); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); @@ -278,7 +280,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testMultipleAllocationsAtIdle() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -303,7 +305,7 @@ public class TestAMContainer { assertTrue(wc.amContainer.isInErrorState()); wc.nmStopSent(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); // 1 Inform scheduler. 2 TERMINATED to TaskAttempt. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -317,7 +319,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testAllocationAtRunning() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -343,7 +345,7 @@ public class TestAMContainer { assertTrue(wc.amContainer.isInErrorState()); wc.nmStopSent(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); // 1 Inform scheduler. 2 TERMINATED to TaskAttempt. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -357,7 +359,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testMultipleAllocationsAtLaunching() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -382,7 +384,7 @@ public class TestAMContainer { assertTrue(wc.amContainer.isInErrorState()); wc.nmStopSent(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); // 1 Inform scheduler. 2 TERMINATED to TaskAttempt. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -396,7 +398,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testContainerTimedOutAtRunning() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -418,7 +420,7 @@ public class TestAMContainer { NMCommunicatorEventType.CONTAINER_STOP_REQUEST); // TODO Should this be an RM DE-ALLOCATE instead ? - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -432,7 +434,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testLaunchFailure() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -449,22 +451,28 @@ public class TestAMContainer { verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATING, AMSchedulerEventType.S_CONTAINER_DEALLOCATE); + for (Event e : outgoingEvents) { + if (e.getType() == TaskAttemptEventType.TA_CONTAINER_TERMINATING) { + Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, + ((TaskAttemptEventContainerTerminating)e).getTerminationCause()); + } + } - wc.containerCompleted(false); + wc.containerCompleted(); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED); - + // Valid transition. Container complete, but not with an error. assertFalse(wc.amContainer.isInErrorState()); } - @Test + @Test (timeout=5000) public void testContainerCompletedAtAllocated() { WrappedContainer wc = new WrappedContainer(); wc.verifyState(AMContainerState.ALLOCATED); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); @@ -472,7 +480,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) // Verify that incoming NM launched events to COMPLETED containers are // handled. public void testContainerCompletedAtLaunching() { @@ -484,7 +492,7 @@ public class TestAMContainer { wc.assignTaskAttempt(wc.taskAttemptID); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID); verify(wc.tal).unregisterRunningContainer(wc.containerID); @@ -492,6 +500,8 @@ public class TestAMContainer { outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED); + Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, + ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause()); assertFalse(wc.amContainer.isInErrorState()); @@ -501,9 +511,71 @@ public class TestAMContainer { assertFalse(wc.amContainer.isInErrorState()); } + + @SuppressWarnings("rawtypes") + @Test (timeout=5000) + public void testContainerCompletedAtLaunchingSpecificClusterError() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + + + wc.assignTaskAttempt(wc.taskAttemptID); + + wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR); + wc.verifyState(AMContainerState.COMPLETED); + verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); + Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR, + ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause()); + assertFalse(wc.amContainer.isInErrorState()); + + // Container launched generated by NM call. + wc.containerLaunched(); + wc.verifyNoOutgoingEvents(); + + assertFalse(wc.amContainer.isInErrorState()); + } + @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) + public void testContainerCompletedAtLaunchingSpecificError() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + + + wc.assignTaskAttempt(wc.taskAttemptID); + + wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED); + wc.verifyState(AMContainerState.COMPLETED); + verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED); + Assert.assertEquals(TaskAttemptTerminationCause.NODE_FAILED, + ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause()); + + assertFalse(wc.amContainer.isInErrorState()); + + // Container launched generated by NM call. + wc.containerLaunched(); + wc.verifyNoOutgoingEvents(); + + assertFalse(wc.amContainer.isInErrorState()); + } + + @SuppressWarnings("rawtypes") + @Test (timeout=5000) public void testContainerCompletedAtIdle() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -514,7 +586,7 @@ public class TestAMContainer { wc.containerLaunched(); wc.verifyState(AMContainerState.IDLE); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID); verify(wc.tal).unregisterRunningContainer(wc.containerID); @@ -538,7 +610,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testContainerCompletedAtRunning() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -550,7 +622,7 @@ public class TestAMContainer { wc.pullTaskToRun(); wc.verifyState(AMContainerState.RUNNING); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID); verify(wc.tal).unregisterRunningContainer(wc.containerID); @@ -574,7 +646,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testContainerPreemptedAtRunning() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -586,7 +658,7 @@ public class TestAMContainer { wc.pullTaskToRun(); wc.verifyState(AMContainerState.RUNNING); - wc.containerCompleted(ContainerExitStatus.PREEMPTED); + wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID); verify(wc.tal).unregisterRunningContainer(wc.containerID); @@ -594,6 +666,8 @@ public class TestAMContainer { verify(wc.chh).unregister(wc.containerID); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, + ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause()); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); @@ -608,9 +682,47 @@ public class TestAMContainer { assertFalse(wc.amContainer.isInErrorState()); } + + @SuppressWarnings("rawtypes") + @Test (timeout=5000) + public void testContainerInternallyPreemptedAtRunning() { + WrappedContainer wc = new WrappedContainer(); + List<Event> outgoingEvents; + + wc.launchContainer(); + + wc.assignTaskAttempt(wc.taskAttemptID); + wc.containerLaunched(); + wc.pullTaskToRun(); + wc.verifyState(AMContainerState.RUNNING); + + wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION); + wc.verifyState(AMContainerState.COMPLETED); + verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.chh).register(wc.containerID); + verify(wc.chh).unregister(wc.containerID); + + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION, + ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause()); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED); + + assertFalse(wc.amContainer.isInErrorState()); + + // Pending task complete. (Ideally, container should be dead at this point + // and this event should not be generated. Network timeout on NM-RM heartbeat + // can cause it to be genreated) + wc.taskAttemptSucceeded(wc.taskAttemptID); + wc.verifyNoOutgoingEvents(); + wc.verifyHistoryStopEvent(); + + assertFalse(wc.amContainer.isInErrorState()); + } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testContainerDiskFailedAtRunning() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -622,7 +734,7 @@ public class TestAMContainer { wc.pullTaskToRun(); wc.verifyState(AMContainerState.RUNNING); - wc.containerCompleted(ContainerExitStatus.DISKS_FAILED); + wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID); verify(wc.tal).unregisterRunningContainer(wc.containerID); @@ -630,6 +742,8 @@ public class TestAMContainer { verify(wc.chh).unregister(wc.containerID); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR, + ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause()); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); @@ -646,7 +760,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testTaskAssignedToCompletedContainer() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -657,7 +771,7 @@ public class TestAMContainer { wc.pullTaskToRun(); wc.taskAttemptSucceeded(wc.taskAttemptID); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2); @@ -677,7 +791,7 @@ public class TestAMContainer { assertTrue(wc.amContainer.isInErrorState()); } - @Test + @Test (timeout=5000) public void testTaskPullAtLaunching() { WrappedContainer wc = new WrappedContainer(); @@ -690,7 +804,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testNodeFailedAtIdle() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -712,11 +826,11 @@ public class TestAMContainer { for (Event event : outgoingEvents) { if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) { TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event; - assertEquals("nodeFailed", nfEvent.getDiagnosticInfo()); + assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed")); } } - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -726,7 +840,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testNodeFailedAtIdleMultipleAttempts() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -756,13 +870,13 @@ public class TestAMContainer { for (Event event : outgoingEvents) { if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) { TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event; - assertEquals("nodeFailed", nfEvent.getDiagnosticInfo()); + assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed")); } } assertFalse(wc.amContainer.isInErrorState()); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyNoOutgoingEvents(); wc.verifyHistoryStopEvent(); @@ -772,7 +886,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testNodeFailedAtRunningMultipleAttempts() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -801,11 +915,11 @@ public class TestAMContainer { for (Event event : outgoingEvents) { if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) { TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event; - assertEquals("nodeFailed", nfEvent.getDiagnosticInfo()); + assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed")); } } - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -818,7 +932,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testNodeFailedAtCompletedMultipleSuccessfulTAs() { WrappedContainer wc = new WrappedContainer(); List<Event> outgoingEvents; @@ -835,7 +949,7 @@ public class TestAMContainer { wc.taskAttemptSucceeded(taID2); wc.stopRequest(); wc.nmStopSent(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); wc.nodeFailed(); @@ -849,7 +963,7 @@ public class TestAMContainer { assertEquals(2, wc.amContainer.getAllTaskAttempts().size()); } - @Test + @Test (timeout=5000) public void testDuplicateCompletedEvents() { WrappedContainer wc = new WrappedContainer(); @@ -865,17 +979,17 @@ public class TestAMContainer { wc.taskAttemptSucceeded(taID2); wc.stopRequest(); wc.nmStopSent(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyNoOutgoingEvents(); wc.verifyHistoryStopEvent(); } - @Test + @Test (timeout=5000) public void testLocalResourceAddition() { WrappedContainer wc = new WrappedContainer(); @@ -926,13 +1040,13 @@ public class TestAMContainer { wc.taskAttemptSucceeded(taID3); // Verify references are cleared after a container completes. - wc.containerCompleted(false); + wc.containerCompleted(); assertNull(wc.amContainer.containerLocalResources); assertNull(wc.amContainer.additionalLocalResources); } @SuppressWarnings("unchecked") - @Test + @Test (timeout=5000) public void testCredentialsTransfer() { WrappedContainerMultipleDAGs wc = new WrappedContainerMultipleDAGs(); @@ -1183,15 +1297,15 @@ public class TestAMContainer { AMContainerEventType.C_NM_STOP_FAILED)); } - public void containerCompleted(boolean preempted) { + public void containerCompleted() { reset(eventHandler); - amContainer.handle(new AMContainerEventCompleted(containerID, - (preempted ? ContainerExitStatus.PREEMPTED : ContainerExitStatus.SUCCESS), null)); + amContainer.handle(new AMContainerEventCompleted(containerID, ContainerExitStatus.SUCCESS, null, + TaskAttemptTerminationCause.CONTAINER_EXITED)); } - public void containerCompleted(int exitStatus) { + public void containerCompleted(int exitStatus, TaskAttemptTerminationCause errCause) { reset(eventHandler); - amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null)); + amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null, errCause)); } public void containerTimedOut() { http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index bc0a642..0c450ab 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events; import static org.junit.Assert.fail; import java.nio.ByteBuffer; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -45,6 +46,7 @@ import org.apache.tez.dag.app.dag.impl.VertexStats; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -472,7 +474,7 @@ public class TestHistoryEventsProtoConversion { TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, - null, null); + null, null, null); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -492,7 +494,7 @@ public class TestHistoryEventsProtoConversion { TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, - "diagnose", new TezCounters()); + TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters()); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -505,6 +507,8 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getState()); Assert.assertEquals(event.getCounters(), deserializedEvent.getCounters()); + Assert.assertEquals(event.getTaskAttemptError(), + deserializedEvent.getTaskAttemptError()); logEvents(event, deserializedEvent); } } http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index d9e1a38..4349740 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -59,6 +59,7 @@ import org.apache.tez.dag.history.events.VertexInitializedEvent; import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.utils.DAGUtils; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -155,7 +156,7 @@ public class TestHistoryEventJsonConversion { break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.FAILED, null, null); + random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(), http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 3e355e6..f353890 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -393,6 +393,9 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + if (event.getTaskAttemptError() != null) { + atsEntity.addOtherInfo(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name()); + } atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); atsEntity.addOtherInfo(ATSConstants.COUNTERS, DAGUtils.convertCountersToATSMap(event.getCounters())); http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index f32704e..4023c1c 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -66,6 +66,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.logging.EntityTypes; import org.apache.tez.dag.history.utils.DAGUtils; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -160,7 +161,7 @@ public class TestHistoryEventTimelineConversion { break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.FAILED, null, null); + random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(), @@ -436,11 +437,13 @@ public class TestHistoryEventTimelineConversion { long finishTime = startTime + 1234; TaskAttemptState state = TaskAttemptState .values()[random.nextInt(TaskAttemptState.values().length)]; + TaskAttemptTerminationCause error = TaskAttemptTerminationCause + .values()[random.nextInt(TaskAttemptTerminationCause.values().length)]; String diagnostics = "random diagnostics message"; TezCounters counters = new TezCounters(); TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName, - startTime, finishTime, state, diagnostics, counters); + startTime, finishTime, state, error, diagnostics, counters); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); @@ -467,6 +470,7 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME)); Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN)); Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS)); + Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM)); Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS)); Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS)); }
