Repository: tez Updated Branches: refs/heads/branch-0.9 1a8179af5 -> bd2cbb050
TEZ-3934. LegacySpeculator sometimes issues wrong number of speculative attempts (Nishant Dash via jeagles) (cherry picked from commit fe22f3276d6d97f6b5dfab24490ee2ca32bf73c3) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bd2cbb05 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bd2cbb05 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bd2cbb05 Branch: refs/heads/branch-0.9 Commit: bd2cbb050895968c5c0a399fb1adbd8db9a27fe9 Parents: 1a8179a Author: Jonathan Eagles <[email protected]> Authored: Fri Jul 27 09:56:10 2018 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Fri Jul 27 09:57:39 2018 -0500 ---------------------------------------------------------------------- .../speculation/legacy/LegacySpeculator.java | 54 ++++++++++++-------- 1 file changed, 33 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/bd2cbb05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java index 9fbea19..c132fb1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java @@ -85,8 +85,7 @@ public class LegacySpeculator { // in progress.
private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000; - - private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>(); + private final Set<TezTaskID> waitingToSpeculate = new HashSet<TezTaskID>(); private Vertex vertex; private TaskRuntimeEstimator estimator; @@ -229,24 +228,44 @@ public class LegacySpeculator { if (task.getState() == TaskState.SUCCEEDED) { return NOT_RUNNING; } - - if (!mayHaveSpeculated.contains(taskID) && !shouldUseTimeout) { - acceptableRuntime = estimator.thresholdRuntime(taskID); - if (acceptableRuntime == Long.MAX_VALUE) { - return ON_SCHEDULE; - } - } - - TezTaskAttemptID runningTaskAttemptID = null; int numberRunningAttempts = 0; for (TaskAttempt taskAttempt : attempts.values()) { - if (taskAttempt.getState() == TaskAttemptState.RUNNING - || taskAttempt.getState() == TaskAttemptState.STARTING) { + TaskAttemptState taskAttemptState = taskAttempt.getState(); + if (taskAttemptState == TaskAttemptState.RUNNING + || taskAttemptState == TaskAttemptState.STARTING) { if (++numberRunningAttempts > 1) { + waitingToSpeculate.remove(taskID); return ALREADY_SPECULATING; } + } + } + + // If we are here, there's at most one task attempt. 
+ if (numberRunningAttempts == 0) { + return NOT_RUNNING; + } + + if ((numberRunningAttempts == 1) && waitingToSpeculate.contains(taskID)) { + return ALREADY_SPECULATING; + } + else { + if (!shouldUseTimeout) { + acceptableRuntime = estimator.thresholdRuntime(taskID); + if (acceptableRuntime == Long.MAX_VALUE) { + return ON_SCHEDULE; + } + } + } + + TezTaskAttemptID runningTaskAttemptID = null; + + for (TaskAttempt taskAttempt : attempts.values()) { + TaskAttemptState taskAttemptState = taskAttempt.getState(); + if (taskAttemptState == TaskAttemptState.RUNNING + || taskAttemptState == TaskAttemptState.STARTING) { + runningTaskAttemptID = taskAttempt.getID(); long taskAttemptStartTime @@ -311,13 +330,6 @@ public class LegacySpeculator { } } - // If we are here, there's at most one task attempt. - if (numberRunningAttempts == 0) { - return NOT_RUNNING; - } - - - if ((acceptableRuntime == Long.MIN_VALUE) && !shouldUseTimeout) { acceptableRuntime = estimator.thresholdRuntime(taskID); if (acceptableRuntime == Long.MAX_VALUE) { @@ -332,7 +344,7 @@ public class LegacySpeculator { protected void addSpeculativeAttempt(TezTaskID taskID) { LOG.info("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID); vertex.scheduleSpeculativeTask(taskID); - mayHaveSpeculated.add(taskID); + waitingToSpeculate.add(taskID); } private int maybeScheduleASpeculation() {
