Repository: tez Updated Branches: refs/heads/branch-0.5 0bceb8e49 -> bac43ea84
TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: Invalid event: T_ATTEMPT_KILLED at KILLED. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bac43ea8 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bac43ea8 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bac43ea8 Branch: refs/heads/branch-0.5 Commit: bac43ea84281f4a928fb7f9bd4887817f55f4e73 Parents: 0bceb8e Author: Hitesh Shah <[email protected]> Authored: Mon May 4 12:39:20 2015 -0700 Committer: Hitesh Shah <[email protected]> Committed: Mon May 4 12:39:20 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/app/dag/impl/TaskImpl.java | 21 ++++++-- .../tez/dag/app/dag/impl/TestTaskImpl.java | 55 +++++++++++++++++++- 3 files changed, 73 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/bac43ea8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2691b6a..e21a6fd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,8 @@ Apache Tez Change Log Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: + Invalid event: T_ATTEMPT_KILLED at KILLED. TEZ-2397. Translation of LocalResources via Tez plan serialization can be lossy. TEZ-2221. VertexGroup name should be unqiue TEZ-1521. VertexDataMovementEventsGeneratedEvent may be logged twice in recovery log http://git-wip-us.apache.org/repos/asf/tez/blob/bac43ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 38b6688..6a1136c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -112,7 +112,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { protected final EventHandler eventHandler; private final TezTaskID taskId; private Map<TezTaskAttemptID, TaskAttempt> attempts; - private final int maxFailedAttempts; + protected final int maxFailedAttempts; protected final Clock clock; private final Lock readLock; private final Lock writeLock; @@ -255,13 +255,28 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED, EnumSet.of( TaskEventType.T_TERMINATE, - TaskEventType.T_ADD_SPEC_ATTEMPT)) + TaskEventType.T_ADD_SPEC_ATTEMPT, + TaskEventType.T_ATTEMPT_KILLED)) // Transitions from KILLED state + // Ignorable event: T_ATTEMPT_KILLED + // Refer to TEZ-2379 + // T_ATTEMPT_KILLED can show up in KILLED state as + // a SUCCEEDED attempt can still transition to KILLED after receiving + // a KILL event. + // This could happen when there is a race where the task receives a + // kill event, it tries to kill all running attempts and a potential + // running attempt succeeds before it receives the kill event. + // The task will then receive both a SUCCEEDED and KILLED + // event from the same attempt. + // Duplicate events from a single attempt in KILL_WAIT are handled + // properly. However, the subsequent T_ATTEMPT_KILLED event might + // be received after the task reaches its terminal KILLED state. .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED, EnumSet.of( TaskEventType.T_TERMINATE, - TaskEventType.T_ADD_SPEC_ATTEMPT)) + TaskEventType.T_ADD_SPEC_ATTEMPT, + TaskEventType.T_ATTEMPT_KILLED)) .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED, TaskEventType.T_SCHEDULE) http://git-wip-us.apache.org/repos/asf/tez/blob/bac43ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 51ed49f..434107f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -177,6 +177,13 @@ public class TestTaskImpl { assertTaskKillWaitState(); } + private void failTask(TezTaskID taskId) { + mockTask.handle(new TaskEventTermination(taskId, + TaskTerminationCause.OWN_TASK_FAILURE)); + assertTaskKillWaitState(); + } + + private void killScheduledTaskAttempt(TezTaskAttemptID attemptId) { mockTask.handle(new TaskEventTAUpdate(attemptId, TaskEventType.T_ATTEMPT_KILLED)); @@ -289,7 +296,6 @@ public class TestTaskImpl { killTask(taskId); mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), TaskEventType.T_ATTEMPT_KILLED)); - assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState()); verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED); } @@ -377,7 +383,52 @@ public class TestTaskImpl { killRunningTaskAttempt(mockTask.getLastAttempt().getID()); } - @Test + /** + * {@link TaskState#KILLED}->{@link TaskState#KILLED} + */ + @Test(timeout = 5000) + public void testKilledAttemptAtTaskKilled() { + LOG.info("--- START: testKilledAttemptAtTaskKilled ---"); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + killTask(taskId); + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ATTEMPT_KILLED)); + assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState()); + + // Send duplicate kill for same attempt + // This will not happen in practice but this is to simulate handling + // of killed attempts in killed state. + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ATTEMPT_KILLED)); + assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState()); + + } + + /** + * {@link TaskState#FAILED}->{@link TaskState#FAILED} + */ + @Test(timeout = 5000) + public void testKilledAttemptAtTaskFailed() { + LOG.info("--- START: testKilledAttemptAtTaskFailed ---"); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + for (int i = 0; i < mockTask.maxFailedAttempts; ++i) { + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ATTEMPT_FAILED)); + } + assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState()); + + // Send kill for an attempt + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ATTEMPT_KILLED)); + assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState()); + + } + + + @Test(timeout = 5000) public void testFetchedEventsModifyUnderlyingList() { // Tests to ensure that adding an event to a task, does not affect the // result of past getTaskAttemptTezEvents calls.
