http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 1ed7ecb..f88ab7c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -31,6 +31,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.tez.common.TezAbstractEvent; +import org.apache.tez.dag.app.dag.event.TaskEvent; +import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; +import org.apache.tez.dag.app.dag.event.TaskEventTAKilled; +import org.apache.tez.dag.app.dag.event.TaskEventTALaunched; +import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventHandler; @@ -65,7 +72,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; -import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; import org.apache.tez.dag.app.dag.event.TaskEventTermination; import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.VertexEventType; @@ -131,6 +137,7 @@ public class TestTaskImpl { @Before public void setup() { conf = new Configuration(); + conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 4); taskCommunicatorManagerInterface = mock(TaskCommunicatorManagerInterface.class); taskHeartbeatHandler = 
mock(TaskHeartbeatHandler.class); credentials = new Credentials(); @@ -180,6 +187,13 @@ public class TestTaskImpl { assertEquals(locationHint, mockTask.getTaskLocationHint()); } + private void scheduleTaskAttempt(TezTaskID taskId, TaskState expectedState) { + mockTask.handle(new TaskEventScheduleTask(taskId, mockTaskSpec, locationHint, false)); + assertEquals(expectedState, mockTask.getState()); + assertEquals(mockTaskSpec, mockTask.getBaseTaskSpec()); + assertEquals(locationHint, mockTask.getTaskLocationHint()); + } + private void sendTezEventsToTask(TezTaskID taskId, int numTezEvents) { EventMetaData eventMetaData = new EventMetaData(); DataMovementEvent dmEvent = DataMovementEvent.create(null); @@ -199,16 +213,44 @@ public class TestTaskImpl { assertTaskKillWaitState(); } + private TaskEventTAKilled createTaskTAKilledEvent(TezTaskAttemptID taskAttemptId) { + return createTaskTAKilledEvent(taskAttemptId, null); + } + + private TaskEventTAKilled createTaskTAKilledEvent(TezTaskAttemptID taskAttemptId, + TezAbstractEvent causalEvent) { + return new TaskEventTAKilled(taskAttemptId, causalEvent); + } + + private TaskEventTAFailed createTaskTAFailedEvent(TezTaskAttemptID taskAttemptId) { + return createTaskTAFailedEvent(taskAttemptId, TaskFailureType.NON_FATAL, null); + } + + private TaskEventTAFailed createTaskTAFailedEvent(TezTaskAttemptID taskAttemptId, + TaskFailureType taskFailureType, + TezAbstractEvent causalEvent) { + return new TaskEventTAFailed(taskAttemptId, taskFailureType, causalEvent); + } + + private TaskEventTALaunched createTaskTALauncherEvent(TezTaskAttemptID taskAttemptId) { + return new TaskEventTALaunched(taskAttemptId); + } + + private TaskEventTASucceeded createTaskTASucceededEvent(TezTaskAttemptID taskAttemptId) { + return new TaskEventTASucceeded(taskAttemptId); + } + + private TaskEvent createTaskTAAddSpecAttempt(TezTaskAttemptID taskAttemptId) { + return new TaskEvent(taskAttemptId.getTaskID(), TaskEventType.T_ADD_SPEC_ATTEMPT); + } private 
void killScheduledTaskAttempt(TezTaskAttemptID attemptId) { - mockTask.handle(new TaskEventTAUpdate(attemptId, - TaskEventType.T_ATTEMPT_KILLED)); + mockTask.handle(createTaskTAKilledEvent(attemptId)); assertTaskScheduledState(); } private void launchTaskAttempt(TezTaskAttemptID attemptId) { - mockTask.handle(new TaskEventTAUpdate(attemptId, - TaskEventType.T_ATTEMPT_LAUNCHED)); + mockTask.handle(createTaskTALauncherEvent(attemptId)); assertTaskRunningState(); } @@ -222,17 +264,21 @@ public class TestTaskImpl { } private void killRunningTaskAttempt(TezTaskAttemptID attemptId) { - mockTask.handle(new TaskEventTAUpdate(attemptId, - TaskEventType.T_ATTEMPT_KILLED)); + mockTask.handle(createTaskTAKilledEvent(attemptId)); assertTaskRunningState(); verify(mockTask.getVertex(), times(1)).incrementKilledTaskAttemptCount(); } private void failRunningTaskAttempt(TezTaskAttemptID attemptId) { + failRunningTaskAttempt(attemptId, true); + } + + private void failRunningTaskAttempt(TezTaskAttemptID attemptId, boolean verifyState) { int failedAttempts = mockTask.failedAttempts; - mockTask.handle(new TaskEventTAUpdate(attemptId, - TaskEventType.T_ATTEMPT_FAILED)); - assertTaskRunningState(); + mockTask.handle(createTaskTAFailedEvent(attemptId)); + if (verifyState) { + assertTaskRunningState(); + } Assert.assertEquals(failedAttempts + 1, mockTask.failedAttempts); verify(mockTask.getVertex(), times(failedAttempts + 1)).incrementFailedTaskAttemptCount(); } @@ -310,13 +356,50 @@ public class TestTaskImpl { scheduleTaskAttempt(taskId); launchTaskAttempt(mockTask.getLastAttempt().getID()); killTask(taskId); - mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ATTEMPT_KILLED)); + mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID())); assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState()); verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED); } @Test(timeout = 5000) + public void 
testTooManyFailedAttempts() { + LOG.info("--- START: testTooManyFailedAttempts ---"); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId, TaskState.SCHEDULED); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + failRunningTaskAttempt(mockTask.getLastAttempt().getID()); + + scheduleTaskAttempt(taskId, TaskState.RUNNING); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + failRunningTaskAttempt(mockTask.getLastAttempt().getID()); + + scheduleTaskAttempt(taskId, TaskState.RUNNING); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + failRunningTaskAttempt(mockTask.getLastAttempt().getID()); + + scheduleTaskAttempt(taskId, TaskState.RUNNING); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + failRunningTaskAttempt(mockTask.getLastAttempt().getID(), false); + + assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState()); + verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED); + } + + @Test(timeout = 5000) + public void testFailedAttemptWithFatalError() { + LOG.info("--- START: testFailedAttemptWithFatalError ---"); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId, TaskState.SCHEDULED); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + mockTask.handle( + createTaskTAFailedEvent(mockTask.getLastAttempt().getID(), TaskFailureType.FATAL, null)); + + assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState()); + assertEquals(1, mockTask.failedAttempts); + verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED); + } + + @Test(timeout = 5000) public void testKillRunningTaskPreviousKilledAttempts() { LOG.info("--- START: testKillRunningTaskPreviousKilledAttempts ---"); TezTaskID taskId = getNewTaskID(); @@ -325,8 +408,7 @@ public class TestTaskImpl { killRunningTaskAttempt(mockTask.getLastAttempt().getID()); assertEquals(TaskStateInternal.RUNNING, mockTask.getInternalState()); killTask(taskId); - mockTask.handle(new 
TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ATTEMPT_KILLED)); + mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID())); assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState()); verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED); @@ -342,8 +424,7 @@ public class TestTaskImpl { scheduleTaskAttempt(taskId); launchTaskAttempt(mockTask.getLastAttempt().getID()); killTask(taskId); - mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ATTEMPT_SUCCEEDED)); + mockTask.handle(createTaskTASucceededEvent(mockTask.getLastAttempt().getID())); assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState()); } @@ -357,8 +438,7 @@ public class TestTaskImpl { scheduleTaskAttempt(taskId); launchTaskAttempt(mockTask.getLastAttempt().getID()); killTask(taskId); - mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(createTaskTAFailedEvent(mockTask.getLastAttempt().getID())); assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState()); } @@ -415,15 +495,13 @@ public class TestTaskImpl { scheduleTaskAttempt(taskId); launchTaskAttempt(mockTask.getLastAttempt().getID()); killTask(taskId); - mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ATTEMPT_KILLED)); + mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID())); assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState()); // Send duplicate kill for same attempt // This will not happen in practice but this is to simulate handling // of killed attempts in killed state. 
- mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ATTEMPT_KILLED)); + mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID())); assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState()); } @@ -437,14 +515,12 @@ public class TestTaskImpl { TezTaskID taskId = getNewTaskID(); scheduleTaskAttempt(taskId); for (int i = 0; i < mockTask.maxFailedAttempts; ++i) { - mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(createTaskTAFailedEvent(mockTask.getLastAttempt().getID())); } assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState()); // Send kill for an attempt - mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ATTEMPT_KILLED)); + mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID())); assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState()); } @@ -540,8 +616,7 @@ public class TestTaskImpl { mockTask.canCommit(mockTask.getLastAttempt().getID())); updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED); - mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ATTEMPT_SUCCEEDED)); + mockTask.handle(createTaskTASucceededEvent(mockTask.getLastAttempt().getID())); assertTaskSucceededState(); } @@ -562,8 +637,7 @@ public class TestTaskImpl { mockTask.canCommit(mockTask.getLastAttempt().getID())); updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED); - mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ATTEMPT_SUCCEEDED)); + mockTask.handle(createTaskTASucceededEvent(mockTask.getLastAttempt().getID())); assertTaskSucceededState(); } @@ -578,8 +652,7 @@ public class TestTaskImpl { TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID(); // Add a speculative task attempt that succeeds - mockTask.handle(new 
TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ADD_SPEC_ATTEMPT)); + mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID())); launchTaskAttempt(mockTask.getLastAttempt().getID()); updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); @@ -606,8 +679,7 @@ public class TestTaskImpl { mockTask.canCommit(mockTask.getAttemptList().get(0).getID())); updateAttemptState(mockTask.getAttemptList().get(0), TaskAttemptState.SUCCEEDED); - mockTask.handle(new TaskEventTAUpdate(mockTask.getAttemptList().get(0).getID(), - TaskEventType.T_ATTEMPT_SUCCEEDED)); + mockTask.handle(createTaskTASucceededEvent(mockTask.getAttemptList().get(0).getID())); assertTaskSucceededState(); } @@ -620,8 +692,7 @@ public class TestTaskImpl { launchTaskAttempt(mockTask.getLastAttempt().getID()); updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); - mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ATTEMPT_SUCCEEDED)); + mockTask.handle(createTaskTASucceededEvent(mockTask.getLastAttempt().getID())); // The task should now have succeeded assertTaskSucceededState(); @@ -644,8 +715,9 @@ public class TestTaskImpl { when(mockTezEvent.getSourceInfo()).thenReturn(meta); TaskAttemptEventOutputFailed outputFailedEvent = new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1); - mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt() - .getID(), TaskEventType.T_ATTEMPT_FAILED, outputFailedEvent)); + mockTask.handle( + createTaskTAFailedEvent(mockTask.getLastAttempt().getID(), TaskFailureType.NON_FATAL, + outputFailedEvent)); // The task should still be in the scheduled state assertTaskScheduledState(); @@ -669,8 +741,7 @@ public class TestTaskImpl { launchTaskAttempt(mockTask.getLastAttempt().getID()); updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); - mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - 
TaskEventType.T_ATTEMPT_SUCCEEDED)); + mockTask.handle(createTaskTASucceededEvent(mockTask.getLastAttempt().getID())); // The task should now have succeeded assertTaskSucceededState(); @@ -679,8 +750,7 @@ public class TestTaskImpl { eventHandler.events.clear(); // Now kill the attempt after it has succeeded - mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt() - .getID(), TaskEventType.T_ATTEMPT_KILLED)); + mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID())); // The task should still be in the scheduled state assertTaskScheduledState(); @@ -721,8 +791,7 @@ public class TestTaskImpl { updateAttemptState(firstAttempt, TaskAttemptState.RUNNING); // Add a speculative task attempt - mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ADD_SPEC_ATTEMPT)); + mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID())); MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt(); launchTaskAttempt(specAttempt.getID()); updateAttemptState(specAttempt, TaskAttemptState.RUNNING); @@ -730,15 +799,13 @@ public class TestTaskImpl { // Fail the first attempt updateAttemptState(firstAttempt, TaskAttemptState.FAILED); - mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(createTaskTAFailedEvent(firstAttempt.getID())); assertEquals(TaskState.FAILED, mockTask.getState()); assertEquals(2, mockTask.getAttemptList().size()); // Now fail the speculative attempt updateAttemptState(specAttempt, TaskAttemptState.FAILED); - mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(createTaskTAFailedEvent(specAttempt.getID())); assertEquals(TaskState.FAILED, mockTask.getState()); assertEquals(2, mockTask.getAttemptList().size()); } @@ -757,8 +824,7 @@ public class TestTaskImpl { updateAttemptState(firstAttempt, TaskAttemptState.RUNNING); // Add a speculative task attempt - 
mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), - TaskEventType.T_ADD_SPEC_ATTEMPT)); + mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID())); MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt(); launchTaskAttempt(specAttempt.getID()); updateAttemptState(specAttempt, TaskAttemptState.RUNNING); @@ -766,15 +832,13 @@ public class TestTaskImpl { // Fail the first attempt updateAttemptState(firstAttempt, TaskAttemptState.FAILED); - mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(createTaskTAFailedEvent(firstAttempt.getID())); assertEquals(TaskState.FAILED, mockTask.getState()); assertEquals(2, mockTask.getAttemptList().size()); // Now succeed the speculative attempt updateAttemptState(specAttempt, TaskAttemptState.SUCCEEDED); - mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(), - TaskEventType.T_ATTEMPT_SUCCEEDED)); + mockTask.handle(createTaskTASucceededEvent(specAttempt.getID())); assertEquals(TaskState.FAILED, mockTask.getState()); assertEquals(2, mockTask.getAttemptList().size()); } @@ -788,8 +852,7 @@ public class TestTaskImpl { updateAttemptState(firstAttempt, TaskAttemptState.RUNNING); // Add a speculative task attempt - mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(), - TaskEventType.T_ADD_SPEC_ATTEMPT)); + mockTask.handle(createTaskTAAddSpecAttempt(firstAttempt.getID())); MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt(); launchTaskAttempt(specAttempt.getID()); updateAttemptState(specAttempt, TaskAttemptState.RUNNING); @@ -797,8 +860,7 @@ public class TestTaskImpl { // Have the first task succeed eventHandler.events.clear(); - mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(), - TaskEventType.T_ATTEMPT_SUCCEEDED)); + mockTask.handle(createTaskTASucceededEvent(firstAttempt.getID())); // The task should now have succeeded and sent kill to other attempt assertTaskSucceededState(); @@ -811,8 +873,7 @@ 
public class TestTaskImpl { ((TaskAttemptEventKillRequest) event).getTaskAttemptID()); // Emulate the spec attempt being killed - mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(), - TaskEventType.T_ATTEMPT_KILLED)); + mockTask.handle(createTaskTAKilledEvent(specAttempt.getID())); assertTaskSucceededState(); // Now fail the attempt after it has succeeded @@ -823,8 +884,7 @@ public class TestTaskImpl { TaskAttemptEventOutputFailed outputFailedEvent = new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1); eventHandler.events.clear(); - mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(), - TaskEventType.T_ATTEMPT_FAILED, outputFailedEvent)); + mockTask.handle(createTaskTAFailedEvent(firstAttempt.getID(), TaskFailureType.NON_FATAL, outputFailedEvent)); // The task should still be in the scheduled state assertTaskScheduledState();
http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 7a20a37..d2d9a07 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -53,8 +53,10 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.dag.app.rm.AMSchedulerEventType; +import org.apache.tez.dag.app.dag.event.TaskEventTALaunched; +import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; import org.apache.tez.hadoop.shim.DefaultHadoopShim; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; @@ -147,7 +149,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; -import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.VertexEvent; import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed; @@ -3472,6 +3473,7 @@ public class TestVertexImpl { ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null)); Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); ta.handle(new 
TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, + TaskFailureType.NON_FATAL, "diag", TaskAttemptTerminationCause.APPLICATION_ERROR)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); @@ -3506,6 +3508,7 @@ public class TestVertexImpl { Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, + TaskFailureType.NON_FATAL, "diag", TaskAttemptTerminationCause.INPUT_READ_ERROR)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); @@ -3541,6 +3544,7 @@ public class TestVertexImpl { Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, + TaskFailureType.NON_FATAL, "diag", TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); @@ -4643,7 +4647,7 @@ public class TestVertexImpl { TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0); TaskImpl task = (TaskImpl)v1.getTask(taskId); task.handle(new TaskEvent(taskId, TaskEventType.T_ATTEMPT_LAUNCHED)); - task.handle(new TaskEventTAUpdate(taskAttemptId, TaskEventType.T_ATTEMPT_SUCCEEDED)); + task.handle(new TaskEventTASucceeded(taskAttemptId)); v1.handle(new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED)); v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED)); dispatcher.await(); @@ -6242,7 +6246,7 @@ public class TestVertexImpl { // at least one task attempt is succeed, otherwise input initialize events won't been handled. 
dispatcher.getEventHandler().handle(new TaskEvent(t0_v1, TaskEventType.T_ATTEMPT_LAUNCHED)); - dispatcher.getEventHandler().handle(new TaskEventTAUpdate(ta0_t0_v1, TaskEventType.T_ATTEMPT_SUCCEEDED)); + dispatcher.getEventHandler().handle(new TaskEventTASucceeded(ta0_t0_v1)); dispatcher.getEventHandler() .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent))); dispatcher.await(); @@ -6323,10 +6327,8 @@ public class TestVertexImpl { TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0); - dispatcher.getEventHandler().handle(new TaskEventTAUpdate(TezTaskAttemptID.getInstance(t1, 0), - TaskEventType.T_ATTEMPT_LAUNCHED)); - dispatcher.getEventHandler().handle(new TaskEventTAUpdate(TezTaskAttemptID.getInstance(t1, 0), - TaskEventType.T_ATTEMPT_SUCCEEDED)); + dispatcher.getEventHandler().handle(new TaskEventTALaunched(TezTaskAttemptID.getInstance(t1, 0))); + dispatcher.getEventHandler().handle(new TaskEventTASucceeded(TezTaskAttemptID.getInstance(t1, 0))); dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); dispatcher.await(); http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index 38d9935..5e7e906 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -22,6 +22,7 @@ import static org.junit.Assert.fail; import java.nio.ByteBuffer; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.apache.hadoop.conf.Configuration; @@ -60,25 +61,19 @@ import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto; import org.apache.tez.runtime.api.InputSpecUpdate; -import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; -import org.apache.tez.runtime.api.impl.EventMetaData; -import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import org.apache.tez.runtime.api.impl.EventType; import org.apache.tez.runtime.api.impl.TezEvent; import org.junit.Assert; import org.junit.Test; -import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -494,12 +489,13 @@ public class TestHistoryEventsProtoConversion { logEvents(event, deserializedEvent); } + @SuppressWarnings("deprecation") private void testTaskAttemptFinishedEvent() throws Exception { { TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent( TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), - "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, + "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, TaskFailureType.FATAL, null, null, null, null, null, 2048, TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024, @@ -533,6 +529,8 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getNodeId()); Assert.assertEquals(event.getNodeHttpAddress(), 
deserializedEvent.getNodeHttpAddress()); + Assert.assertEquals(event.getTaskFailureType(), + deserializedEvent.getTaskFailureType()); logEvents(event, deserializedEvent); } { @@ -545,7 +543,7 @@ public class TestHistoryEventsProtoConversion { TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent( TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), - "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, + "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, TaskFailureType.NON_FATAL, TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events, null, 0, null, 0, ContainerId.newInstance( @@ -575,6 +573,53 @@ public class TestHistoryEventsProtoConversion { Assert.assertEquals(events.size(), event.getDataEvents().size()); Assert.assertEquals(events.get(0).getTimestamp(), event.getDataEvents().get(0).getTimestamp()); Assert.assertEquals(events.get(0).getTaskAttemptId(), event.getDataEvents().get(0).getTaskAttemptId()); + Assert.assertEquals(event.getTaskFailureType(), deserializedEvent.getTaskFailureType()); + logEvents(event, deserializedEvent); + } + { + TezTaskAttemptID taId = + TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 0), 0); + long timestamp = 1024L; + List<DataEventDependencyInfo> events = Lists.newArrayList(); + events.add(new DataEventDependencyInfo(timestamp, taId)); + events.add(new DataEventDependencyInfo(timestamp, taId)); + TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent( + TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), + "vertex1", 10001l, 1000434444l, TaskAttemptState.KILLED, null, + TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events, + null, 0, null, 0, + 
ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance( + "host1", 19999), "inProgress", "Completed", "nodeHttpAddress"); + TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) + testProtoConversion(event); + Assert.assertEquals(event.getTaskAttemptID(), + deserializedEvent.getTaskAttemptID()); + Assert.assertEquals(event.getFinishTime(), + deserializedEvent.getFinishTime()); + Assert.assertEquals(event.getDiagnostics(), + deserializedEvent.getDiagnostics()); + Assert.assertEquals(event.getState(), + deserializedEvent.getState()); + Assert.assertEquals(event.getCounters(), + deserializedEvent.getCounters()); + Assert.assertEquals(event.getContainerId(), + deserializedEvent.getContainerId()); + Assert.assertEquals(event.getNodeId(), + deserializedEvent.getNodeId()); + Assert.assertEquals(event.getNodeHttpAddress(), + deserializedEvent.getNodeHttpAddress()); + Assert.assertEquals(event.getTaskAttemptError(), + deserializedEvent.getTaskAttemptError()); + Assert.assertEquals(events.size(), event.getDataEvents().size()); + Assert + .assertEquals(events.get(0).getTimestamp(), event.getDataEvents().get(0).getTimestamp()); + Assert.assertEquals(events.get(0).getTaskAttemptId(), + event.getDataEvents().get(0).getTaskAttemptId()); + Assert.assertEquals(event.getTaskFailureType(), deserializedEvent.getTaskFailureType()); logEvents(event, deserializedEvent); } } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index ea683f7..5c596c5 100644 --- 
a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -164,7 +164,7 @@ public class TestHistoryEventJsonConversion { break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, + random.nextInt(), TaskAttemptState.KILLED, null, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null, null, null, 0, null, 0, containerId, nodeId, null, null, "nodeHttpAddress"); break; http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java index 5ae416a..9fe3ee0 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.List; import com.google.common.collect.Lists; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -50,16 +51,23 @@ public class TestUmbilical implements TezUmbilical { } } - public List<TezEvent> getEvents() { - return this.events; + @Override + public void signalFailure(TezTaskAttemptID taskAttemptID, TaskFailureType taskFailureType, Throwable t, + String message, EventMetaData sourceInfo) { + LOG.info("Received failure from task: " + taskAttemptID + + ", Message: " + message + + ", taskFailureType=" + taskFailureType); } @Override - public void signalFatalError(TezTaskAttemptID taskAttemptID, 
Throwable t, - String message, EventMetaData sourceInfo) { - LOG.info("Received fatal error from task: " + taskAttemptID + public void signalKillSelf(TezTaskAttemptID taskAttemptID, Throwable t, String message, + EventMetaData sourceInfo) { + LOG.info("Received kill from task: " + taskAttemptID + ", Message: " + message); + } + public List<TezEvent> getEvents() { + return this.events; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 26d4d98..1e77ce8 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -448,6 +448,9 @@ public class HistoryEventTimelineConversion { atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + if (event.getTaskFailureType() != null) { + atsEntity.addOtherInfo(ATSConstants.TASK_FAILURE_TYPE, event.getTaskFailureType().name()); + } atsEntity.addOtherInfo(ATSConstants.CREATION_TIME, event.getCreationTime()); atsEntity.addOtherInfo(ATSConstants.ALLOCATION_TIME, event.getAllocationTime()); atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git 
a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index c5badaa..abfd757 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -31,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; -import org.apache.tez.client.CallerContext; import org.apache.tez.common.ATSConstants; import org.apache.tez.common.VersionInfo; import org.apache.tez.common.counters.TezCounters; @@ -83,6 +82,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.TaskFailureType; import org.codehaus.jettison.json.JSONException; import org.junit.Assert; import org.junit.Before; @@ -183,7 +183,7 @@ public class TestHistoryEventTimelineConversion { break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, + random.nextInt(), TaskAttemptState.FAILED, TaskFailureType.NON_FATAL, TaskAttemptTerminationCause.OUTPUT_LOST, null, null, null, null, 0, null, 0, containerId, nodeId, null, null, "nodeHttpAddress"); break; @@ -519,7 +519,7 @@ public class TestHistoryEventTimelineConversion { events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID)); 
TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName, - startTime, finishTime, state, error, diagnostics, counters, events, null, creationTime, + startTime, finishTime, state, TaskFailureType.FATAL, error, diagnostics, counters, events, null, creationTime, tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress"); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); @@ -543,7 +543,7 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(finishTime, evt.getTimestamp()); final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); - Assert.assertEquals(16, otherInfo.size()); + Assert.assertEquals(17, otherInfo.size()); Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getOtherInfo().get(ATSConstants.CREATION_CAUSAL_ATTEMPT)); Assert.assertEquals(creationTime, timelineEntity.getOtherInfo().get(ATSConstants.CREATION_TIME)); @@ -552,6 +552,7 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME)); Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN)); Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS)); + Assert.assertEquals(TaskFailureType.FATAL.name(), otherInfo.get(ATSConstants.TASK_FAILURE_TYPE)); Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM)); Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS)); Map<String, Object> obj1 = (Map<String, Object>)otherInfo.get(ATSConstants.LAST_DATA_EVENTS); @@ -565,6 +566,17 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(nodeId.toString(), otherInfo.get(ATSConstants.NODE_ID)); Assert.assertEquals(containerId.toString(), otherInfo.get(ATSConstants.CONTAINER_ID)); 
Assert.assertEquals("nodeHttpAddress", otherInfo.get(ATSConstants.NODE_HTTP_ADDRESS)); + + TaskAttemptFinishedEvent eventWithNullFailureType = + new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName, + startTime, finishTime, state, null, error, diagnostics, counters, events, null, + creationTime, + tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL", + "nodeHttpAddress"); + TimelineEntity timelineEntityWithNullFailureType = + HistoryEventTimelineConversion.convertToTimelineEntity(eventWithNullFailureType); + Assert.assertNull( + timelineEntityWithNullFailureType.getOtherInfo().get(ATSConstants.TASK_FAILURE_TYPE)); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/pom.xml ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml index 86869e3..bd90c77 100644 --- a/tez-runtime-internals/pom.xml +++ b/tez-runtime-internals/pom.xml @@ -105,7 +105,7 @@ <source> <directory>${basedir}/src/main/proto</directory> <includes> - <include>Events.proto</include> + <include>RuntimeEvents.proto</include> </includes> </source> <output>${project.build.directory}/generated-sources/java</output> http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java index d4d7ca9..02dc69c 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java @@ -24,12 +24,15 @@ import java.net.URISyntaxException; import org.apache.hadoop.classification.InterfaceAudience.Private; import 
org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.URL; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.runtime.api.TaskFailureType; +import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskFailureTypeProto; public class TezConverterUtils { /** * return a {@link URI} from a given url - * + * * @param url * url to convert * @return path from {@link URL} @@ -59,7 +62,30 @@ public class TezConverterUtils { return new TezLocalResource(getURIFromYarnURL(lr.getResource()), lr.getSize(), lr.getTimestamp()); } - + + public static TaskFailureType failureTypeFromProto(TaskFailureTypeProto proto) { + switch (proto) { + case FT_NON_FATAL: + return TaskFailureType.NON_FATAL; + case FT_FATAL: + return TaskFailureType.FATAL; + default: + throw new TezUncheckedException("Unknown FailureTypeProto: " + proto); + } + } + + public static TaskFailureTypeProto failureTypeToProto(TaskFailureType taskFailureType) { + switch (taskFailureType) { + + case NON_FATAL: + return TaskFailureTypeProto.FT_NON_FATAL; + case FATAL: + return TaskFailureTypeProto.FT_FATAL; + default: + throw new TezUncheckedException("Unknown FailureType: " + taskFailureType); + } + } + // @Private // public static void writeLocalResource(LocalResource lr, DataOutput out) // throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 07f92c2..a31136b 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -46,6 +46,7 @@ import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.tez.hadoop.shim.HadoopShim; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.TaskContext; import org.apache.tez.runtime.api.impl.TezProcessorContextImpl; import org.slf4j.Logger; @@ -386,6 +387,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } finally { setTaskDone(); + // Clear the interrupt status since the task execution is done. + Thread.interrupted(); if (eventRouterThread != null) { eventRouterThread.interrupt(); LOG.info("Joining on EventRouter"); @@ -719,13 +722,16 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } } catch (Throwable t) { LOG.warn("Failed to handle event", t); - setFatalError(t, "Failed to handle event"); + registerError(); EventMetaData sourceInfo = new EventMetaData( e.getDestinationInfo().getEventGenerator(), taskSpec.getVertexName(), e.getDestinationInfo().getEdgeVertexName(), getTaskAttemptID()); setFrameworkCounters(); - tezUmbilical.signalFatalError(getTaskAttemptID(), + // Signal such errors as RETRIABLE. The user code has an option to report this as something + // other than retriable before we get control back. + // TODO: Don't catch Throwables. 
+ tezUmbilical.signalFailure(getTaskAttemptID(), TaskFailureType.NON_FATAL, t, ExceptionUtils.getStackTrace(t), sourceInfo); return false; } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index 59c8104..7b86d4b 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -38,9 +38,7 @@ import com.google.common.collect.Maps; public abstract class RuntimeTask { - protected AtomicBoolean hasFatalError = new AtomicBoolean(false); - protected AtomicReference<Throwable> fatalError = new AtomicReference<Throwable>(); - protected String fatalErrorMessage = null; + protected final AtomicBoolean errorReported = new AtomicBoolean(false); protected float progress; protected final TezCounters tezCounters; private final Map<String, TezCounters> counterMap = Maps.newConcurrentMap(); @@ -99,10 +97,8 @@ public abstract class RuntimeTask { return taskSpec.getVertexName(); } - public void setFatalError(Throwable t, String message) { - hasFatalError.set(true); - this.fatalError.set(t); - this.fatalErrorMessage = message; + public void registerError() { + errorReported.set(true); } public final void notifyProgressInvocation() { @@ -113,13 +109,9 @@ public abstract class RuntimeTask { boolean retVal = progressNotified.getAndSet(false); return retVal; } - - public Throwable getFatalError() { - return this.fatalError.get(); - } - public boolean hadFatalError() { - return hasFatalError.get(); + public boolean wasErrorReported() { + return errorReported.get(); } public synchronized void setProgress(float progress) { 
http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java index 935fdbb..5606663 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java @@ -18,18 +18,25 @@ package org.apache.tez.runtime.api.events; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.Event; public class TaskAttemptFailedEvent extends Event { private final String diagnostics; + private final TaskFailureType taskFailureType; - public TaskAttemptFailedEvent(String diagnostics) { + public TaskAttemptFailedEvent(String diagnostics, TaskFailureType taskFailureType) { this.diagnostics = diagnostics; + this.taskFailureType = taskFailureType; } public String getDiagnostics() { return diagnostics; } + public TaskFailureType getTaskFailureType() { + return taskFailureType; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptKilledEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptKilledEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptKilledEvent.java new file mode 100644 index 0000000..3f5b326 --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptKilledEvent.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api.events; + +import org.apache.tez.runtime.api.Event; + +public class TaskAttemptKilledEvent extends Event { + private final String diagnostics; + + public TaskAttemptKilledEvent(String diagnostics) { + this.diagnostics = diagnostics; + } + + public String getDiagnostics() { + return diagnostics; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java index 6d4c902..cb247c9 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java @@ -21,6 +21,7 @@ package org.apache.tez.runtime.api.impl; public enum EventType { TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT, + TASK_ATTEMPT_KILLED_EVENT, DATA_MOVEMENT_EVENT, INPUT_READ_ERROR_EVENT, INPUT_FAILED_EVENT, 
http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java index 63e2b86..b3ce8c4 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java @@ -24,6 +24,7 @@ import java.io.IOException; import org.apache.hadoop.io.Writable; import org.apache.tez.common.ProtoConverters; +import org.apache.tez.common.TezConverterUtils; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; @@ -41,10 +42,12 @@ import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; +import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptCompletedEventProto; import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptFailedEventProto; +import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptKilledEventProto; public class TezEvent implements Writable { @@ -79,6 +82,8 @@ public class TezEvent implements Writable { eventType = EventType.INPUT_READ_ERROR_EVENT; } else if (event instanceof TaskAttemptFailedEvent) { eventType = EventType.TASK_ATTEMPT_FAILED_EVENT; + } else if (event instanceof 
TaskAttemptKilledEvent) { + eventType = EventType.TASK_ATTEMPT_KILLED_EVENT; } else if (event instanceof TaskAttemptCompletedEvent) { eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT; } else if (event instanceof InputFailedEvent) { @@ -168,8 +173,14 @@ public class TezEvent implements Writable { TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event; eventBytes = TaskAttemptFailedEventProto.newBuilder() .setDiagnostics(tfEvt.getDiagnostics()) + .setTaskFailureType(TezConverterUtils.failureTypeToProto(tfEvt.getTaskFailureType())) .build().toByteArray(); break; + case TASK_ATTEMPT_KILLED_EVENT: + TaskAttemptKilledEvent tkEvent = (TaskAttemptKilledEvent) event; + eventBytes = TaskAttemptKilledEventProto.newBuilder() + .setDiagnostics(tkEvent.getDiagnostics()).build().toByteArray(); + break; case TASK_ATTEMPT_COMPLETED_EVENT: eventBytes = TaskAttemptCompletedEventProto.newBuilder() .build().toByteArray(); @@ -236,7 +247,12 @@ public class TezEvent implements Writable { case TASK_ATTEMPT_FAILED_EVENT: TaskAttemptFailedEventProto tfProto = TaskAttemptFailedEventProto.parseFrom(eventBytes); - event = new TaskAttemptFailedEvent(tfProto.getDiagnostics()); + event = new TaskAttemptFailedEvent(tfProto.getDiagnostics(), + TezConverterUtils.failureTypeFromProto(tfProto.getTaskFailureType())); + break; + case TASK_ATTEMPT_KILLED_EVENT: + TaskAttemptKilledEventProto tkProto = TaskAttemptKilledEventProto.parseFrom(eventBytes); + event = new TaskAttemptKilledEvent(tkProto.getDiagnostics()); break; case TASK_ATTEMPT_COMPLETED_EVENT: event = new TaskAttemptCompletedEvent(); @@ -293,4 +309,12 @@ public class TezEvent implements Writable { } } + @Override + public String toString() { + return "TezEvent{" + + "eventType=" + eventType + + ", sourceInfo=" + sourceInfo + + ", destinationInfo=" + destinationInfo + + '}'; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java 
---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java index 4431150..afb78d9 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java @@ -39,6 +39,7 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.InputReadyTracker; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputStatisticsReporter; @@ -152,12 +153,24 @@ public class TezInputContextImpl extends TezTaskContextImpl return sourceVertexName; } + @SuppressWarnings("deprecation") @Override public void fatalError(Throwable exception, String message) { super.signalFatalError(exception, message, sourceInfo); } @Override + public void reportFailure(TaskFailureType taskFailureType, @Nullable Throwable exception, + @Nullable String message) { + super.signalFailure(taskFailureType, exception, message, sourceInfo); + } + + @Override + public void killSelf(@Nullable Throwable exception, @Nullable String message) { + super.signalKillSelf(exception, message, sourceInfo); + } + + @Override public void inputIsReady() { if (inputReadyTracker != null) { inputReadyTracker.setInputIsReady(inputs.get(sourceVertexName)); http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java ---------------------------------------------------------------------- diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java index 1e5b6a5..1bd78d3 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java @@ -38,6 +38,7 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ObjectRegistry; @@ -137,12 +138,24 @@ public class TezOutputContextImpl extends TezTaskContextImpl return destinationVertexName; } + @SuppressWarnings("deprecation") @Override public void fatalError(Throwable exception, String message) { super.signalFatalError(exception, message, sourceInfo); } @Override + public void reportFailure(TaskFailureType taskFailureType, @Nullable Throwable exception, + @Nullable String message) { + super.signalFailure(taskFailureType, exception, message, sourceInfo); + } + + @Override + public void killSelf(@Nullable Throwable exception, @Nullable String message) { + super.signalKillSelf(exception, message, sourceInfo); + } + + @Override public int getOutputIndex() { return outputIndex; } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index 
0c3283d..607bbf1 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import static com.google.common.base.Preconditions.checkNotNull; -import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -33,12 +32,12 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.hadoop.conf.Configuration; -import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.InputReadyTracker; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Input; @@ -97,12 +96,24 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce notifyProgress(); } + @SuppressWarnings("deprecation") @Override public void fatalError(Throwable exception, String message) { super.signalFatalError(exception, message, sourceInfo); } @Override + public void reportFailure(TaskFailureType taskFailureType, @Nullable Throwable exception, + @Nullable String message) { + super.signalFailure(taskFailureType, exception, message, sourceInfo); + } + + @Override + public void killSelf(@Nullable Throwable exception, @Nullable String message) { + super.signalKillSelf(exception, message, sourceInfo); + } + + @Override public boolean canCommit() throws IOException { return tezUmbilical.canCommit(this.taskAttemptID); } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java 
---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java index c12b334..35abd1e 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java @@ -37,6 +37,7 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.EntityDescriptor; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.ObjectRegistry; @@ -214,9 +215,21 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable { } protected void signalFatalError(Throwable t, String message, EventMetaData sourceInfo) { + signalFailure(TaskFailureType.NON_FATAL, t, message, sourceInfo); + } + + protected void signalFailure(TaskFailureType taskFailureType, Throwable t, + String message, EventMetaData sourceInfo) { + Preconditions.checkNotNull(taskFailureType, "TaskFailureType must be specified"); + runtimeTask.setFrameworkCounters(); + runtimeTask.registerError(); + tezUmbilical.signalFailure(taskAttemptID, taskFailureType, t, message, sourceInfo); + } + + protected void signalKillSelf(Throwable t, String message, EventMetaData sourceInfo) { runtimeTask.setFrameworkCounters(); - runtimeTask.setFatalError(t, message); - tezUmbilical.signalFatalError(taskAttemptID, t, message, sourceInfo); + runtimeTask.registerError(); + tezUmbilical.signalKillSelf(taskAttemptID, t, message, sourceInfo); } @Override 
http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java index b45a9b2..b606dea 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java @@ -21,15 +21,20 @@ package org.apache.tez.runtime.api.impl; import java.io.IOException; import java.util.Collection; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.TaskFailureType; +@Private public interface TezUmbilical { - public void addEvents(Collection<TezEvent> events); + void addEvents(Collection<TezEvent> events); - public void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t, String message, - EventMetaData sourceInfo); + void signalFailure(TezTaskAttemptID taskAttemptID, TaskFailureType taskFailureType, Throwable t, String message, + EventMetaData sourceInfo); - public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException; + void signalKillSelf(TezTaskAttemptID taskAttemptID, Throwable t, String message, EventMetaData sourceInfo); + + boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException; } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java index 9a5a3ab..b7d5fb5 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java @@ -20,6 +20,7 @@ import java.util.Collection; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.RuntimeTask; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.task.ErrorReporter; @@ -32,9 +33,15 @@ public interface TaskReporterInterface { boolean taskSucceeded(TezTaskAttemptID taskAttemptId) throws IOException, TezException; - boolean taskFailed(TezTaskAttemptID taskAttemptId, Throwable cause, String diagnostics, EventMetaData srcMeta) throws IOException, + boolean taskFailed(TezTaskAttemptID taskAttemptId, + TaskFailureType taskFailureType, + Throwable cause, + String diagnostics, EventMetaData srcMeta) throws IOException, TezException; + boolean taskKilled(TezTaskAttemptID taskAttemtpId, Throwable cause, String diagnostics, + EventMetaData srcMeta) throws IOException, TezException; + void addEvents(TezTaskAttemptID taskAttemptId, Collection<TezEvent> events); boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException; http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java index 8dc7a87..c2395e1 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java +++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java @@ -17,7 +17,8 @@ package org.apache.tez.runtime.task; public enum EndReason { SUCCESS(false), CONTAINER_STOP_REQUESTED(false), - KILL_REQUESTED(true), + KILL_REQUESTED(true), // External kill request + TASK_KILL_REQUEST(false), // Kill request originating from the task itself (self-kill) COMMUNICATION_FAILURE(false), TASK_ERROR(false); http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index e5370d4..d1c1471 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -39,8 +39,10 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.RuntimeTask; +import org.apache.tez.runtime.api.*; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; +import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TaskStatistics; @@ -116,7 +118,6 @@ public class TaskReporter implements TaskReporterInterface { public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) { currentCallable.markComplete(); currentCallable = null; - // KKK Make sure the callable completes before proceeding } @Override @@ -184,7 +185,7 @@ public class TaskReporter implements TaskReporterInterface { 
@Override public Boolean call() throws Exception { // Heartbeat only for active tasks. Errors, etc will be reported directly. - while (!task.isTaskDone() && !task.hadFatalError()) { + while (!task.isTaskDone() && !task.wasErrorReported()) { ResponseWrapper response = heartbeat(null); if (response.shouldDie) { @@ -209,7 +210,7 @@ public class TaskReporter implements TaskReporterInterface { int pendingEventCount = eventsToSend.size(); if (pendingEventCount > 0) { // This is OK because the pending events will be sent via the succeeded/failed messages. - // TaskDone is set before taskSucceeded / taskFailed are sent out - which is what causes the + // TaskDone is set before taskSucceeded / taskTerminated are sent out - which is what causes the // thread to exit. LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount); } @@ -235,7 +236,7 @@ public class TaskReporter implements TaskReporterInterface { List<TezEvent> events = new ArrayList<TezEvent>(); eventsToSend.drainTo(events); - if (!task.isTaskDone() && !task.hadFatalError()) { + if (!task.isTaskDone() && !task.wasErrorReported()) { boolean sendCounters = false; /** * Increasing the heartbeat interval can delay the delivery of events. Sending just updated @@ -281,7 +282,7 @@ public class TaskReporter implements TaskReporterInterface { // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks // are running using the same umbilical. int numEventsReceived = 0; - if (task.isTaskDone() || task.hadFatalError()) { + if (task.isTaskDone() || task.wasErrorReported()) { if (response.getEvents() != null && !response.getEvents().isEmpty()) { LOG.info("Current task already complete, Ignoring all events in" + " heartbeat response, eventCount=" + response.getEvents().size()); @@ -364,6 +365,8 @@ public class TaskReporter implements TaskReporterInterface { /** * Sends out final events for task failure. 
* @param taskAttemptID + * @param isKilled + * @param taskFailureType * @param t * @param diagnostics * @param srcMeta @@ -373,8 +376,9 @@ public class TaskReporter implements TaskReporterInterface { * @throws TezException * indicates an exception somewhere in the AM. */ - private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, - EventMetaData srcMeta) throws IOException, TezException { + private boolean taskTerminated(TezTaskAttemptID taskAttemptID, boolean isKilled, TaskFailureType taskFailureType, + Throwable t, String diagnostics, + EventMetaData srcMeta) throws IOException, TezException { // Ensure only one final event is ever sent. if (!finalEventQueued.getAndSet(true)) { List<TezEvent> tezEvents = new ArrayList<TezEvent>(); @@ -383,13 +387,19 @@ public class TaskReporter implements TaskReporterInterface { } else { diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t); } - tezEvents.add(new TezEvent(new TaskAttemptFailedEvent(diagnostics), - srcMeta == null ? updateEventMetadata : srcMeta)); + if (isKilled) { + tezEvents.add(new TezEvent(new TaskAttemptKilledEvent(diagnostics), + srcMeta == null ? updateEventMetadata : srcMeta)); + } else { + tezEvents.add(new TezEvent(new TaskAttemptFailedEvent(diagnostics, + taskFailureType), + srcMeta == null ? updateEventMetadata : srcMeta)); + } try { tezEvents.add(new TezEvent(getStatusUpdateEvent(true), updateEventMetadata)); } catch (Exception e) { // Counter may exceed limitation - LOG.warn("Error when get constructing TaskStatusUpdateEvent"); + LOG.warn("Error when get constructing TaskStatusUpdateEvent. 
Not sending it out"); } return !heartbeat(tezEvents).shouldDie; } else { @@ -432,9 +442,18 @@ public class TaskReporter implements TaskReporterInterface { } @Override - public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, - EventMetaData srcMeta) throws IOException, TezException { - return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta); + public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, + TaskFailureType taskFailureType, + Throwable t, String diagnostics, + EventMetaData srcMeta) throws IOException, + TezException { + return currentCallable.taskTerminated(taskAttemptID, false, taskFailureType, t, diagnostics, srcMeta); + } + + @Override + public boolean taskKilled(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, + EventMetaData srcMeta) throws IOException, TezException { + return currentCallable.taskTerminated(taskAttemptID, true, null, t, diagnostics, srcMeta); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java index ffbed8c..8e634fa 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java @@ -40,6 +40,7 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas private final LogicalIOProcessorRuntimeTask task; private final UserGroupInformation ugi; private final AtomicBoolean stopRequested = new AtomicBoolean(false); + private final AtomicBoolean interruptAttempted = new AtomicBoolean(false); private volatile Thread ownThread; @@ 
-116,11 +117,17 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas } - public void interruptTask() { - // Ensure the task is only interrupted once. + public void abortTask() { if (!stopRequested.getAndSet(true)) { task.abortTask(); - if (ownThread != null) { + } + } + + public void interruptTask() { + if (!interruptAttempted.getAndSet(true)) { + LogicalIOProcessorRuntimeTask localTask = task; + // Send an interrupt only if the task is not done. + if (ownThread != null && (localTask != null && !localTask.isTaskDone())) { ownThread.interrupt(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java index 07b32ce..bf30615 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java @@ -14,21 +14,30 @@ package org.apache.tez.runtime.task; +import org.apache.tez.runtime.api.TaskFailureType; + public class TaskRunner2Result { final EndReason endReason; + final TaskFailureType taskFailureType; final Throwable error; final boolean containerShutdownRequested; - public TaskRunner2Result(EndReason endReason, Throwable error, boolean containerShutdownRequested) { + public TaskRunner2Result(EndReason endReason, TaskFailureType taskFailureType, + Throwable error, boolean containerShutdownRequested) { this.endReason = endReason; this.error = error; this.containerShutdownRequested = containerShutdownRequested; + this.taskFailureType = taskFailureType; } public EndReason getEndReason() { return endReason; } + public TaskFailureType getTaskFailureType() { + return 
taskFailureType; + } + public Throwable getError() { return error; } @@ -39,10 +48,15 @@ public class TaskRunner2Result { @Override public String toString() { - return "TaskRunner2Result{" + - "endReason=" + endReason + - ", error=" + error + - ", containerShutdownRequested=" + containerShutdownRequested + - '}'; + StringBuilder sb = new StringBuilder(); + sb.append("TaskRunner2Result{"); + sb.append("endReason=").append(endReason); + sb.append(", containerShutdownRequested=").append(containerShutdownRequested); + if (endReason != EndReason.SUCCESS) { + sb.append(", failureType=").append(taskFailureType); + sb.append(", error=").append(error); + } + sb.append("}"); + return sb.toString(); } }
