Repository: tez Updated Branches: refs/heads/master bdcdfcc54 -> cc9dd2799
TEZ-3102. Fetch failure of a speculated task causes job hang (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cc9dd279 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cc9dd279 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cc9dd279 Branch: refs/heads/master Commit: cc9dd2799ff67243017edb9ae5df42dc887032c9 Parents: bdcdfcc Author: Jason Lowe <[email protected]> Authored: Thu Feb 25 20:18:17 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Thu Feb 25 20:18:17 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/app/dag/impl/TaskImpl.java | 22 ++--- .../tez/dag/app/dag/impl/TestTaskImpl.java | 89 ++++++++++++++++++++ 3 files changed, 98 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/cc9dd279/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c521890..8de1383 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-3102. Fetch failure of a speculated task causes job hang TEZ-3124. Running task hangs due to missing event to initialize input in recovery. TEZ-3135. tez-ext-service-tests, tez-plugins/tez-yarn-timeline-history and tez-tools/tez-javadoc-tools missing dependencies. TEZ-3134. tez-dag should depend on commons-collections4. @@ -384,6 +385,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES + TEZ-3102. Fetch failure of a speculated task causes job hang TEZ-3126. Log reason for not reducing parallelism TEZ-3123. Containers can get re-used even with conflicting local resources. TEZ-3117. 
Deadlock in Edge and Vertex code http://git-wip-us.apache.org/repos/asf/tez/blob/cc9dd279/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 9ec7ce8..bdadf3f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -1268,14 +1268,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { TaskEventTAUpdate attemptEvent = (TaskEventTAUpdate) event; TezTaskAttemptID attemptId = attemptEvent.getTaskAttemptID(); - if(task.successfulAttempt == attemptId) { - // successful attempt is now killed. reschedule - // tell the job about the rescheduling - unSucceed(task); - task.handleTaskAttemptCompletion( - attemptId, - TaskAttemptStateInternal.KILLED); - task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId)); + TaskStateInternal resultState = TaskStateInternal.SUCCEEDED; + if(task.successfulAttempt.equals(attemptId)) { // typically we are here because this map task was run on a bad node and // we want to reschedule it on a different node. // Depending on whether there are previous failed attempts or not this @@ -1284,14 +1278,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // from the map splitInfo. So the bad node might be sent as a location // to the RM. But the RM would ignore that just like it would ignore // currently pending container requests affinitized to bad nodes. 
- task.addAndScheduleAttempt(attemptId); - return TaskStateInternal.SCHEDULED; - } else { - // nothing to do - LOG.info("Ignoring kill of attempt: " + attemptId + " because attempt: " + - task.successfulAttempt + " is already successful"); - return TaskStateInternal.SUCCEEDED; + unSucceed(task); + task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId)); + resultState = TaskStateInternal.SCHEDULED; } + ATTEMPT_KILLED_TRANSITION.transition(task, event); + return resultState; } } http://git-wip-us.apache.org/repos/asf/tez/blob/cc9dd279/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 1274378..6f11aa0 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -56,7 +56,9 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.TaskStateInternal; import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; import org.apache.tez.dag.app.dag.event.TaskEventTermination; @@ -643,6 +645,33 @@ public class TestTaskImpl { Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA()); } + @SuppressWarnings("rawtypes") + @Test(timeout = 5000) + public void testTaskSucceedAndRetroActiveKilled() { + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + 
launchTaskAttempt(mockTask.getLastAttempt().getID()); + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); + + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + + // The task should now have succeeded + assertTaskSucceededState(); + verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId), + eq(mockTask.getLastAttempt().getID().getId())); + + eventHandler.events.clear(); + // Now kill the attempt after it has succeeded + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt() + .getID(), TaskEventType.T_ATTEMPT_KILLED)); + + // The task should have been rescheduled and be back in the scheduled state + assertTaskScheduledState(); + Event event = eventHandler.events.get(0); + Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType()); + } + @Test(timeout = 5000) public void testDiagnostics_KillNew(){ TezTaskID taskId = getNewTaskID(); @@ -734,6 +763,66 @@ public class TestTaskImpl { assertEquals(2, mockTask.getAttemptList().size()); } + @Test(timeout = 20000) + public void testSpeculatedThenRetroactiveFailure() { + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt(); + launchTaskAttempt(firstAttempt.getID()); + updateAttemptState(firstAttempt, TaskAttemptState.RUNNING); + + // Add a speculative task attempt + mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt(); + launchTaskAttempt(specAttempt.getID()); + updateAttemptState(specAttempt, TaskAttemptState.RUNNING); + assertEquals(2, mockTask.getAttemptList().size()); + + // Have the first task succeed + eventHandler.events.clear(); + mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + + // The task should now have succeeded and sent kill to other attempt + assertTaskSucceededState(); + 
verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId), + eq(firstAttempt.getID().getId())); + @SuppressWarnings("rawtypes") + Event event = eventHandler.events.get(eventHandler.events.size()-1); + assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, event.getType()); + assertEquals(specAttempt.getID(), + ((TaskAttemptEventKillRequest) event).getTaskAttemptID()); + + // Emulate the spec attempt being killed + mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(), + TaskEventType.T_ATTEMPT_KILLED)); + assertTaskSucceededState(); + + // Now fail the attempt after it has succeeded + TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class); + TezEvent mockTezEvent = mock(TezEvent.class); + EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId); + when(mockTezEvent.getSourceInfo()).thenReturn(meta); + TaskAttemptEventOutputFailed outputFailedEvent = + new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1); + eventHandler.events.clear(); + mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(), + TaskEventType.T_ATTEMPT_FAILED, outputFailedEvent)); + + // The task should have been rescheduled and be back in the scheduled state + assertTaskScheduledState(); + event = eventHandler.events.get(eventHandler.events.size()-1); + Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType()); + + // There should be a new attempt, and report of output read error + // should be the causal TA + List<MockTaskAttemptImpl> attempts = mockTask.getAttemptList(); + Assert.assertEquals(3, attempts.size()); + MockTaskAttemptImpl newAttempt = attempts.get(2); + Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA()); + } + // TODO Add test to validate the correct commit attempt.
