Repository: tez
Updated Branches:
  refs/heads/branch-0.9 1a8179af5 -> bd2cbb050


TEZ-3934. LegacySpeculator sometimes issues wrong number of speculative attempts
(Nishant Dash via jeagles)

(cherry picked from commit fe22f3276d6d97f6b5dfab24490ee2ca32bf73c3)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bd2cbb05
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bd2cbb05
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bd2cbb05

Branch: refs/heads/branch-0.9
Commit: bd2cbb050895968c5c0a399fb1adbd8db9a27fe9
Parents: 1a8179a
Author: Jonathan Eagles <[email protected]>
Authored: Fri Jul 27 09:56:10 2018 -0500
Committer: Jonathan Eagles <[email protected]>
Committed: Fri Jul 27 09:57:39 2018 -0500

----------------------------------------------------------------------
 .../speculation/legacy/LegacySpeculator.java    | 54 ++++++++++++--------
 1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/bd2cbb05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
index 9fbea19..c132fb1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
@@ -85,8 +85,7 @@ public class LegacySpeculator {
   // in progress.
   private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000;
 
-
-  private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>();
+  private final Set<TezTaskID> waitingToSpeculate = new HashSet<TezTaskID>();
 
   private Vertex vertex;
   private TaskRuntimeEstimator estimator;
@@ -229,24 +228,44 @@ public class LegacySpeculator {
     if (task.getState() == TaskState.SUCCEEDED) {
       return NOT_RUNNING;
     }
-    
-    if (!mayHaveSpeculated.contains(taskID) && !shouldUseTimeout) {
-      acceptableRuntime = estimator.thresholdRuntime(taskID);
-      if (acceptableRuntime == Long.MAX_VALUE) {
-        return ON_SCHEDULE;
-      }
-    }
-
-    TezTaskAttemptID runningTaskAttemptID = null;
 
     int numberRunningAttempts = 0;
 
     for (TaskAttempt taskAttempt : attempts.values()) {
-      if (taskAttempt.getState() == TaskAttemptState.RUNNING
-          || taskAttempt.getState() == TaskAttemptState.STARTING) {
+      TaskAttemptState taskAttemptState = taskAttempt.getState();
+      if (taskAttemptState == TaskAttemptState.RUNNING
+          || taskAttemptState == TaskAttemptState.STARTING) {
         if (++numberRunningAttempts > 1) {
+          waitingToSpeculate.remove(taskID);
           return ALREADY_SPECULATING;
         }
+      }
+    }
+
+    // If we are here, there's at most one task attempt.
+    if (numberRunningAttempts == 0) {
+      return NOT_RUNNING;
+    }
+
+    if ((numberRunningAttempts == 1) && waitingToSpeculate.contains(taskID)) {
+      return ALREADY_SPECULATING;
+    }
+    else {
+      if (!shouldUseTimeout) {
+        acceptableRuntime = estimator.thresholdRuntime(taskID);
+        if (acceptableRuntime == Long.MAX_VALUE) {
+          return ON_SCHEDULE;
+        }
+      }
+    }
+
+    TezTaskAttemptID runningTaskAttemptID = null;
+
+    for (TaskAttempt taskAttempt : attempts.values()) {
+      TaskAttemptState taskAttemptState = taskAttempt.getState();
+      if (taskAttemptState == TaskAttemptState.RUNNING
+          || taskAttemptState == TaskAttemptState.STARTING) {
+
         runningTaskAttemptID = taskAttempt.getID();
 
         long taskAttemptStartTime
@@ -311,13 +330,6 @@ public class LegacySpeculator {
       }
     }
 
-    // If we are here, there's at most one task attempt.
-    if (numberRunningAttempts == 0) {
-      return NOT_RUNNING;
-    }
-
-
-
     if ((acceptableRuntime == Long.MIN_VALUE) && !shouldUseTimeout) {
       acceptableRuntime = estimator.thresholdRuntime(taskID);
       if (acceptableRuntime == Long.MAX_VALUE) {
@@ -332,7 +344,7 @@ public class LegacySpeculator {
   protected void addSpeculativeAttempt(TezTaskID taskID) {
     LOG.info("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
     vertex.scheduleSpeculativeTask(taskID);
-    mayHaveSpeculated.add(taskID);
+    waitingToSpeculate.add(taskID);
   }
 
   private int maybeScheduleASpeculation() {

Reply via email to