This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f1046afa40 [SPARK-32170][CORE] Improve the speculation through the 
stage task metrics
6f1046afa40 is described below

commit 6f1046afa40096f477b29beecca5ca6286dfa7f3
Author: weixiuli <[email protected]>
AuthorDate: Wed Jun 29 13:39:03 2022 +0800

    [SPARK-32170][CORE] Improve the speculation through the stage task metrics
    
    ### What changes were proposed in this pull request?
    
    Currently, the mechanism of speculation is as follows:
    1. The number of successful tasks more than 
spark.speculation.quantile(default is 0.75) * numTasks.
    2. When some unsuccessful tasks run for more than 
spark.speculation.multiplier( default is 1.5) * medianDuration, they will 
speculate.
    
    The mechanism means that it will be the last 10% of the tasks subject to 
speculate, as long as above conditions are met.
    
    For example: A reduce stage whose TaskSet's size is 10, and the TaskSet 
state is  as follows:
    
    taskIndex | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9
    -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
    All   records need to deal with | 10000 | 10000 | 10000 | 10000 | 10000 | 
10000 | 10000 | 10000 | 18000 | 18000
    Records   that have been processed | 10000 | 10000 | 10000 | 10000 | 10000 
| 10000 | 10000 | 10000 | 11000 | 15000
    runTime(seconds) | 20 | 20 | 20 | 20 | 20 | 20 | 20 | 20 | 30 | 30
    progressRate(Records   that have been processed/runTime) | 500 | 500 | 500 
| 500 | 500 | 500 | 500 | 500 | 183.33 | 500
    completed | true | true | true | true | true | true | true | true | false | 
false
    
    In the current speculation mechanism, both taskIndex 8 and taskIndex 9 will 
speculate, however, only taskIndex 8 needs to speculate, because only taskIndex 
8 is inefficient.
    
    In our production, there are more than 110 million speculation tasks every 
day, but only 30% of speculation tasks are successful finally. To analysis the 
unsuccessful speculation tasks, and found that the original tasks of 
speculation are more efficient, which might be unnecessary to speculate at all. 
 The unnecessary speculative tasks not only waste cluster resources but also 
interfere with the scheduling of other tasks.
    
    This pr will try to improve the speculation through the stage task metrics. 
We use stage task metrics(inputMetrics and shuffleReadMetrics) and task 
runtimes to evaluate the efficiency of task processing, and  evaluate the 
inefficient tasks by measuring successful ones , and only need to speculate the 
inefficient tasks.  With this pr,  we may only speculate the taskIndex 8 in the 
example above,  which makes more sense and helps optimize cluster resources.
    
    In addition, in order to avoid regression, we should try to speculate as 
much as possible for long-running tasks.
    
    ### Why are the changes needed?
    Improve the speculation.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add unittests.
    
    Closes #36162 from weixiuli/improve-speculation.
    
    Authored-by: weixiuli <[email protected]>
    Signed-off-by: yi.wu <[email protected]>
---
 .../org/apache/spark/executor/InputMetrics.scala   |   2 +
 .../org/apache/spark/internal/config/package.scala |  35 ++++
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  44 +++-
 .../apache/spark/scheduler/TaskSetManager.scala    | 179 ++++++++++++----
 .../spark/scheduler/TaskSetManagerSuite.scala      | 232 +++++++++++++++++++++
 docs/configuration.md                              |  35 ++++
 6 files changed, 481 insertions(+), 46 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala 
