Repository: tez
Updated Branches:
  refs/heads/master 5218f481d -> ba6d7e0e9
TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: Invalid event: T_ATTEMPT_KILLED at KILLED. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ba6d7e0e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ba6d7e0e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ba6d7e0e Branch: refs/heads/master Commit: ba6d7e0e9106ad26c48bda8bb8caa7e4a890c2b1 Parents: 5218f48 Author: Hitesh Shah <[email protected]> Authored: Mon May 4 12:20:46 2015 -0700 Committer: Hitesh Shah <[email protected]> Committed: Mon May 4 12:20:46 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/app/dag/impl/TaskImpl.java | 21 ++++++-- .../tez/dag/app/dag/impl/TestTaskImpl.java | 53 +++++++++++++++++++- 3 files changed, 72 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ba6d7e0e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6c19770..0027e98 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -335,6 +335,8 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: + Invalid event: T_ATTEMPT_KILLED at KILLED. TEZ-2397. Translation of LocalResources via Tez plan serialization can be lossy. TEZ-2221. VertexGroup name should be unqiue TEZ-1521. 
VertexDataMovementEventsGeneratedEvent may be logged twice in recovery log http://git-wip-us.apache.org/repos/asf/tez/blob/ba6d7e0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 8b63734..15382a8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -117,7 +117,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { protected final EventHandler eventHandler; private final TezTaskID taskId; private Map<TezTaskAttemptID, TaskAttempt> attempts; - private final int maxFailedAttempts; + protected final int maxFailedAttempts; protected final Clock clock; private final Vertex vertex; private final Lock readLock; @@ -256,14 +256,29 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { EnumSet.of( TaskEventType.T_TERMINATE, TaskEventType.T_SCHEDULE, - TaskEventType.T_ADD_SPEC_ATTEMPT)) + TaskEventType.T_ADD_SPEC_ATTEMPT, + TaskEventType.T_ATTEMPT_KILLED)) // Transitions from KILLED state + // Ignorable event: T_ATTEMPT_KILLED + // Refer to TEZ-2379 + // T_ATTEMPT_KILLED can show up in KILLED state as + // a SUCCEEDED attempt can still transition to KILLED after receiving + // a KILL event. + // This could happen when there is a race where the task receives a + // kill event, it tries to kill all running attempts and a potential + // running attempt succeeds before it receives the kill event. + // The task will then receive both a SUCCEEDED and KILLED + // event from the same attempt. + // Duplicate events from a single attempt in KILL_WAIT are handled + // properly. However, the subsequent T_ATTEMPT_KILLED event might + // be received after the task reaches its terminal KILLED state. 
.addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED, EnumSet.of( TaskEventType.T_TERMINATE, TaskEventType.T_SCHEDULE, - TaskEventType.T_ADD_SPEC_ATTEMPT)) + TaskEventType.T_ADD_SPEC_ATTEMPT, + TaskEventType.T_ATTEMPT_KILLED)) // create the topology tables .installTopology(); http://git-wip-us.apache.org/repos/asf/tez/blob/ba6d7e0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 66e6724..9da3fab 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -177,6 +177,12 @@ public class TestTaskImpl { assertTaskKillWaitState(); } + private void failTask(TezTaskID taskId) { + mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null)); + assertTaskKillWaitState(); + } + + private void killScheduledTaskAttempt(TezTaskAttemptID attemptId) { mockTask.handle(new TaskEventTAUpdate(attemptId, TaskEventType.T_ATTEMPT_KILLED)); @@ -289,7 +295,6 @@ public class TestTaskImpl { killTask(taskId); mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), TaskEventType.T_ATTEMPT_KILLED)); - assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState()); verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED); } @@ -377,6 +382,52 @@ public class TestTaskImpl { killRunningTaskAttempt(mockTask.getLastAttempt().getID()); } + /** + * {@link TaskState#KILLED}->{@link TaskState#KILLED} + */ + @Test(timeout = 5000) + public void testKilledAttemptAtTaskKilled() { + LOG.info("--- START: testKilledAttemptAtTaskKilled ---"); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + 
launchTaskAttempt(mockTask.getLastAttempt().getID()); + killTask(taskId); + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ATTEMPT_KILLED)); + assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState()); + + // Send duplicate kill for same attempt + // This will not happen in practice but this is to simulate handling + // of killed attempts in killed state. + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ATTEMPT_KILLED)); + assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState()); + + } + + /** + * {@link TaskState#FAILED}->{@link TaskState#FAILED} + */ + @Test(timeout = 5000) + public void testKilledAttemptAtTaskFailed() { + LOG.info("--- START: testKilledAttemptAtTaskFailed ---"); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + for (int i = 0; i < mockTask.maxFailedAttempts; ++i) { + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ATTEMPT_FAILED)); + } + assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState()); + + // Send kill for an attempt + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ATTEMPT_KILLED)); + assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState()); + + } + + + @Test(timeout = 5000) public void testFetchedEventsModifyUnderlyingList() { // Tests to ensure that adding an event to a task, does not affect the
