Repository: tez
Updated Branches:
  refs/heads/master bdcdfcc54 -> cc9dd2799


TEZ-3102. Fetch failure of a speculated task causes job hang (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cc9dd279
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cc9dd279
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cc9dd279

Branch: refs/heads/master
Commit: cc9dd2799ff67243017edb9ae5df42dc887032c9
Parents: bdcdfcc
Author: Jason Lowe <[email protected]>
Authored: Thu Feb 25 20:18:17 2016 +0000
Committer: Jason Lowe <[email protected]>
Committed: Thu Feb 25 20:18:17 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 22 ++---
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 89 ++++++++++++++++++++
 3 files changed, 98 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cc9dd279/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c521890..8de1383 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-3102. Fetch failure of a speculated task causes job hang
   TEZ-3124. Running task hangs due to missing event to initialize input in recovery.
   TEZ-3135. tez-ext-service-tests, tez-plugins/tez-yarn-timeline-history and tez-tools/tez-javadoc-tools missing dependencies.
   TEZ-3134. tez-dag should depend on commons-collections4.
@@ -384,6 +385,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-3102. Fetch failure of a speculated task causes job hang
   TEZ-3126. Log reason for not reducing parallelism
   TEZ-3123. Containers can get re-used even with conflicting local resources.
   TEZ-3117. Deadlock in Edge and Vertex code

http://git-wip-us.apache.org/repos/asf/tez/blob/cc9dd279/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 9ec7ce8..bdadf3f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1268,14 +1268,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
       TaskEventTAUpdate attemptEvent = (TaskEventTAUpdate) event;
       TezTaskAttemptID attemptId = attemptEvent.getTaskAttemptID();
-      if(task.successfulAttempt == attemptId) {
-        // successful attempt is now killed. reschedule
-        // tell the job about the rescheduling
-        unSucceed(task);
-        task.handleTaskAttemptCompletion(
-            attemptId,
-            TaskAttemptStateInternal.KILLED);
-        task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));
+      TaskStateInternal resultState = TaskStateInternal.SUCCEEDED;
+      if(task.successfulAttempt.equals(attemptId)) {
         // typically we are here because this map task was run on a bad node and
         // we want to reschedule it on a different node.
         // Depending on whether there are previous failed attempts or not this
@@ -1284,14 +1278,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         // from the map splitInfo. So the bad node might be sent as a location
         // to the RM. But the RM would ignore that just like it would ignore
         // currently pending container requests affinitized to bad nodes.
-        task.addAndScheduleAttempt(attemptId);
-        return TaskStateInternal.SCHEDULED;
-      } else {
-        // nothing to do
-        LOG.info("Ignoring kill of attempt: " + attemptId + " because attempt: " +
-            task.successfulAttempt + " is already successful");
-        return TaskStateInternal.SUCCEEDED;
+        unSucceed(task);
+        task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));
+        resultState = TaskStateInternal.SCHEDULED;
       }
+      ATTEMPT_KILLED_TRANSITION.transition(task, event);
+      return resultState;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cc9dd279/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 1274378..6f11aa0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -56,7 +56,9 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
@@ -643,6 +645,33 @@ public class TestTaskImpl {
     Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test(timeout = 5000)
+  public void testTaskSucceedAndRetroActiveKilled() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+    // The task should now have succeeded
+    assertTaskSucceededState();
+    verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
+        eq(mockTask.getLastAttempt().getID().getId()));
+
+    eventHandler.events.clear();
+    // Now retroactively kill the attempt after it has already succeeded
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt()
+        .getID(), TaskEventType.T_ATTEMPT_KILLED));
+
+    // The task should now be back in the scheduled state (it was SUCCEEDED)
+    assertTaskScheduledState();
+    Event event = eventHandler.events.get(0); // first event emitted after the kill
+    Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType());
+  }
+
   @Test(timeout = 5000)
   public void testDiagnostics_KillNew(){
     TezTaskID taskId = getNewTaskID();
@@ -734,6 +763,66 @@ public class TestTaskImpl {
     assertEquals(2, mockTask.getAttemptList().size());
   }
 
+  @Test(timeout = 20000)
+  public void testSpeculatedThenRetroactiveFailure() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstAttempt.getID());
+    updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+    // Add a speculative task attempt
+    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(specAttempt.getID());
+    updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+    assertEquals(2, mockTask.getAttemptList().size());
+
+    // Have the first attempt succeed
+    eventHandler.events.clear();
+    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+    // The task should now have succeeded and requested a kill of the spec attempt
+    assertTaskSucceededState();
+    verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
+        eq(firstAttempt.getID().getId()));
+    @SuppressWarnings("rawtypes")
+    Event event = eventHandler.events.get(eventHandler.events.size()-1);
+    assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, event.getType());
+    assertEquals(specAttempt.getID(),
+        ((TaskAttemptEventKillRequest) event).getTaskAttemptID());
+
+    // Emulate the spec attempt being killed; the task must remain succeeded
+    mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertTaskSucceededState();
+
+    // Now retroactively fail the successful first attempt with an output-failed event
+    TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);
+    TezEvent mockTezEvent = mock(TezEvent.class);
+    EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId);
+    when(mockTezEvent.getSourceInfo()).thenReturn(meta);
+    TaskAttemptEventOutputFailed outputFailedEvent =
+        new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1);
+    eventHandler.events.clear();
+    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+        TaskEventType.T_ATTEMPT_FAILED, outputFailedEvent));
+
+    // The task should now be back in the scheduled state (it was SUCCEEDED)
+    assertTaskScheduledState();
+    event = eventHandler.events.get(eventHandler.events.size()-1);
+    Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType());
+
+    // There should be a new attempt whose scheduling causal TA is the attempt
+    // (mockDestId) that reported the output read error
+    List<MockTaskAttemptImpl> attempts = mockTask.getAttemptList();
+    Assert.assertEquals(3, attempts.size());
+    MockTaskAttemptImpl newAttempt = attempts.get(2);
+    Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
+  }
+
   // TODO Add test to validate the correct commit attempt.
 
 

Reply via email to