Repository: tez
Updated Branches:
refs/heads/branch-0.5 fec1cf55c -> 1d2725b79
Manual merge of TEZ-2317 from master
(cherry picked from commit 3288225a8ef0807000a98dd087203852efbd4699)
Conflicts: Manual resolve
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1d2725b7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1d2725b7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1d2725b7
Branch: refs/heads/branch-0.5
Commit: 1d2725b7982d9dec88580eeaf6c6ae38dbd44bd6
Parents: fec1cf5
Author: Bikas Saha <[email protected]>
Authored: Thu Apr 16 11:50:17 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Thu Apr 16 11:53:04 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 15 ++++++++++++-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 22 ++++++++++++++++++++
3 files changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/1d2725b7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cc86374..9b2d24a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Apache Tez Change Log
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-2317. Event processing backlog can result in task failures for short
+ tasks
TEZ-2289. ATSHistoryLoggingService can generate ArrayOutOfBoundsException.
TEZ-2257. Fix potential NPEs in TaskReporter.
TEZ-2192. Relocalization does not check for source.
http://git-wip-us.apache.org/repos/asf/tez/blob/1d2725b7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 7bafb3b..58284c9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -706,7 +706,20 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
public boolean canCommit(TezTaskAttemptID taskAttemptID) {
writeLock.lock();
try {
- if (getState() != TaskState.RUNNING) {
+ TaskState state = getState();
+ if (state == TaskState.SCHEDULED) {
+ // The attempt has finished and is asking to commit, but this task is still SCHEDULED,
+ // which indicates a backlog in event processing. Wait for the backlog to clear: return
+ // false at every log level so the attempt comes back to us later instead of being killed.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Event processing delay. "
+ + "Attempt committing before state machine transitioned to running : Task:" + taskId);
+ }
+ return false;
+ }
+ // At this point the task is no longer SCHEDULED (that case returned above), so the
+ // attempt is past scheduling too; any state other than RUNNING means a bad commit attempt.
+ if (state != TaskState.RUNNING) {
LOG.info("Task not running. Issuing kill to bad commit attempt " +
taskAttemptID);
eventHandler.handle(new TaskAttemptEventKillRequest(taskAttemptID
, "Task not running. Bad attempt."));
http://git-wip-us.apache.org/repos/asf/tez/blob/1d2725b7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 88fa83d..51ed49f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -467,6 +467,28 @@ public class TestTaskImpl {
assertTaskSucceededState();
}
+
+
+ @Test(timeout = 5000)
+ public void testEventBacklogDuringTaskAttemptCommit() {
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ assertEquals(TaskState.SCHEDULED, mockTask.getState());
+ // Simulate an event backlog: the task is still SCHEDULED in the AM while the
+ // real attempt has already finished and is calling canCommit.
+ assertFalse("Commit should return false to make running task wait",
+ mockTask.canCommit(mockTask.getLastAttempt().getID()));
+ launchTaskAttempt(mockTask.getLastAttempt().getID());
+ updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+ assertTrue("Task state in AM is running now. Can commit.",
+ mockTask.canCommit(mockTask.getLastAttempt().getID()));
+
+ updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED);
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+ assertTaskSucceededState();
+ }
@Test