Repository: tez Updated Branches: refs/heads/branch-0.6 51903629d -> 3288225a8
Manual merge of TEZ-2317 from master Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3288225a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3288225a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3288225a Branch: refs/heads/branch-0.6 Commit: 3288225a8ef0807000a98dd087203852efbd4699 Parents: 5190362 Author: Bikas Saha <[email protected]> Authored: Thu Apr 16 11:50:17 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Thu Apr 16 11:50:17 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../apache/tez/dag/app/dag/impl/TaskImpl.java | 17 +++++++++++++-- .../tez/dag/app/dag/impl/TestTaskImpl.java | 22 ++++++++++++++++++++ 3 files changed, 39 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3288225a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3eb5b2e..3b238fa 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -177,6 +177,8 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2317. Event processing backlog can result in task failures for short + tasks TEZ-2289. ATSHistoryLoggingService can generate ArrayOutOfBoundsException. TEZ-2257. Fix potential NPEs in TaskReporter. TEZ-2192. Relocalization does not check for source. 
http://git-wip-us.apache.org/repos/asf/tez/blob/3288225a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 77e724d..ebd27ff 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -699,7 +699,20 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { public boolean canCommit(TezTaskAttemptID taskAttemptID) { writeLock.lock(); try { - if (getState() != TaskState.RUNNING) { + TaskState state = getState(); + if (state == TaskState.SCHEDULED) { + // the actual running task ran and is done and asking for commit. we are still stuck + // in the scheduled state which indicates a backlog in event processing. let's wait for the + // backlog to clear. returning false will make the attempt come back to us. + if (LOG.isDebugEnabled()) { + LOG.debug("Event processing delay. " + + "Attempt committing before state machine transitioned to running : Task:" + taskId); + } + return false; + } + // at this point the attempt is no longer in scheduled state or else we would still + // have been in scheduled state in task impl. + if (state != TaskState.RUNNING) { LOG.info("Task not running. Issuing kill to bad commit attempt " + taskAttemptID); eventHandler.handle(new TaskAttemptEventKillRequest(taskAttemptID , "Task not running.
Bad attempt.", TaskAttemptTerminationCause.TERMINATED_ORPHANED)); @@ -742,7 +755,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { writeLock.unlock(); } } - + TaskAttemptImpl createAttempt(int attemptNumber) { return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, http://git-wip-us.apache.org/repos/asf/tez/blob/3288225a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index fb1db0b..ec13607 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -467,6 +467,28 @@ public class TestTaskImpl { assertTaskSucceededState(); } + + + @Test(timeout = 5000) + public void testEventBacklogDuringTaskAttemptCommit() { + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + assertEquals(TaskState.SCHEDULED, mockTask.getState()); + // simulate + // task in scheduled state due to event backlog - real task done and calling canCommit + assertFalse("Commit should return false to make running task wait", + mockTask.canCommit(mockTask.getLastAttempt().getID())); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); + assertTrue("Task state in AM is running now. Can commit.", + mockTask.canCommit(mockTask.getLastAttempt().getID())); + + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED); + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + + assertTaskSucceededState(); + } @Test(timeout = 5000)
