TEZ-3791. Failed/Killed task can throw InvalidStateTransitonException when a new attempt is launched (Kuhu Shukla via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/87b60237 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/87b60237 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/87b60237 Branch: refs/heads/branch-0.9.0 Commit: 87b6023717e8bff3fb76dcc17bc64852439b1123 Parents: 6d26a5c Author: Jonathan Eagles <[email protected]> Authored: Tue Jul 11 15:29:09 2017 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Tue Jul 11 15:29:09 2017 -0500 ---------------------------------------------------------------------- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 6 +- .../tez/dag/app/dag/impl/TestTaskImpl.java | 108 +++++++++++++++++++ 2 files changed, 113 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/87b60237/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index c8e911e..bed4141 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -268,7 +268,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { EnumSet.of( TaskEventType.T_TERMINATE, TaskEventType.T_SCHEDULE, - TaskEventType.T_ADD_SPEC_ATTEMPT)) + TaskEventType.T_ADD_SPEC_ATTEMPT, + TaskEventType.T_ATTEMPT_LAUNCHED)) // Transitions from KILLED state // Ignorable event: T_ATTEMPT_KILLED @@ -289,6 +290,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { TaskEventType.T_TERMINATE, TaskEventType.T_SCHEDULE, TaskEventType.T_ADD_SPEC_ATTEMPT, + TaskEventType.T_ATTEMPT_LAUNCHED, + TaskEventType.T_ATTEMPT_SUCCEEDED, + TaskEventType.T_ATTEMPT_FAILED, TaskEventType.T_ATTEMPT_KILLED)) // create the topology tables http://git-wip-us.apache.org/repos/asf/tez/blob/87b60237/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index e5d564e..e03e282 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -78,6 +78,7 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.TaskStateInternal; import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; @@ -1081,6 +1082,113 @@ public class TestTaskImpl { expectedIncompleteAttempts, mockTask.getUncompletedAttemptsCount()); } + @Test (timeout = 30000) + public void testFailedTaskTransitionWithLaunchedAttempt() throws InterruptedException { + Configuration newConf = new Configuration(conf); + newConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1); + Vertex vertex = mock(Vertex.class); + doReturn(new VertexImpl.VertexConfigImpl(newConf)).when(vertex).getVertexConfig(); + mockTask = new MockTaskImpl(vertexId, partition, + eventHandler, conf, taskCommunicatorManagerInterface, clock, + taskHeartbeatHandler, appContext, leafVertex, + taskResource, containerContext, vertex); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getLastAttempt(); + launchTaskAttempt(firstMockTaskAttempt.getID()); + mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID())); + MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getLastAttempt(); + launchTaskAttempt(secondMockTaskAttempt.getID()); + + firstMockTaskAttempt.handle(new TaskAttemptEventSchedule( + TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10)); + secondMockTaskAttempt.handle(new TaskAttemptEventSchedule( + TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10)); + firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted( + TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId())); + secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted( + TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId())); + + secondMockTaskAttempt.handle( + new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()))); + firstMockTaskAttempt.handle( + new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()))); + secondMockTaskAttempt.handle( + new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), + TaskAttemptEventType.TA_FAILED,TaskFailureType.NON_FATAL, "test", + TaskAttemptTerminationCause.NO_PROGRESS)); + firstMockTaskAttempt.handle( + new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), + TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test", + TaskAttemptTerminationCause.NO_PROGRESS)); + + firstMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId, + firstMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS)); + secondMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId, + secondMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS)); + mockTask.handle(new TaskEventTAFailed(secondMockTaskAttempt.getID(), TaskFailureType.NON_FATAL, + mock(TaskAttemptEvent.class))); + mockTask.handle(new TaskEventTAFailed(firstMockTaskAttempt.getID(), TaskFailureType.NON_FATAL, + mock(TaskAttemptEvent.class))); + assertTrue("Attempts should have failed!", + firstMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.FAILED + && secondMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.FAILED); + assertEquals("Task should have no uncompleted attempts!", 0, mockTask.getUncompletedAttemptsCount()); + assertTrue("Task should have failed!", mockTask.getState() == TaskState.FAILED); + mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID())); + MockTaskAttemptImpl thirdMockTaskAttempt = mockTask.getLastAttempt(); + mockTask.handle(createTaskTALauncherEvent(thirdMockTaskAttempt.getID())); + } + + @Test (timeout = 30000) + public void testKilledTaskTransitionWithLaunchedAttempt() throws InterruptedException { + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getLastAttempt(); + launchTaskAttempt(firstMockTaskAttempt.getID()); + mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID())); + MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getLastAttempt(); + launchTaskAttempt(secondMockTaskAttempt.getID()); + + firstMockTaskAttempt.handle(new TaskAttemptEventSchedule( + TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10)); + secondMockTaskAttempt.handle(new TaskAttemptEventSchedule( + TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10)); + firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted( + TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId())); + secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted( + TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId())); + + secondMockTaskAttempt.handle( + new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()))); + firstMockTaskAttempt.handle( + new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()))); + mockTask.handle(new TaskEventTermination(mockTask.getTaskId(), + TaskAttemptTerminationCause.FRAMEWORK_ERROR, "test")); + secondMockTaskAttempt.handle( + new TaskAttemptEventAttemptKilled(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),"test", + TaskAttemptTerminationCause.FRAMEWORK_ERROR)); + mockTask.handle(new TaskEventTAKilled(secondMockTaskAttempt.getID(), + new TaskAttemptEvent(secondMockTaskAttempt.getID(), TaskAttemptEventType.TA_KILLED))); + firstMockTaskAttempt.handle( + new TaskAttemptEventAttemptKilled(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),"test", + TaskAttemptTerminationCause.FRAMEWORK_ERROR)); + mockTask.handle(new TaskEventTAKilled(firstMockTaskAttempt.getID(), + new TaskAttemptEvent(firstMockTaskAttempt.getID(), TaskAttemptEventType.TA_KILLED))); + firstMockTaskAttempt.handle( + new TaskAttemptEventAttemptKilled(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),"test", + TaskAttemptTerminationCause.FRAMEWORK_ERROR)); + assertEquals("Task should have been killed!", mockTask.getInternalState(), TaskStateInternal.KILLED); + mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID())); + MockTaskAttemptImpl thirdMockTaskAttempt = mockTask.getLastAttempt(); + mockTask.handle(createTaskTALauncherEvent(thirdMockTaskAttempt.getID())); + mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID())); + MockTaskAttemptImpl fourthMockTaskAttempt = mockTask.getLastAttempt(); + mockTask.handle(createTaskTASucceededEvent(fourthMockTaskAttempt.getID())); + MockTaskAttemptImpl fifthMockTaskAttempt = mockTask.getLastAttempt(); + mockTask.handle(createTaskTAFailedEvent(fifthMockTaskAttempt.getID())); + } + // TODO Add test to validate the correct commit attempt.
