This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 7d5a183 TEZ-4062. Speculative attempt scheduling should be aborted
when Task has completed
7d5a183 is described below
commit 7d5a1830a51df64b23920901ec2e723999a9dfac
Author: Ying Han <[email protected]>
AuthorDate: Fri May 10 04:55:29 2019 -0500
TEZ-4062. Speculative attempt scheduling should be aborted when Task has
completed
Signed-off-by: Jonathan Eagles <[email protected]>
---
.../java/org/apache/tez/dag/app/dag/impl/TaskImpl.java | 5 +++++
.../org/apache/tez/dag/app/dag/impl/TestTaskImpl.java | 17 +++++++++++++++++
2 files changed, 22 insertions(+)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index e563fe9..2d0688f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1043,6 +1043,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
LOG.info("Ignore speculation scheduling since there is no running attempt on task {}.", task.getTaskId());
return;
}
+ if (task.commitAttempt != null) {
+ LOG.info("Ignore speculation scheduling for task {} since commit has started with commitAttempt {}.",
+ task.getTaskId(), task.commitAttempt);
+ return;
+ }
task.addAndScheduleAttempt(earliestUnfinishedAttempt.getID());
}
}
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 2d4adcc..a3de936 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -999,6 +999,23 @@ public class TestTaskImpl {
assertEquals(1, mockTask.getAttemptList().size());
}
+ @Test(timeout = 20000)
+ public void testIgnoreSpeculationAfterOriginalAttemptCommit() {
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(firstAttempt.getID());
+ updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+ // Mock commit of the first task attempt
+ mockTask.canCommit(firstAttempt.getID());
+
+ // Verify the speculation scheduling is ignored and no speculative attempt was added to the task
+ mockTask.handle(createTaskTAAddSpecAttempt(firstAttempt.getID()));
+ MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(specAttempt.getID());
+ assertEquals(1, mockTask.getAttemptList().size());
+ }
+
@SuppressWarnings("rawtypes")
@Test
public void testSucceededAttemptStatusWithRetroActiveFailures() throws InterruptedException {