b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 3d15f3a0396..a398a0159cc 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -56,4 +56,6 @@ class InputMetrics private[spark] () extends Serializable {
   private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
   private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
   private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
+  // For test only
+  private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index b67250b7b84..02a52e86454 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2073,6 +2073,41 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER =
+    ConfigBuilder("spark.speculation.efficiency.processRateMultiplier")
+      .doc("A multiplier that used when evaluating inefficient tasks. The 
higher the multiplier " +
+        "is, the more tasks will be possibly considered as inefficient.")
+      .version("3.4.0")
+      .doubleConf
+      .checkValue(v => v > 0.0 && v <= 1.0, "multiplier must be in (0.0, 1.0]")
+      .createWithDefault(0.75)
+
+  private[spark] val SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR =
+    ConfigBuilder("spark.speculation.efficiency.longRunTaskFactor")
+      .doc(s"A task will be speculated anyway as long as its duration has 
exceeded the value of " +
+        s"multiplying the factor and the time threshold (either be 
${SPECULATION_MULTIPLIER.key} " +
+        s"* successfulTaskDurations.median or 
${SPECULATION_MIN_THRESHOLD.key}) regardless of " +
+        s"it's data process rate is good or not. This avoids missing the 
inefficient tasks when " +
+        s"task slow isn't related to data process rate.")
+      .version("3.4.0")
+      .doubleConf
+      .checkValue(_ >= 1.0, "Duration factor must be >= 1.0")
+      .createWithDefault(2.0)
+
+  private[spark] val SPECULATION_EFFICIENCY_ENABLE =
+    ConfigBuilder("spark.speculation.efficiency.enabled")
+      .doc(s"When set to true, spark will evaluate the efficiency of task 
processing through the " +
+        s"stage task metrics or its duration, and only need to speculate the 
inefficient tasks. " +
+        s"A task is inefficient when 1)its data process rate is less than the 
average data " +
+        s"process rate of all successful tasks in the stage multiplied by a 
multiplier or 2)its " +
+        s"duration has exceeded the value of multiplying " +
+        s"${SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR.key} and the time 
threshold (either be " +
+        s"${SPECULATION_MULTIPLIER.key} * successfulTaskDurations.median or " +
+        s"${SPECULATION_MIN_THRESHOLD.key}).")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val DECOMMISSION_ENABLED =
     ConfigBuilder("spark.decommission.enabled")
       .doc("When decommission enabled, Spark will try its best to shutdown the 
executor " +
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d20b534ee63..9bd6b976f40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, 
HashMap, HashSet}
 import scala.util.Random
 
 import org.apache.spark._
+import org.apache.spark.InternalAccumulator.{input, shuffleRead}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.ExecutorMetrics
@@ -103,6 +104,9 @@ private[spark] class TaskSchedulerImpl(
   // of tasks that are very short.
   val MIN_TIME_TO_SPECULATION = conf.get(SPECULATION_MIN_THRESHOLD)
 
+  private[scheduler] val efficientTaskCalcualtionEnabled = 
conf.get(SPECULATION_ENABLED) &&
+    conf.get(SPECULATION_EFFICIENCY_ENABLE)
+
   private val speculationScheduler =
     
ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
 
@@ -853,8 +857,13 @@ private[spark] class TaskSchedulerImpl(
     // (taskId, stageId, stageAttemptId, accumUpdates)
     val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] 
= {
       accumUpdates.flatMap { case (id, updates) =>
-        val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
         Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr =>
+          val (accInfos, taskProcessRate) = 
getTaskAccumulableInfosAndProcessRate(updates)
+          if (efficientTaskCalcualtionEnabled && taskProcessRate > 0.0) {
+            taskSetMgr.taskProcessRateCalculator.foreach {
+              _.updateRunningTaskProcessRate(id, taskProcessRate)
+            }
+          }
           (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
         }
       }
@@ -863,6 +872,39 @@ private[spark] class TaskSchedulerImpl(
       executorUpdates)
   }
 
+ private def getTaskAccumulableInfosAndProcessRate(
+     updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = {
+   var recordsRead = 0L
+   var executorRunTime = 0L
+   val accInfos = updates.map { acc =>
+     if (efficientTaskCalcualtionEnabled && acc.name.isDefined) {
+       val name = acc.name.get
+       if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) {
+         recordsRead += acc.value.asInstanceOf[Long]
+       } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) {
+         executorRunTime = acc.value.asInstanceOf[Long]
+       }
+     }
+     acc.toInfo(Some(acc.value), None)
+   }
+   val taskProcessRate = if (efficientTaskCalcualtionEnabled) {
+     getTaskProcessRate(recordsRead, executorRunTime)
+   } else {
+     0.0D
+   }
+   (accInfos, taskProcessRate)
+ }
+
+  private[scheduler] def getTaskProcessRate(
+      recordsRead: Long,
+      executorRunTime: Long): Double = {
+    if (executorRunTime > 0 && recordsRead > 0) {
+      recordsRead / (executorRunTime / 1000.0)
+    } else {
+      0.0D
+    }
+  }
+
   def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit 
