TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED (jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ca447ba5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ca447ba5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ca447ba5

Branch: refs/heads/TEZ-2980
Commit: ca447ba5c940d0ec8520166646695c49f2cd9dc3
Parents: 73e993c
Author: Jason Lowe <[email protected]>
Authored: Thu Jan 21 18:57:52 2016 +0000
Committer: Jason Lowe <[email protected]>
Committed: Thu Jan 21 18:57:52 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  4 +-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 73 ++++++++++++++++++++
 3 files changed, 78 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/ca447ba5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5d2c446..bec7dd4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED
   TEZ-2594. Fix LICENSE for missing entries for full and minimal tarballs.
   TEZ-3053. Containers timeout if they do not receive a task within the container timeout interval.
   TEZ-2898. tez tools : swimlanes.py is broken.
@@ -318,6 +319,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED
   TEZ-2937. Can Processor.close() be called after closing inputs and outputs?
   TEZ-3037. History URL should be set regardless of which history logging service is enabled.
   TEZ-3032. DAG start time getting logged using system time instead of recorded time in startTime field.
http://git-wip-us.apache.org/repos/asf/tez/blob/ca447ba5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index c00d674..9ec7ce8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -256,7 +256,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             TaskEventType.T_TERMINATE,
             TaskEventType.T_SCHEDULE,
             TaskEventType.T_ADD_SPEC_ATTEMPT,
-            TaskEventType.T_ATTEMPT_KILLED))
+            TaskEventType.T_ATTEMPT_FAILED,
+            TaskEventType.T_ATTEMPT_KILLED,
+            TaskEventType.T_ATTEMPT_SUCCEEDED))
 
     // Transitions from KILLED state
     // Ignorable event: T_ATTEMPT_KILLED

http://git-wip-us.apache.org/repos/asf/tez/blob/ca447ba5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 0414c99..1274378 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
@@ -661,6 +662,78 @@ public class TestTaskImpl {
     assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN.name()));
   }
 
+  @Test(timeout = 20000)
+  public void testFailedThenSpeculativeFailed() {
+    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    mockTask = new MockTaskImpl(vertexId, partition,
+        eventHandler, conf, taskCommunicatorManagerInterface, clock,
+        taskHeartbeatHandler, appContext, leafVertex,
+        taskResource, containerContext, mock(Vertex.class));
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstAttempt.getID());
+    updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+    // Add a speculative task attempt
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(specAttempt.getID());
+    updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+    assertEquals(2, mockTask.getAttemptList().size());
+
+    // Fail the first attempt
+    updateAttemptState(firstAttempt, TaskAttemptState.FAILED);
+    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    assertEquals(2, mockTask.getAttemptList().size());
+
+    // Now fail the speculative attempt
+    updateAttemptState(specAttempt, TaskAttemptState.FAILED);
+    mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    assertEquals(2, mockTask.getAttemptList().size());
+  }
+
+  @Test(timeout = 20000)
+  public void testFailedThenSpeculativeSucceeded() {
+    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    mockTask = new MockTaskImpl(vertexId, partition,
+        eventHandler, conf, taskCommunicatorManagerInterface, clock,
+        taskHeartbeatHandler, appContext, leafVertex,
+        taskResource, containerContext, mock(Vertex.class));
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstAttempt.getID());
+    updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+    // Add a speculative task attempt
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(specAttempt.getID());
+    updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+    assertEquals(2, mockTask.getAttemptList().size());
+
+    // Fail the first attempt
+    updateAttemptState(firstAttempt, TaskAttemptState.FAILED);
+    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    assertEquals(2, mockTask.getAttemptList().size());
+
+    // Now succeed the speculative attempt
+    updateAttemptState(specAttempt, TaskAttemptState.SUCCEEDED);
+    mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    assertEquals(2, mockTask.getAttemptList().size());
+  }
+
   // TODO Add test to validate the correct commit attempt.
