Repository: tez Updated Branches: refs/heads/master 3fb57c869 -> a51af593f
TEZ-3857. Tez TaskImpl can throw Invalid state transition for leaf tasks that do Retro Active Transition (kshukla) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a51af593 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a51af593 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a51af593 Branch: refs/heads/master Commit: a51af593f3cf9cf42120cb741a2c69eb83a08768 Parents: 3fb57c8 Author: Kuhu Shukla <[email protected]> Authored: Wed Nov 1 11:26:54 2017 -0500 Committer: Kuhu Shukla <[email protected]> Committed: Wed Nov 1 11:26:54 2017 -0500 ---------------------------------------------------------------------- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 13 +++-- .../tez/dag/app/dag/impl/TestTaskImpl.java | 56 ++++++++++++++++++++ 2 files changed, 62 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a51af593/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index bed4141..99cb2e0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -1248,12 +1248,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { @Override public TaskStateInternal transition(TaskImpl task, TaskEvent event) { - if (task.leafVertex) { - LOG.error("Unexpected event for task of leaf vertex " + event.getType() + ", taskId: " - + task.getTaskId()); - task.internalError(event.getType()); - } - TaskEventTAFailed castEvent = (TaskEventTAFailed) event; TezTaskAttemptID failedAttemptId = castEvent.getTaskAttemptID(); TaskAttempt failedAttempt = task.getAttempt(failedAttemptId); @@ -1277,7 +1271,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { task.taskAttemptStatus.put(failedAttemptId.getId(), true); return TaskStateInternal.SUCCEEDED; } - + + if (task.leafVertex) { + LOG.error("Unexpected event for task of leaf vertex " + event.getType() + ", taskId: " + + task.getTaskId()); + task.internalError(event.getType()); + } Preconditions.checkState(castEvent.getCausalEvent() != null); TaskAttemptEventOutputFailed destinationEvent = (TaskAttemptEventOutputFailed) castEvent.getCausalEvent(); http://git-wip-us.apache.org/repos/asf/tez/blob/a51af593/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index e03e282..d13e654 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -1067,6 +1067,62 @@ public class TestTaskImpl { assertTrue("Task should have failed!", mockTask.getState() == TaskState.FAILED); } + @SuppressWarnings("rawtypes") + @Test (timeout = 10000L) + public void testSucceededLeafTaskWithRetroFailures() throws InterruptedException { + Configuration newConf = new Configuration(conf); + newConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1); + Vertex vertex = mock(Vertex.class); + doReturn(new VertexImpl.VertexConfigImpl(newConf)).when(vertex).getVertexConfig(); + mockTask = new MockTaskImpl(vertexId, partition, + eventHandler, conf, taskCommunicatorManagerInterface, clock, + taskHeartbeatHandler, appContext, true, + taskResource, containerContext, vertex); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0); + launchTaskAttempt(firstMockTaskAttempt.getID()); + mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID())); + MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1); + launchTaskAttempt(secondMockTaskAttempt.getID()); + + firstMockTaskAttempt.handle(new TaskAttemptEventSchedule( + TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10)); + secondMockTaskAttempt.handle(new TaskAttemptEventSchedule( + TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10)); + firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted( + TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId())); + secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted( + TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId())); + + secondMockTaskAttempt.handle( + new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()))); + firstMockTaskAttempt.handle( + new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()))); + secondMockTaskAttempt.handle( + new TaskAttemptEvent(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), + TaskAttemptEventType.TA_DONE)); + firstMockTaskAttempt.handle( + new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), + TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test", + TaskAttemptTerminationCause.CONTAINER_EXITED)); + + mockTask.handle(new TaskEventTASucceeded(secondMockTaskAttempt.getID())); + firstMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId, + firstMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS)); + + InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 0); + TezTaskAttemptID mockDestId = firstMockTaskAttempt.getID(); + EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId); + TezEvent tzEvent = new TezEvent(mockReEvent, meta); + TaskAttemptEventOutputFailed outputFailedEvent = + new TaskAttemptEventOutputFailed(mockDestId, tzEvent, 1); + firstMockTaskAttempt.handle(outputFailedEvent); + mockTask.handle(new TaskEventTAFailed(firstMockTaskAttempt.getID(), TaskFailureType.NON_FATAL, + mock(TaskAttemptEvent.class))); + Assert.assertEquals(mockTask.getInternalState(), TaskStateInternal.SUCCEEDED); + } + private void failAttempt(MockTaskAttemptImpl taskAttempt, int index, int expectedIncompleteAttempts) { InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, index); TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);