= synchronized {
     taskSetManager.handleTaskGettingResult(tid)
   }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 437f4860ed8..1636587e9dd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.nio.ByteBuffer
-import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, 
TimeUnit}
 
 import scala.collection.immutable.Map
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -27,6 +27,8 @@ import scala.math.max
 import scala.util.control.NonFatal
 
 import org.apache.spark._
+import org.apache.spark.InternalAccumulator
+import org.apache.spark.InternalAccumulator.{input, shuffleRead}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.{config, Logging}
@@ -80,12 +82,17 @@ private[spark] class TaskSetManager(
   val copiesRunning = new Array[Int](numTasks)
 
   val speculationEnabled = conf.get(SPECULATION_ENABLED)
+  private val efficientTaskProcessMultiplier =
+    conf.get(SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER)
+  private val efficientTaskDurationFactor = 
conf.get(SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR)
+
   // Quantile of tasks at which to start speculation
   val speculationQuantile = conf.get(SPECULATION_QUANTILE)
   val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
   val minFinishedForSpeculation = math.max((speculationQuantile * 
numTasks).floor.toInt, 1)
   // User provided threshold for speculation regardless of whether the 
quantile has been reached
   val speculationTaskDurationThresOpt = 
conf.get(SPECULATION_TASK_DURATION_THRESHOLD)
+  private val isSpeculationThresholdSpecified = 
speculationTaskDurationThresOpt.exists(_ > 0)
   // SPARK-29976: Only when the total number of tasks in the stage is less 
than or equal to the
   // number of slots on a single executor, would the task manager speculative 
run the tasks if
   // their duration is longer than the given threshold. In this way, we 
wouldn't speculate too
@@ -109,6 +116,13 @@ private[spark] class TaskSetManager(
   private val executorDecommissionKillInterval =
     
conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(TimeUnit.SECONDS.toMillis)
 
+  private[scheduler] val taskProcessRateCalculator =
+    if (sched.efficientTaskCalcualtionEnabled) {
+      Some(new TaskProcessRateCalculator())
+    } else {
+      None
+    }
+
   // For each task, tracks whether a copy of the task has succeeded. A task 
will also be
   // marked as "succeeded" if it failed with a fetch failure, in which case it 
should not
   // be re-run because the missing map data needs to be regenerated first.
@@ -801,6 +815,7 @@ private[spark] class TaskSetManager(
     info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
     if (speculationEnabled) {
       successfulTaskDurations.insert(info.duration)
+      taskProcessRateCalculator.foreach(_.updateAvgTaskProcessRate(tid, 
result))
     }
     removeRunningTask(tid)
 
@@ -1069,25 +1084,66 @@ private[spark] class TaskSetManager(
    * Check if the task associated with the given tid has past the time 
threshold and should be
    * speculative run.
    */
-  private def checkAndSubmitSpeculatableTask(
-      tid: Long,
+  private def checkAndSubmitSpeculatableTasks(
       currentTimeMillis: Long,
-      threshold: Double): Boolean = {
-    val info = taskInfos(tid)
-    val index = info.index
-    if (!successful(index) && copiesRunning(index) == 1 &&
-        info.timeRunning(currentTimeMillis) > threshold && 
!speculatableTasks.contains(index)) {
-      addPendingTask(index, speculatable = true)
-      logInfo(
-        ("Marking task %d in stage %s (on %s) as speculatable because it ran 
more" +
-          " than %.0f ms(%d speculatable tasks in this taskset now)")
-          .format(index, taskSet.id, info.host, threshold, 
speculatableTasks.size + 1))
-      speculatableTasks += index
-      sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
-      true
-    } else {
-      false
+      threshold: Double,
+      customizedThreshold: Boolean = false): Boolean = {
+    var foundTasksResult = false
+    for (tid <- runningTasksSet) {
+      val info = taskInfos(tid)
+      val index = info.index
+      if (!successful(index) && copiesRunning(index) == 1 && 
!speculatableTasks.contains(index)) {
+        val runtimeMs = info.timeRunning(currentTimeMillis)
+
+        def checkMaySpeculate(): Boolean = {
+          if (customizedThreshold || taskProcessRateCalculator.isEmpty) {
+            true
+          } else {
+            isInefficient()
+          }
+        }
+
+        def isInefficient(): Boolean = {
+          (runtimeMs > efficientTaskDurationFactor * threshold) || 
taskProcessRateIsInefficient()
+        }
+
+        def taskProcessRateIsInefficient(): Boolean = {
+          taskProcessRateCalculator.forall(calculator => {
+            calculator.getRunningTasksProcessRate(tid) <
+              calculator.getAvgTaskProcessRate() * 
efficientTaskProcessMultiplier
+          })
+        }
+
+        def shouldSpeculateForExecutorDecommissioning(): Boolean = {
+          !customizedThreshold && executorDecommissionKillInterval.isDefined &&
+            !successfulTaskDurations.isEmpty() &&
+            sched.getExecutorDecommissionState(info.executorId).exists { 
decomState =>
+              // Check if this task might finish after this executor is 
decommissioned.
+              // We estimate the task's finish time by using the median task 
duration.
+              // Whereas the time when the executor might be decommissioned is 
estimated using the
+              // config executorDecommissionKillInterval. If the task is going 
to finish after
+              // decommissioning, then we will eagerly speculate the task.
+              val taskEndTimeBasedOnMedianDuration =
+                info.launchTime + successfulTaskDurations.median
+              val executorDecomTime = decomState.startTime + 
executorDecommissionKillInterval.get
+              executorDecomTime < taskEndTimeBasedOnMedianDuration
+            }
+        }
+        val speculated = (runtimeMs > threshold) && checkMaySpeculate() ||
+          shouldSpeculateForExecutorDecommissioning()
+        if (speculated) {
+          addPendingTask(index, speculatable = true)
+          logInfo(
+            ("Marking task %d in stage %s (on %s) as speculatable because it 
ran more" +
+              " than %.0f ms(%d speculatable tasks in this taskset now)")
+              .format(index, taskSet.id, info.host, threshold, 
speculatableTasks.size + 1))
+          speculatableTasks += index
+          sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
+        }
+        foundTasksResult |= speculated
+      }
     }
+    foundTasksResult
   }
 
   /**
@@ -1099,7 +1155,7 @@ private[spark] class TaskSetManager(
     // No need to speculate if the task set is zombie or is from a barrier 
stage. If there is only
     // one task we don't speculate since we don't have metrics to decide 
whether it's taking too
     // long or not, unless a task duration threshold is explicitly provided.
-    if (isZombie || isBarrier || (numTasks == 1 && 
!speculationTaskDurationThresOpt.isDefined)) {
+    if (isZombie || isBarrier || (numTasks == 1 && 
!isSpeculationThresholdSpecified)) {
       return false
     }
     var foundTasks = false
@@ -1109,40 +1165,24 @@ private[spark] class TaskSetManager(
     // `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we 
should only count the
     // tasks that are submitted by this `TaskSetManager` and are completed 
successfully.
     val numSuccessfulTasks = successfulTaskDurations.size()
+    val timeMs = clock.getTimeMillis()
     if (numSuccessfulTasks >= minFinishedForSpeculation) {
-      val time = clock.getTimeMillis()
       val medianDuration = successfulTaskDurations.median
       val threshold = max(speculationMultiplier * medianDuration, 
minTimeToSpeculation)
       // TODO: Threshold should also look at standard deviation of task 
durations and have a lower
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
-      for (tid <- runningTasksSet) {
-        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
-        if (!speculated && executorDecommissionKillInterval.isDefined) {
-          val taskInfo = taskInfos(tid)
-          val decomState = 
sched.getExecutorDecommissionState(taskInfo.executorId)
-          if (decomState.isDefined) {
-            // Check if this task might finish after this executor is 
decommissioned.
-            // We estimate the task's finish time by using the median task 
duration.
-            // Whereas the time when the executor might be decommissioned is 
estimated using the
-            // config executorDecommissionKillInterval. If the task is going 
to finish after
-            // decommissioning, then we will eagerly speculate the task.
-            val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + 
medianDuration
-            val executorDecomTime = decomState.get.startTime + 
executorDecommissionKillInterval.get
-            val canExceedDeadline = executorDecomTime < 
taskEndTimeBasedOnMedianDuration
-            if (canExceedDeadline) {
-              speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
-            }
-          }
-        }
-        foundTasks |= speculated
-      }
-    } else if (speculationTaskDurationThresOpt.isDefined && 
speculationTasksLessEqToSlots) {
-      val time = clock.getTimeMillis()
+      foundTasks = checkAndSubmitSpeculatableTasks(timeMs, threshold)
+    } else if (isSpeculationThresholdSpecified && 
speculationTasksLessEqToSlots) {
       val threshold = speculationTaskDurationThresOpt.get
       logDebug(s"Tasks taking longer time than provided speculation threshold: 
$threshold")
-      for (tid <- runningTasksSet) {
-        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+      foundTasks = checkAndSubmitSpeculatableTasks(timeMs, threshold, 
customizedThreshold = true)
+    }
+    // avoid more warning logs.
+    if (foundTasks) {
+      val elapsedMs = clock.getTimeMillis() - timeMs
+      if (elapsedMs > minTimeToSpeculation) {
+        logWarning(s"Time to checkSpeculatableTasks ${elapsedMs}ms > 
${minTimeToSpeculation}ms")
       }
     }
     foundTasks
@@ -1218,6 +1258,55 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, a task is 
inefficient when its data
+   * process rate is less than the average data process rate of all successful 
tasks in the stage
+   * multiplied by a multiplier.
+   */
+  private[TaskSetManager] class TaskProcessRateCalculator {
+    private var totalRecordsRead = 0L
+    private var totalExecutorRunTime = 0L
+    private var avgTaskProcessRate = Double.MaxValue
+    private val runningTasksProcessRate = new ConcurrentHashMap[Long, Double]()
+
+    private[TaskSetManager] def getAvgTaskProcessRate(): Double = {
+      avgTaskProcessRate
+    }
+
+    private[TaskSetManager] def getRunningTasksProcessRate(taskId: Long): 
Double = {
+      runningTasksProcessRate.getOrDefault(taskId, 0.0)
+    }
+
+    private[TaskSetManager] def updateAvgTaskProcessRate(
+        taskId: Long,
+        result: DirectTaskResult[_]): Unit = {
+      var recordsRead = 0L
+      var executorRunTime = 0L
+      result.accumUpdates.foreach { a =>
+        if (a.name == Some(shuffleRead.RECORDS_READ) ||
+          a.name == Some(input.RECORDS_READ)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          recordsRead += acc.value
+        } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          executorRunTime = acc.value
+        }
+      }
+      totalRecordsRead += recordsRead
+      totalExecutorRunTime += executorRunTime
+      if (totalRecordsRead > 0 && totalExecutorRunTime > 0) {
+        avgTaskProcessRate = sched.getTaskProcessRate(totalRecordsRead, 
totalExecutorRunTime)
+      }
+      runningTasksProcessRate.remove(taskId)
+    }
+
+    private[scheduler] def updateRunningTaskProcessRate(
+        taskId: Long,
+        taskProcessRate: Double): Unit = {
+      runningTasksProcessRate.put(taskId, taskProcessRate)
+    }
+  }
 }
 
 private[spark] object TaskSetManager {
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index f21daa1aea6..c1c62ea4b85 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler
 
+import java.nio.ByteBuffer
 import java.util.{Properties, Random}
 
 import scala.collection.mutable
@@ -32,6 +33,7 @@ import org.scalatest.PrivateMethodTester
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{FakeSchedulerBackend => _, _}
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.SKIP_VALIDATE_CORES_TESTING
@@ -203,6 +205,9 @@ class TaskSetManagerSuite
 
   val LOCALITY_WAIT_MS = conf.get(config.LOCALITY_WAIT)
   val MAX_TASK_FAILURES = 4
+  val SUBMISSION_TIME = 0L
+  val RUNTIME = 20 * 1000
+  val RECORDS_NUM = 10000L
 
   var sched: FakeTaskScheduler = null
 
@@ -2245,6 +2250,233 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.size == 1)
   }
 
+  private def createTaskMetrics(
+       taskSet: TaskSet,
+       inefficientTaskIds: Set[Int],
+       efficientMultiplier: Double = 0.6): Array[TaskMetrics] = {
+    taskSet.tasks.zipWithIndex.map { case (task, index) =>
+      val metrics = task.metrics
+      if (inefficientTaskIds.contains(index)) {
+        updateAndGetTaskMetrics(metrics, efficientMultiplier)
+      } else {
+        updateAndGetTaskMetrics(metrics, 1)
+      }
+    }
+  }
+
+  private def updateAndGetTaskMetrics(
+      taskMetrics: TaskMetrics,
+      efficientMultiplier: Double): TaskMetrics = {
+    taskMetrics.inputMetrics.setRecordsRead((efficientMultiplier * 
RECORDS_NUM).toLong)
+    taskMetrics.shuffleReadMetrics.setRecordsRead((efficientMultiplier * 
RECORDS_NUM).toLong)
+    taskMetrics.setExecutorRunTime(RUNTIME)
+    taskMetrics
+  }
+
+  test("SPARK-32170: test speculation for TaskSet with single task") {
+    val conf = new SparkConf()
+      .set(config.SPECULATION_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    Seq(0, 15).foreach { duration =>
+      sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, 
duration.toString)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val numTasks = 1
+      val taskSet = FakeTask.createTaskSet(numTasks)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      for ((k, v) <- List("exec1" -> "host1")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // runtimeMs(20s) > 15s(1 * 15s)
+      if (duration <= 0) {
+        assert(!manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      } else {
+        assert(manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set(0))
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_MIN_THRESHOLD for speculating 
inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched based on
+    // minTimeToSpeculation parameter to checkSpeculatableTasks
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER, 0.0)
+      .set(config.SPECULATION_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    val speculativeAllDurations = Set(0)
+    val speculativeInefficientDurations = Set(10000)
+    val nonSpeculativeDurations = Set(50000)
+    (speculativeAllDurations ++ speculativeInefficientDurations
+      ++ nonSpeculativeDurations).foreach { minDuration =>
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(5)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Set(3), 
efficientMultiplier = 0.4)
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 5 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 2 task in running
+      val task3Metrics: TaskMetrics =
+        
ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, 
task3Metrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+
+      updateAndGetTaskMetrics(taskMetricsByTask(4), efficientMultiplier = 5)
+      val task4Metrics: TaskMetrics =
+        
ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(4)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((4, 
task4Metrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, 
taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, 
serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // 1) when SPECULATION_MIN_THRESHOLD is equal 0s, both the task 3 and 
the task 4 will be
+      // speculated by previous strategy.
+      // 2) when SPECULATION_MIN_THRESHOLD is equal 10s and runtime(20s) is 
above 10s, the task 3
+      //  will be evaluated an inefficient task to speculate, but the task 4 
will not.
+      // 3) when SPECULATION_MIN_THRESHOLD is equal 50s, the task 3 and the 
task 4 runtime(20s) is
+      // less than (50s) and no needs to speculate at all.
+      if (speculativeAllDurations.contains(minDuration)) {
+        assert(manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set(3, 4))
+      } else if (speculativeInefficientDurations.contains(minDuration)) {
+        assert(manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set(3))
+      } else {
+        assert(!manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER 
for " +
+    "speculating inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched based on
+    // minTimeToSpeculation parameter to checkSpeculatableTasks
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER, 0.0)
+      .set(config.SPECULATION_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    Seq(0.5, 0.8).foreach(processMultiplier => {
+      sc.conf.set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER, 
processMultiplier)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(4)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Set(3), 
efficientMultiplier = 0.6)
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 4 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 1 task in running
+      val taskMetrics: TaskMetrics =
+        
ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, 
taskMetrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, 
taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, 
serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // 0.5 < 0.6 < 0.8
+      if (processMultiplier == 0.8) {
+        assert(manager.checkSpeculatableTasks(15000))
+        assert(sched.speculativeTasks.toSet === Set(3))
+      } else {
+        assert(!manager.checkSpeculatableTasks(15000))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      }
+    })
+  }
+
+  test("SPARK-32170: test SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR for " +
+    "speculating tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched based on
+    // minTimeToSpeculation parameter to checkSpeculatableTasks
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER, 0.0)
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER, 0.5)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    Seq(1.0, 2.0).foreach(factor => {
+      sc.conf.set(config.SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR, factor)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(4)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Set(3), 
efficientMultiplier = 0.6)
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 4 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 1 task in running
+      val taskMetrics: TaskMetrics =
+        
ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, 
taskMetrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, 
taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, 
serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // runtimeMs(20s) > 15s(1 * 15s)
+      if (factor == 1.0) {
+        assert(manager.checkSpeculatableTasks(15000))
+        assert(sched.speculativeTasks.toSet === Set(3))
+      } else {
+        // runtimeMs(20s) < 30s(2 * 15s)
+        assert(!manager.checkSpeculatableTasks(15000))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      }
+    })
+  }
+
   test("SPARK-37580: Reset numFailures when one of task attempts succeeds") {
     sc = new SparkContext("local", "test")
     // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
diff --git a/docs/configuration.md b/docs/configuration.md
index a6d2a8b9d52..fd189aa88b6 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2476,6 +2476,41 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
   <td>3.0.0</td>
 </tr>
+<tr>
+  <td><code>spark.speculation.efficiency.processRateMultiplier</code></td>
+  <td>0.75</td>
+  <td>
+    A multiplier that used when evaluating inefficient tasks. The higher the 
multiplier
+    is, the more tasks will be possibly considered as inefficient.
+  </td>
+  <td>3.4.0</td>
+</tr>
+<tr>
+  <td><code>spark.speculation.efficiency.longRunTaskFactor</code></td>
+  <td>2</td>
+  <td>
+    A task will be speculated anyway as long as its duration has exceeded the 
value of multiplying
+    the factor and the time threshold (either be 
<code>spark.speculation.multiplier</code>
+    * successfulTaskDurations.median or 
<code>spark.speculation.minTaskRuntime</code>) regardless
+    of it's data process rate is good or not. This avoids missing the 
inefficient tasks when task
+    slow isn't related to data process rate.
+  </td>
+  <td>3.4.0</td>
+</tr>
+<tr>
+  <td><code>spark.speculation.efficiency.enabled</code></td>
+  <td>true</td>
+  <td>
+    When set to true, spark will evaluate the efficiency of task processing 
through the stage task
+    metrics or its duration, and only need to speculate the inefficient tasks. 
A task is inefficient
+    when 1)its data process rate is less than the average data process rate of 
all successful tasks
+    in the stage multiplied by a multiplier or 2)its duration has exceeded the 
value of multiplying
+     <code>spark.speculation.efficiency.longRunTaskFactor</code> and the time 
threshold (either be
+     <code>spark.speculation.multiplier</code> * 
successfulTaskDurations.median or
+    <code>spark.speculation.minTaskRuntime</code>).
+  </td>
+  <td>3.4.0</td>
+</tr>
 <tr>
   <td><code>spark.task.cpus</code></td>
   <td>1</td>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to