Repository: tez
Updated Branches:
  refs/heads/master bfb34afba -> e1968681c


TEZ-2317. Event processing backlog can result in task failures for short tasks (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e1968681
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e1968681
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e1968681

Branch: refs/heads/master
Commit: e1968681cee821103e0105e4948c4fc6dc949776
Parents: bfb34af
Author: Bikas Saha <[email protected]>
Authored: Thu Apr 16 11:42:09 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Thu Apr 16 11:42:09 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 15 +++++++++++--
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 22 ++++++++++++++++++++
 .../api/events/TaskStatusUpdateEvent.java       |  2 ++
 4 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e1968681/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a41a724..ce9bfe8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -288,6 +288,8 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2317. Event processing backlog can result in task failures for short
+  tasks
   TEZ-2289. ATSHistoryLoggingService can generate ArrayOutOfBoundsException.
   TEZ-2257. Fix potential NPEs in TaskReporter.
   TEZ-2192. Relocalization does not check for source.

http://git-wip-us.apache.org/repos/asf/tez/blob/e1968681/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 10a688f..a1eed07 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -715,7 +715,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   public boolean canCommit(TezTaskAttemptID taskAttemptID) {
     writeLock.lock();
     try {
-      if (getState() != TaskState.RUNNING) {
+      TaskState state = getState();
+      if (state == TaskState.SCHEDULED) {
+        // the actual running task ran and is done and asking for commit. we 
are still stuck 
+        // in the scheduled state which indicates a backlog in event 
processing. lets wait for the
+        // backlog to clear. returning false will make the attempt come back 
to us.
+        LOG.debug("Event processing delay. "
+            + "Attempt committing before state machine transitioned to running 
: Task {}", taskId);
+        return false;
+      }
+      // at this point the attempt is no longer in scheduled state or else we 
would still 
+      // have been in scheduled state in task impl.
+      if (state != TaskState.RUNNING) {
         LOG.info("Task not running. Issuing kill to bad commit attempt " + 
taskAttemptID);
         eventHandler.handle(new TaskAttemptEventKillRequest(taskAttemptID
             , "Task not running. Bad attempt.", 
TaskAttemptTerminationCause.TERMINATED_ORPHANED));
@@ -758,7 +769,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       writeLock.unlock();
     }
   }
-
+  
   TaskAttemptImpl createAttempt(int attemptNumber) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
         taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,

http://git-wip-us.apache.org/repos/asf/tez/blob/e1968681/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 1c4b319..9509df4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -467,6 +467,28 @@ public class TestTaskImpl {
 
     assertTaskSucceededState();
   }
+  
+
+  @Test(timeout = 5000)
+  public void testEventBacklogDuringTaskAttemptCommit() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    assertEquals(TaskState.SCHEDULED, mockTask.getState());
+    // Simulate an event-processing backlog: the AM still has the task in SCHEDULED
+    // while the real attempt has already finished and is calling canCommit.
+    assertFalse("Commit should return false to make running task wait",
+        mockTask.canCommit(mockTask.getLastAttempt().getID()));
+    launchTaskAttempt(mockTask.getLastAttempt().getID()); // backlog drains; AM catches up
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+    assertTrue("Task state in AM is running now. Can commit.",
+        mockTask.canCommit(mockTask.getLastAttempt().getID()));
+
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED);
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+    assertTaskSucceededState();
+  }
 
 
   @Test(timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/e1968681/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
index 875a345..6465bed 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
@@ -66,6 +66,8 @@ public class TaskStatusUpdateEvent extends Event implements Writable {
     if (statistics != null) {
       out.writeBoolean(true);
       statistics.write(out);
+    } else {
+      out.writeBoolean(false);
     }
   }
 

Reply via email to