Repository: tez
Updated Branches:
  refs/heads/branch-0.5 0bceb8e49 -> bac43ea84


TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: Invalid event: T_ATTEMPT_KILLED at KILLED. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bac43ea8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bac43ea8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bac43ea8

Branch: refs/heads/branch-0.5
Commit: bac43ea84281f4a928fb7f9bd4887817f55f4e73
Parents: 0bceb8e
Author: Hitesh Shah <[email protected]>
Authored: Mon May 4 12:39:20 2015 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Mon May 4 12:39:20 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 21 ++++++--
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 55 +++++++++++++++++++-
 3 files changed, 73 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/bac43ea8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2691b6a..e21a6fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Apache Tez Change Log
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException:
+    Invalid event: T_ATTEMPT_KILLED at KILLED.
   TEZ-2397. Translation of LocalResources via Tez plan serialization can be lossy.
   TEZ-2221. VertexGroup name should be unqiue
   TEZ-1521. VertexDataMovementEventsGeneratedEvent may be logged twice in recovery log

http://git-wip-us.apache.org/repos/asf/tez/blob/bac43ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 38b6688..6a1136c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -112,7 +112,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   protected final EventHandler eventHandler;
   private final TezTaskID taskId;
   private Map<TezTaskAttemptID, TaskAttempt> attempts;
-  private final int maxFailedAttempts;
+  protected final int maxFailedAttempts;
   protected final Clock clock;
   private final Lock readLock;
   private final Lock writeLock;
@@ -255,13 +255,28 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
         EnumSet.of(
             TaskEventType.T_TERMINATE,
-            TaskEventType.T_ADD_SPEC_ATTEMPT))
+            TaskEventType.T_ADD_SPEC_ATTEMPT,
+            TaskEventType.T_ATTEMPT_KILLED))
 
     // Transitions from KILLED state
+    // Ignorable event: T_ATTEMPT_KILLED
+    // Refer to TEZ-2379
+    // T_ATTEMPT_KILLED can show up in KILLED state as
+    // a SUCCEEDED attempt can still transition to KILLED after receiving
+    // a KILL event.
+    // This could happen when there is a race where the task receives a
+    // kill event, it tries to kill all running attempts and a potential
+    // running attempt succeeds before it receives the kill event.
+    // The task will then receive both a SUCCEEDED and KILLED
+    // event from the same attempt.
+    // Duplicate events from a single attempt in KILL_WAIT are handled
+    // properly. However, the subsequent T_ATTEMPT_KILLED event might
+    // be received after the task reaches its terminal KILLED state.
     .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
         EnumSet.of(
             TaskEventType.T_TERMINATE,
-            TaskEventType.T_ADD_SPEC_ATTEMPT))
+            TaskEventType.T_ADD_SPEC_ATTEMPT,
+            TaskEventType.T_ATTEMPT_KILLED))
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
         TaskEventType.T_SCHEDULE)
 

http://git-wip-us.apache.org/repos/asf/tez/blob/bac43ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 51ed49f..434107f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -177,6 +177,13 @@ public class TestTaskImpl {
     assertTaskKillWaitState();
   }
 
+  private void failTask(TezTaskID taskId) {
+    mockTask.handle(new TaskEventTermination(taskId,
+        TaskTerminationCause.OWN_TASK_FAILURE));
+    assertTaskKillWaitState();
+  }
+
+
   private void killScheduledTaskAttempt(TezTaskAttemptID attemptId) {
     mockTask.handle(new TaskEventTAUpdate(attemptId,
         TaskEventType.T_ATTEMPT_KILLED));
@@ -289,7 +296,6 @@ public class TestTaskImpl {
     killTask(taskId);
     mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
         TaskEventType.T_ATTEMPT_KILLED));
-
     assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
     verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
   }
@@ -377,7 +383,52 @@ public class TestTaskImpl {
     killRunningTaskAttempt(mockTask.getLastAttempt().getID());
   }
 
-  @Test
+  /**
+   * {@link TaskState#KILLED}->{@link TaskState#KILLED}
+   */
+  @Test(timeout = 5000)
+  public void testKilledAttemptAtTaskKilled() {
+    LOG.info("--- START: testKilledAttemptAtTaskKilled ---");
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    killTask(taskId);
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
+
+    // Send duplicate kill for same attempt
+    // This will not happen in practice but this is to simulate handling
+    // of killed attempts in killed state.
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
+
+  }
+
+  /**
+   * {@link TaskState#FAILED}->{@link TaskState#FAILED}
+   */
+  @Test(timeout = 5000)
+  public void testKilledAttemptAtTaskFailed() {
+    LOG.info("--- START: testKilledAttemptAtTaskFailed ---");
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    for (int i = 0; i < mockTask.maxFailedAttempts; ++i) {
+      mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+          TaskEventType.T_ATTEMPT_FAILED));
+    }
+    assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
+
+    // Send kill for an attempt
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
+
+  }
+
+
+  @Test(timeout = 5000)
   public void testFetchedEventsModifyUnderlyingList() {
     // Tests to ensure that adding an event to a task, does not affect the
     // result of past getTaskAttemptTezEvents calls.

Reply via email to