Repository: tez Updated Branches: refs/heads/master bfb34afba -> e1968681c
TEZ-2317. Event processing backlog can result in task failures for short tasks (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e1968681 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e1968681 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e1968681 Branch: refs/heads/master Commit: e1968681cee821103e0105e4948c4fc6dc949776 Parents: bfb34af Author: Bikas Saha <[email protected]> Authored: Thu Apr 16 11:42:09 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Thu Apr 16 11:42:09 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../apache/tez/dag/app/dag/impl/TaskImpl.java | 15 +++++++++++-- .../tez/dag/app/dag/impl/TestTaskImpl.java | 22 ++++++++++++++++++++ .../api/events/TaskStatusUpdateEvent.java | 2 ++ 4 files changed, 39 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e1968681/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a41a724..ce9bfe8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -288,6 +288,8 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2317. Event processing backlog can result in task failures for short + tasks TEZ-2289. ATSHistoryLoggingService can generate ArrayOutOfBoundsException. TEZ-2257. Fix potential NPEs in TaskReporter. TEZ-2192. Relocalization does not check for source. http://git-wip-us.apache.org/repos/asf/tez/blob/e1968681/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 10a688f..a1eed07 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -715,7 +715,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { public boolean canCommit(TezTaskAttemptID taskAttemptID) { writeLock.lock(); try { - if (getState() != TaskState.RUNNING) { + TaskState state = getState(); + if (state == TaskState.SCHEDULED) { + // the actual running task ran and is done and asking for commit. we are still stuck + // in the scheduled state which indicates a backlog in event processing. lets wait for the + // backlog to clear. returning false will make the attempt come back to us. + LOG.debug("Event processing delay. " + + "Attempt committing before state machine transitioned to running : Task {}", taskId); + return false; + } + // at this point the attempt is no longer in scheduled state or else we would still + // have been in scheduled state in task impl. + if (state != TaskState.RUNNING) { LOG.info("Task not running. Issuing kill to bad commit attempt " + taskAttemptID); eventHandler.handle(new TaskAttemptEventKillRequest(taskAttemptID , "Task not running. Bad attempt.", TaskAttemptTerminationCause.TERMINATED_ORPHANED)); @@ -758,7 +769,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { writeLock.unlock(); } } - + TaskAttemptImpl createAttempt(int attemptNumber) { return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, http://git-wip-us.apache.org/repos/asf/tez/blob/e1968681/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 1c4b319..9509df4 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -467,6 +467,28 @@ public class TestTaskImpl { assertTaskSucceededState(); } + + + @Test(timeout = 5000) + public void testEventBacklogDuringTaskAttemptCommit() { + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + assertEquals(TaskState.SCHEDULED, mockTask.getState()); + // simulate + // task in scheduled state due to event backlog - real task done and calling canCommit + assertFalse("Commit should return false to make running task wait", + mockTask.canCommit(mockTask.getLastAttempt().getID())); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); + assertTrue("Task state in AM is running now. Can commit.", + mockTask.canCommit(mockTask.getLastAttempt().getID())); + + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED); + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + + assertTaskSucceededState(); + } @Test(timeout = 5000) http://git-wip-us.apache.org/repos/asf/tez/blob/e1968681/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java index 875a345..6465bed 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java @@ -66,6 +66,8 @@ public class TaskStatusUpdateEvent extends Event implements Writable { if (statistics != null) { out.writeBoolean(true); statistics.write(out); + } else { + out.writeBoolean(false); } }
