This is an automated email from the ASF dual-hosted git repository. jeagles pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tez.git
commit 4125639619d5e3252fde62579d431ce5071414ea
Author: Ying Han <[email protected]>
AuthorDate: Fri May 10 04:59:02 2019 -0500

    TEZ-4068. Prevent new speculative attempt after task has issued canCommit to an attempt

    Signed-off-by: Jonathan Eagles <[email protected]>
---
 .../java/org/apache/tez/dag/app/dag/impl/TaskImpl.java |  5 +++++
 .../org/apache/tez/dag/app/dag/impl/TestTaskImpl.java  | 17 +++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index e563fe9..2d0688f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1043,6 +1043,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           LOG.info("Ignore speculation scheduling since there is no running attempt on task {}.", task.getTaskId());
           return;
         }
+        if (task.commitAttempt != null) {
+          LOG.info("Ignore speculation scheduling for task {} since commit has started with commitAttempt {}.",
+              task.getTaskId(), task.commitAttempt);
+          return;
+        }
         task.addAndScheduleAttempt(earliestUnfinishedAttempt.getID());
       }
     }
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 2d4adcc..a3de936 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -999,6 +999,23 @@ public class TestTaskImpl {
     assertEquals(1, mockTask.getAttemptList().size());
   }
 
+  @Test(timeout = 20000)
+  public void testIgnoreSpeculationAfterOriginalAttemptCommit() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstAttempt.getID());
+    updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+    // Mock commit of the first task attempt
+    mockTask.canCommit(firstAttempt.getID());
+
+    // Verify the speculation scheduling is ignored and no speculative attempt was added to the task
+    mockTask.handle(createTaskTAAddSpecAttempt(firstAttempt.getID()));
+    MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(specAttempt.getID());
+    assertEquals(1, mockTask.getAttemptList().size());
+  }
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testSucceededAttemptStatusWithRetroActiveFailures() throws InterruptedException {
