Repository: tez Updated Branches: refs/heads/master 4fa2e8641 -> 4a7719b0c
TEZ-3758. Vertex can hang in RUNNING state when two task attempts finish very closely and have retroactive failures (Kuhu Shukla via jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4a7719b0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4a7719b0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4a7719b0 Branch: refs/heads/master Commit: 4a7719b0c164eae41845815d103ceb7e6e2548c3 Parents: 4fa2e86 Author: Jonathan Eagles <[email protected]> Authored: Thu Jun 22 15:44:08 2017 +0000 Committer: Jonathan Eagles <[email protected]> Committed: Thu Jun 22 15:44:08 2017 +0000 ---------------------------------------------------------------------- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 32 +++- .../tez/dag/app/dag/impl/TestTaskImpl.java | 145 ++++++++++++++++++- 2 files changed, 166 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4a7719b0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index f25e583..c8e911e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -151,6 +151,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition(); private static final SingleArcTransition<TaskImpl, TaskEvent> KILL_TRANSITION = new KillTransition(); + private static final SingleArcTransition<TaskImpl, TaskEvent> + REDUNDANT_COMPLETED_TRANSITION = new AttemptRedundantCompletedTransition(); private static final TaskStateChangedCallback STATE_CHANGED_CALLBACK = new TaskStateChangedCallback(); @@ -244,12 +246,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { .addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED), TaskEventType.T_ATTEMPT_KILLED, new TaskRetroactiveKilledTransition()) - // Ignore-able transitions. .addTransition( TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED, - EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT, + TaskEventType.T_ATTEMPT_SUCCEEDED, REDUNDANT_COMPLETED_TRANSITION) + // Ignore-able transitions. + .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED, + EnumSet.of( + TaskEventType.T_ADD_SPEC_ATTEMPT, TaskEventType.T_TERMINATE, - TaskEventType.T_ATTEMPT_SUCCEEDED, // Maybe track and reuse later TaskEventType.T_ATTEMPT_LAUNCHED)) .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED, TaskEventType.T_SCHEDULE) @@ -257,12 +261,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // Transitions from FAILED state .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED, EnumSet.of( - TaskEventType.T_TERMINATE, - TaskEventType.T_SCHEDULE, - TaskEventType.T_ADD_SPEC_ATTEMPT, TaskEventType.T_ATTEMPT_FAILED, TaskEventType.T_ATTEMPT_KILLED, - TaskEventType.T_ATTEMPT_SUCCEEDED)) + TaskEventType.T_ATTEMPT_SUCCEEDED), REDUNDANT_COMPLETED_TRANSITION) + .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED, + EnumSet.of( + TaskEventType.T_TERMINATE, + TaskEventType.T_SCHEDULE, + TaskEventType.T_ADD_SPEC_ATTEMPT)) // Transitions from KILLED state // Ignorable event: T_ATTEMPT_KILLED @@ -1263,7 +1269,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { if (task.getInternalState() == TaskStateInternal.SUCCEEDED && !failedAttemptId.equals(task.successfulAttempt)) { // don't allow a different task attempt to override a previous - // succeeded state + // succeeded state and mark the attempt status as done + task.taskAttemptStatus.put(failedAttemptId.getId(), true); return TaskStateInternal.SUCCEEDED; } @@ -1346,6 +1353,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } } + private static class AttemptRedundantCompletedTransition + implements SingleArcTransition<TaskImpl, TaskEvent> { + @Override + public void transition(TaskImpl task, TaskEvent event) { + TezTaskAttemptID successTaId = ((TaskEventTAUpdate)event).getTaskAttemptID(); + task.taskAttemptStatus.put(successTaId.getId(), true); + } + } + private static class TaskStateChangedCallback implements OnStateChangedCallback<TaskStateInternal, TaskImpl> { http://git-wip-us.apache.org/repos/asf/tez/blob/4a7719b0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index da25927..e5d564e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -32,14 +32,21 @@ import java.util.List; import java.util.Map; import org.apache.tez.common.TezAbstractEvent; -import org.apache.tez.dag.app.dag.DAGScheduler; -import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; import org.apache.tez.dag.app.dag.event.TaskEventTAKilled; import org.apache.tez.dag.app.dag.event.TaskEventTALaunched; import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; +import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; +import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEvent; @@ -127,6 +134,7 @@ public class TestTaskImpl { private MockTaskImpl mockTask; private TaskSpec mockTaskSpec; + private Vertex mockVertex; @SuppressWarnings("rawtypes") class TestEventHandler implements EventHandler<Event> { @@ -152,9 +160,13 @@ public class TestTaskImpl { dagId = TezDAGID.getInstance(appId, 1); vertexId = TezVertexID.getInstance(dagId, 1); appContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + when(appContext.getDAGRecoveryData()).thenReturn(null); + appContext.setDAGRecoveryData(null); mockContainerId = mock(ContainerId.class); mockContainer = mock(Container.class); mockAMContainer = mock(AMContainer.class); + when(mockAMContainer.getContainer()).thenReturn(mockContainer); + when(mockContainer.getNodeHttpAddress()).thenReturn("localhost:1234"); mockNodeId = mock(NodeId.class); mockHistoryHandler = mock(HistoryEventHandler.class); when(mockContainer.getId()).thenReturn(mockContainerId); @@ -178,6 +190,11 @@ public class TestTaskImpl { taskHeartbeatHandler, appContext, leafVertex, taskResource, containerContext, vertex); mockTaskSpec = mock(TaskSpec.class); + mockVertex = mock(Vertex.class); + ServicePluginInfo servicePluginInfo = new ServicePluginInfo() + .setContainerLauncherName(TezConstants.getTezYarnServicePluginName()); + when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo); + when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(conf)); } private TezTaskID getNewTaskID() { @@ -947,6 +964,123 @@ public class TestTaskImpl { Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA()); } + @SuppressWarnings("rawtypes") + @Test + public void testSucceededAttemptStatusWithRetroActiveFailures() throws InterruptedException { + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0); + launchTaskAttempt(firstMockTaskAttempt.getID()); + mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID())); + MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1); + launchTaskAttempt(secondMockTaskAttempt.getID()); + + firstMockTaskAttempt.handle(new TaskAttemptEventSchedule( + TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10)); + secondMockTaskAttempt.handle(new TaskAttemptEventSchedule( + TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10)); + firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted( + TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId())); + secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted( + TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId())); + + secondMockTaskAttempt.handle( + new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()))); + firstMockTaskAttempt.handle( + new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()))); + secondMockTaskAttempt.handle( + new TaskAttemptEvent(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), + TaskAttemptEventType.TA_DONE)); + firstMockTaskAttempt.handle( + new TaskAttemptEvent(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), + TaskAttemptEventType.TA_DONE)); + + mockTask.handle(new TaskEventTASucceeded(secondMockTaskAttempt.getID())); + mockTask.handle(new TaskEventTASucceeded(firstMockTaskAttempt.getID())); + assertTrue("Attempts should have succeeded!", + firstMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED + && secondMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED); + assertEquals("Task should have no uncompleted attempts!", 0, mockTask.getUncompletedAttemptsCount()); + assertTrue("Task should have Succeeded!", mockTask.getState() == TaskState.SUCCEEDED); + //Failing the attempt that finished after the task was marked succeeded, should not schedule another attempt + failAttempt(firstMockTaskAttempt, 0, 0); + assertTaskSucceededState(); + //Failing the attempt that allowed the task to succeed, should schedule another attempt + failAttempt(secondMockTaskAttempt, 1, 1); + assertTaskScheduledState(); + } + + @SuppressWarnings("rawtypes") + @Test + public void testFailedAttemptStatus() throws InterruptedException { + Configuration newConf = new Configuration(conf); + newConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1); + Vertex vertex = mock(Vertex.class); + doReturn(new VertexImpl.VertexConfigImpl(newConf)).when(vertex).getVertexConfig(); + mockTask = new MockTaskImpl(vertexId, partition, + eventHandler, conf, taskCommunicatorManagerInterface, clock, + taskHeartbeatHandler, appContext, leafVertex, + taskResource, containerContext, vertex); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0); + launchTaskAttempt(firstMockTaskAttempt.getID()); + mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID())); + MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1); + launchTaskAttempt(secondMockTaskAttempt.getID()); + + firstMockTaskAttempt.handle(new TaskAttemptEventSchedule( + TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10)); + secondMockTaskAttempt.handle(new TaskAttemptEventSchedule( + TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10)); + firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted( + TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId())); + secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted( + TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId())); + + secondMockTaskAttempt.handle( + new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()))); + firstMockTaskAttempt.handle( + new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()))); + secondMockTaskAttempt.handle( + new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), + TaskAttemptEventType.TA_FAILED,TaskFailureType.NON_FATAL, "test", + TaskAttemptTerminationCause.NO_PROGRESS)); + firstMockTaskAttempt.handle( + new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), + TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test", + TaskAttemptTerminationCause.NO_PROGRESS)); + + firstMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId, + firstMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS)); + secondMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId, + secondMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS)); + mockTask.handle(new TaskEventTAFailed(secondMockTaskAttempt.getID(), TaskFailureType.NON_FATAL, + mock(TaskAttemptEvent.class))); + mockTask.handle(new TaskEventTAFailed(firstMockTaskAttempt.getID(), TaskFailureType.NON_FATAL, + mock(TaskAttemptEvent.class))); + assertTrue("Attempts should have failed!", + firstMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.FAILED + && secondMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.FAILED); + assertEquals("Task should have no uncompleted attempts!", 0, mockTask.getUncompletedAttemptsCount()); + assertTrue("Task should have failed!", mockTask.getState() == TaskState.FAILED); + } + + private void failAttempt(MockTaskAttemptImpl taskAttempt, int index, int expectedIncompleteAttempts) { + InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, index); + TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class); + EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId); + TezEvent tzEvent = new TezEvent(mockReEvent, meta); + TaskAttemptEventOutputFailed outputFailedEvent = + new TaskAttemptEventOutputFailed(mockDestId, tzEvent, 1); + taskAttempt.handle( + outputFailedEvent); + TaskEvent tEventFail1 = new TaskEventTAFailed(taskAttempt.getID(), TaskFailureType.NON_FATAL, outputFailedEvent); + mockTask.handle(tEventFail1); + assertEquals("Unexpected number of incomplete attempts!", + expectedIncompleteAttempts, mockTask.getUncompletedAttemptsCount()); + } + // TODO Add test to validate the correct commit attempt. @@ -1053,7 +1187,12 @@ public class TestTaskImpl { appContext, isRescheduled, resource, containerContext, false, null, locationHint, mockTaskSpec, schedCausalTA); } - + + @Override + protected Vertex getVertex() { + return mockVertex; + } + @Override public float getProgress() { return progress;
