This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6f1046afa40 [SPARK-32170][CORE] Improve the speculation through the stage task metrics
6f1046afa40 is described below
commit 6f1046afa40096f477b29beecca5ca6286dfa7f3
Author: weixiuli <[email protected]>
AuthorDate: Wed Jun 29 13:39:03 2022 +0800
[SPARK-32170][CORE] Improve the speculation through the stage task metrics
### What changes were proposed in this pull request?
Currently, the speculation mechanism works as follows:
1. The number of successful tasks must reach spark.speculation.quantile (default 0.75) * numTasks.
2. Any unfinished task that has then run for longer than spark.speculation.multiplier (default 1.5) * medianDuration is speculated.
As long as these conditions are met, any task still running at the tail of the stage (up to 25% of the tasks with the default quantile) can be speculated, regardless of how much data it has processed.
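As a rough illustration, the two conditions above can be modelled in Scala. This is a simplified sketch, not Spark's code, and the object and method names are hypothetical. It mirrors the `minFinishedForSpeculation` formula (`max(floor(quantile * numTasks), 1)`) and the threshold formula (`max(multiplier * medianDuration, minTimeToSpeculation)`) that appear in the `TaskSetManager` changes of this commit. The runtime must be strictly greater than the threshold.

```scala
// Hypothetical names: a simplified model of the pre-change speculation rule.
object LegacySpeculation {
  /** Successful tasks required before speculation is considered: max(floor(quantile * numTasks), 1). */
  def minFinished(quantile: Double, numTasks: Int): Int =
    math.max((quantile * numTasks).floor.toInt, 1)

  /** Runtime (ms) a running task must exceed: max(multiplier * medianMs, minTimeMs). */
  def threshold(multiplier: Double, medianMs: Double, minTimeMs: Long): Double =
    math.max(multiplier * medianMs, minTimeMs.toDouble)

  /** Only elapsed time is checked; how much data the task has processed is ignored. */
  def isCandidate(
      numSuccessful: Int,
      numTasks: Int,
      runtimeMs: Long,
      medianMs: Double,
      quantile: Double,
      multiplier: Double,
      minTimeMs: Long): Boolean =
    numSuccessful >= minFinished(quantile, numTasks) &&
      runtimeMs > threshold(multiplier, medianMs, minTimeMs)
}
```

Take a stage of 10 tasks where 8 are done and the median duration is 20 s. The threshold is then 30 s, so every unfinished task becomes a candidate once it runs past 30 s, whatever its progress.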
For example, consider a reduce stage whose TaskSet has 10 tasks in the following state:
taskIndex | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9
-- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
Total records to process | 10000 | 10000 | 10000 | 10000 | 10000 | 10000 | 10000 | 10000 | 18000 | 18000
Records processed so far | 10000 | 10000 | 10000 | 10000 | 10000 | 10000 | 10000 | 10000 | 11000 | 15000
runTime (seconds) | 20 | 20 | 20 | 20 | 20 | 20 | 20 | 20 | 30 | 30
progressRate (records processed / runTime, records/s) | 500 | 500 | 500 | 500 | 500 | 500 | 500 | 500 | 366.67 | 500
completed | true | true | true | true | true | true | true | true | false | false
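The progress rates can be recomputed from the table's record counts and runtimes. The sketch below uses hypothetical names. It takes the average over completed tasks as total records divided by total runtime, which is how the `TaskProcessRateCalculator` in this commit aggregates successful tasks. It then applies the default `spark.speculation.efficiency.processRateMultiplier` of 0.75. Task 8 runs at 11000 / 30 ≈ 366.67 records/s, below the 375 records/s cutoff (500 * 0.75). Task 9 runs at 500 records/s.

```scala
// Recomputes the example TaskSet's progress rates (names are hypothetical).
object ExampleRates {
  val processed: Vector[Long] =
    Vector(10000L, 10000L, 10000L, 10000L, 10000L, 10000L, 10000L, 10000L, 11000L, 15000L)
  val runTimeSec: Vector[Double] =
    Vector(20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 30.0, 30.0)
  val completed: Vector[Boolean] = Vector.fill(8)(true) ++ Vector(false, false)

  /** Records processed per second of runtime. */
  def rate(i: Int): Double = processed(i) / runTimeSec(i)

  /** Stage average over completed tasks: total records / total runtime. */
  val avgSuccessfulRate: Double = {
    val done = processed.indices.filter(i => completed(i))
    done.map(i => processed(i)).sum / done.map(i => runTimeSec(i)).sum
  }

  /** Unfinished tasks whose rate is below the average scaled by the multiplier. */
  def inefficient(multiplier: Double): Seq[Int] =
    processed.indices.filter(i => !completed(i) && rate(i) < avgSuccessfulRate * multiplier)
}
```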
Under the current mechanism, both taskIndex 8 and taskIndex 9 will be speculated. However, only taskIndex 8 needs to be speculated, because it is the only inefficient task. taskIndex 9 processes records at the same rate (500 records/s) as the completed tasks.
In our production environment, more than 110 million speculative tasks are launched every day, but only 30% of them eventually succeed. When we analyzed the unsuccessful speculative tasks, we found that their original tasks were processing data efficiently, so speculating them was probably unnecessary. Such unnecessary speculative tasks not only waste cluster resources but also interfere with the scheduling of other tasks.
This PR improves speculation by using the stage task metrics. It combines task metrics (inputMetrics and shuffleReadMetrics) with task runtimes to measure how efficiently each task processes data. It then compares running tasks against the successful ones to identify inefficient tasks, and speculates only those. With this PR, only taskIndex 8 in the example above would be speculated, which makes more sense and saves cluster resources.
In addition, to avoid regressions, tasks that run far longer than the speculation threshold are still speculated regardless of their data process rate.
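Putting the pieces together, the per-task decision can be sketched as a pure function. Several assumptions apply. The sketch omits the executor-decommissioning path, takes the rate check as a precomputed boolean, and uses hypothetical names. In the commit, the corresponding logic lives in `checkMaySpeculate` and `isInefficient` inside `checkAndSubmitSpeculatableTasks`.

```scala
// Hypothetical sketch of the new per-task speculation decision (decommission path omitted).
object EfficiencyDecision {
  /**
   * @param runtimeMs           how long the task has been running
   * @param thresholdMs         max(multiplier * median, minTaskRuntime), or the user-provided threshold
   * @param customizedThreshold true when spark.speculation.task.duration.threshold drives the check
   * @param efficiencyEnabled   spark.speculation.efficiency.enabled (and speculation enabled)
   * @param rateInefficient     running rate < average successful rate * processRateMultiplier
   * @param longRunFactor       spark.speculation.efficiency.longRunTaskFactor (default 2.0)
   */
  def shouldSpeculate(
      runtimeMs: Long,
      thresholdMs: Double,
      customizedThreshold: Boolean,
      efficiencyEnabled: Boolean,
      rateInefficient: Boolean,
      longRunFactor: Double = 2.0): Boolean = {
    // The efficiency check is skipped for a user threshold or when the feature is off;
    // otherwise a very long runtime or a low process rate makes the task eligible.
    val mayBeInefficient =
      customizedThreshold || !efficiencyEnabled ||
        runtimeMs > longRunFactor * thresholdMs || rateInefficient
    runtimeMs > thresholdMs && mayBeInefficient
  }
}
```

The assertions below mirror the `SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR` unit test. With a 15 s threshold and a 20 s runtime, a task that is not rate-inefficient is speculated with factor 1.0 but not with factor 2.0.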
### Why are the changes needed?
To reduce unnecessary speculative tasks, which waste cluster resources and interfere with the scheduling of other tasks.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests.
Closes #36162 from weixiuli/improve-speculation.
Authored-by: weixiuli <[email protected]>
Signed-off-by: yi.wu <[email protected]>
---
.../org/apache/spark/executor/InputMetrics.scala | 2 +
.../org/apache/spark/internal/config/package.scala | 35 ++++
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 44 +++-
.../apache/spark/scheduler/TaskSetManager.scala | 179 ++++++++++++----
.../spark/scheduler/TaskSetManagerSuite.scala | 232 +++++++++++++++++++++
docs/configuration.md | 35 ++++
6 files changed, 481 insertions(+), 46 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 3d15f3a0396..a398a0159cc 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -56,4 +56,6 @@ class InputMetrics private[spark] () extends Serializable {
private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
+ // For test only
+ private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index b67250b7b84..02a52e86454 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2073,6 +2073,41 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
+ private[spark] val SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER =
+ ConfigBuilder("spark.speculation.efficiency.processRateMultiplier")
+ .doc("A multiplier that used when evaluating inefficient tasks. The higher the multiplier " +
+ "is, the more tasks will be possibly considered as inefficient.")
+ .version("3.4.0")
+ .doubleConf
+ .checkValue(v => v > 0.0 && v <= 1.0, "multiplier must be in (0.0, 1.0]")
+ .createWithDefault(0.75)
+
+ private[spark] val SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR =
+ ConfigBuilder("spark.speculation.efficiency.longRunTaskFactor")
+ .doc(s"A task will be speculated anyway as long as its duration has exceeded the value of " +
+ s"multiplying the factor and the time threshold (either be ${SPECULATION_MULTIPLIER.key} " +
+ s"* successfulTaskDurations.median or ${SPECULATION_MIN_THRESHOLD.key}) regardless of " +
+ s"it's data process rate is good or not. This avoids missing the inefficient tasks when " +
+ s"task slow isn't related to data process rate.")
+ .version("3.4.0")
+ .doubleConf
+ .checkValue(_ >= 1.0, "Duration factor must be >= 1.0")
+ .createWithDefault(2.0)
+
+ private[spark] val SPECULATION_EFFICIENCY_ENABLE =
+ ConfigBuilder("spark.speculation.efficiency.enabled")
+ .doc(s"When set to true, spark will evaluate the efficiency of task processing through the " +
+ s"stage task metrics or its duration, and only need to speculate the inefficient tasks. " +
+ s"A task is inefficient when 1)its data process rate is less than the average data " +
+ s"process rate of all successful tasks in the stage multiplied by a multiplier or 2)its " +
+ s"duration has exceeded the value of multiplying " +
+ s"${SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR.key} and the time threshold (either be " +
+ s"${SPECULATION_MULTIPLIER.key} * successfulTaskDurations.median or " +
+ s"${SPECULATION_MIN_THRESHOLD.key}).")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(true)
+
private[spark] val DECOMMISSION_ENABLED =
ConfigBuilder("spark.decommission.enabled")
.doc("When decommission enabled, Spark will try its best to shutdown the executor " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d20b534ee63..9bd6b976f40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, HashSet}
import scala.util.Random
import org.apache.spark._
+import org.apache.spark.InternalAccumulator.{input, shuffleRead}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.executor.ExecutorMetrics
@@ -103,6 +104,9 @@ private[spark] class TaskSchedulerImpl(
// of tasks that are very short.
val MIN_TIME_TO_SPECULATION = conf.get(SPECULATION_MIN_THRESHOLD)
+ private[scheduler] val efficientTaskCalcualtionEnabled = conf.get(SPECULATION_ENABLED) &&
+ conf.get(SPECULATION_EFFICIENCY_ENABLE)
+
private val speculationScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
@@ -853,8 +857,13 @@ private[spark] class TaskSchedulerImpl(
// (taskId, stageId, stageAttemptId, accumUpdates)
val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = {
accumUpdates.flatMap { case (id, updates) =>
- val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr =>
+ val (accInfos, taskProcessRate) = getTaskAccumulableInfosAndProcessRate(updates)
+ if (efficientTaskCalcualtionEnabled && taskProcessRate > 0.0) {
+ taskSetMgr.taskProcessRateCalculator.foreach {
+ _.updateRunningTaskProcessRate(id, taskProcessRate)
+ }
+ }
(id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
}
}
@@ -863,6 +872,39 @@ private[spark] class TaskSchedulerImpl(
executorUpdates)
}
+ private def getTaskAccumulableInfosAndProcessRate(
+ updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = {
+ var recordsRead = 0L
+ var executorRunTime = 0L
+ val accInfos = updates.map { acc =>
+ if (efficientTaskCalcualtionEnabled && acc.name.isDefined) {
+ val name = acc.name.get
+ if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) {
+ recordsRead += acc.value.asInstanceOf[Long]
+ } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) {
+ executorRunTime = acc.value.asInstanceOf[Long]
+ }
+ }
+ acc.toInfo(Some(acc.value), None)
+ }
+ val taskProcessRate = if (efficientTaskCalcualtionEnabled) {
+ getTaskProcessRate(recordsRead, executorRunTime)
+ } else {
+ 0.0D
+ }
+ (accInfos, taskProcessRate)
+ }
+
+ private[scheduler] def getTaskProcessRate(
+ recordsRead: Long,
+ executorRunTime: Long): Double = {
+ if (executorRunTime > 0 && recordsRead > 0) {
+ recordsRead / (executorRunTime / 1000.0)
+ } else {
+ 0.0D
+ }
+ }
+
def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
taskSetManager.handleTaskGettingResult(tid)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 437f4860ed8..1636587e9dd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
import java.io.NotSerializableException
import java.nio.ByteBuffer
-import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, TimeUnit}
import scala.collection.immutable.Map
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -27,6 +27,8 @@ import scala.math.max
import scala.util.control.NonFatal
import org.apache.spark._
+import org.apache.spark.InternalAccumulator
+import org.apache.spark.InternalAccumulator.{input, shuffleRead}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.{config, Logging}
@@ -80,12 +82,17 @@ private[spark] class TaskSetManager(
val copiesRunning = new Array[Int](numTasks)
val speculationEnabled = conf.get(SPECULATION_ENABLED)
+ private val efficientTaskProcessMultiplier =
+ conf.get(SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER)
+ private val efficientTaskDurationFactor = conf.get(SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR)
+
// Quantile of tasks at which to start speculation
val speculationQuantile = conf.get(SPECULATION_QUANTILE)
val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1)
// User provided threshold for speculation regardless of whether the quantile has been reached
val speculationTaskDurationThresOpt = conf.get(SPECULATION_TASK_DURATION_THRESHOLD)
+ private val isSpeculationThresholdSpecified = speculationTaskDurationThresOpt.exists(_ > 0)
// SPARK-29976: Only when the total number of tasks in the stage is less than or equal to the
// number of slots on a single executor, would the task manager speculative run the tasks if
// their duration is longer than the given threshold. In this way, we wouldn't speculate too
@@ -109,6 +116,13 @@ private[spark] class TaskSetManager(
private val executorDecommissionKillInterval =
conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(TimeUnit.SECONDS.toMillis)
+ private[scheduler] val taskProcessRateCalculator =
+ if (sched.efficientTaskCalcualtionEnabled) {
+ Some(new TaskProcessRateCalculator())
+ } else {
+ None
+ }
+
// For each task, tracks whether a copy of the task has succeeded. A task will also be
// marked as "succeeded" if it failed with a fetch failure, in which case it should not
// be re-run because the missing map data needs to be regenerated first.
@@ -801,6 +815,7 @@ private[spark] class TaskSetManager(
info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
if (speculationEnabled) {
successfulTaskDurations.insert(info.duration)
+ taskProcessRateCalculator.foreach(_.updateAvgTaskProcessRate(tid, result))
}
removeRunningTask(tid)
@@ -1069,25 +1084,66 @@ private[spark] class TaskSetManager(
* Check if the task associated with the given tid has past the time threshold and should be
* speculative run.
*/
- private def checkAndSubmitSpeculatableTask(
- tid: Long,
+ private def checkAndSubmitSpeculatableTasks(
currentTimeMillis: Long,
- threshold: Double): Boolean = {
- val info = taskInfos(tid)
- val index = info.index
- if (!successful(index) && copiesRunning(index) == 1 &&
- info.timeRunning(currentTimeMillis) > threshold && !speculatableTasks.contains(index)) {
- addPendingTask(index, speculatable = true)
- logInfo(
- ("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
- " than %.0f ms(%d speculatable tasks in this taskset now)")
- .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
- speculatableTasks += index
- sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
- true
- } else {
- false
+ threshold: Double,
+ customizedThreshold: Boolean = false): Boolean = {
+ var foundTasksResult = false
+ for (tid <- runningTasksSet) {
+ val info = taskInfos(tid)
+ val index = info.index
+ if (!successful(index) && copiesRunning(index) == 1 && !speculatableTasks.contains(index)) {
+ val runtimeMs = info.timeRunning(currentTimeMillis)
+
+ def checkMaySpeculate(): Boolean = {
+ if (customizedThreshold || taskProcessRateCalculator.isEmpty) {
+ true
+ } else {
+ isInefficient()
+ }
+ }
+
+ def isInefficient(): Boolean = {
+ (runtimeMs > efficientTaskDurationFactor * threshold) || taskProcessRateIsInefficient()
+ }
+
+ def taskProcessRateIsInefficient(): Boolean = {
+ taskProcessRateCalculator.forall(calculator => {
+ calculator.getRunningTasksProcessRate(tid) <
+ calculator.getAvgTaskProcessRate() * efficientTaskProcessMultiplier
+ })
+ }
+
+ def shouldSpeculateForExecutorDecommissioning(): Boolean = {
+ !customizedThreshold && executorDecommissionKillInterval.isDefined &&
+ !successfulTaskDurations.isEmpty() &&
+ sched.getExecutorDecommissionState(info.executorId).exists { decomState =>
+ // Check if this task might finish after this executor is decommissioned.
+ // We estimate the task's finish time by using the median task duration.
+ // Whereas the time when the executor might be decommissioned is estimated using the
+ // config executorDecommissionKillInterval. If the task is going to finish after
+ // decommissioning, then we will eagerly speculate the task.
+ val taskEndTimeBasedOnMedianDuration =
+ info.launchTime + successfulTaskDurations.median
+ val executorDecomTime = decomState.startTime + executorDecommissionKillInterval.get
+ executorDecomTime < taskEndTimeBasedOnMedianDuration
+ }
+ }
+ val speculated = (runtimeMs > threshold) && checkMaySpeculate() ||
+ shouldSpeculateForExecutorDecommissioning()
+ if (speculated) {
+ addPendingTask(index, speculatable = true)
+ logInfo(
+ ("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
+ " than %.0f ms(%d speculatable tasks in this taskset now)")
+ .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
+ speculatableTasks += index
+ sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
+ }
+ foundTasksResult |= speculated
+ }
}
+ foundTasksResult
}
/**
@@ -1099,7 +1155,7 @@ private[spark] class TaskSetManager(
// No need to speculate if the task set is zombie or is from a barrier stage. If there is only
// one task we don't speculate since we don't have metrics to decide whether it's taking too
// long or not, unless a task duration threshold is explicitly provided.
- if (isZombie || isBarrier || (numTasks == 1 && !speculationTaskDurationThresOpt.isDefined)) {
+ if (isZombie || isBarrier || (numTasks == 1 && !isSpeculationThresholdSpecified)) {
return false
}
var foundTasks = false
@@ -1109,40 +1165,24 @@ private[spark] class TaskSetManager(
// `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we should only count the
// tasks that are submitted by this `TaskSetManager` and are completed successfully.
val numSuccessfulTasks = successfulTaskDurations.size()
+ val timeMs = clock.getTimeMillis()
if (numSuccessfulTasks >= minFinishedForSpeculation) {
- val time = clock.getTimeMillis()
val medianDuration = successfulTaskDurations.median
val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation)
// TODO: Threshold should also look at standard deviation of task durations and have a lower
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
- for (tid <- runningTasksSet) {
- var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
- if (!speculated && executorDecommissionKillInterval.isDefined) {
- val taskInfo = taskInfos(tid)
- val decomState = sched.getExecutorDecommissionState(taskInfo.executorId)
- if (decomState.isDefined) {
- // Check if this task might finish after this executor is decommissioned.
- // We estimate the task's finish time by using the median task duration.
- // Whereas the time when the executor might be decommissioned is estimated using the
- // config executorDecommissionKillInterval. If the task is going to finish after
- // decommissioning, then we will eagerly speculate the task.
- val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
- val executorDecomTime = decomState.get.startTime + executorDecommissionKillInterval.get
- val canExceedDeadline = executorDecomTime < taskEndTimeBasedOnMedianDuration
- if (canExceedDeadline) {
- speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
- }
- }
- }
- foundTasks |= speculated
- }
- } else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) {
- val time = clock.getTimeMillis()
+ foundTasks = checkAndSubmitSpeculatableTasks(timeMs, threshold)
+ } else if (isSpeculationThresholdSpecified && speculationTasksLessEqToSlots) {
val threshold = speculationTaskDurationThresOpt.get
logDebug(s"Tasks taking longer time than provided speculation threshold: $threshold")
- for (tid <- runningTasksSet) {
- foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+ foundTasks = checkAndSubmitSpeculatableTasks(timeMs, threshold, customizedThreshold = true)
+ }
+ // avoid more warning logs.
+ if (foundTasks) {
+ val elapsedMs = clock.getTimeMillis() - timeMs
+ if (elapsedMs > minTimeToSpeculation) {
+ logWarning(s"Time to checkSpeculatableTasks ${elapsedMs}ms > ${minTimeToSpeculation}ms")
}
}
foundTasks
@@ -1218,6 +1258,55 @@ private[spark] class TaskSetManager(
def executorAdded(): Unit = {
recomputeLocality()
}
+
+ /**
+ * A class for checking inefficient tasks to be speculated, a task is inefficient when its data
+ * process rate is less than the average data process rate of all successful tasks in the stage
+ * multiplied by a multiplier.
+ */
+ private[TaskSetManager] class TaskProcessRateCalculator {
+ private var totalRecordsRead = 0L
+ private var totalExecutorRunTime = 0L
+ private var avgTaskProcessRate = Double.MaxValue
+ private val runningTasksProcessRate = new ConcurrentHashMap[Long, Double]()
+
+ private[TaskSetManager] def getAvgTaskProcessRate(): Double = {
+ avgTaskProcessRate
+ }
+
+ private[TaskSetManager] def getRunningTasksProcessRate(taskId: Long): Double = {
+ runningTasksProcessRate.getOrDefault(taskId, 0.0)
+ }
+
+ private[TaskSetManager] def updateAvgTaskProcessRate(
+ taskId: Long,
+ result: DirectTaskResult[_]): Unit = {
+ var recordsRead = 0L
+ var executorRunTime = 0L
+ result.accumUpdates.foreach { a =>
+ if (a.name == Some(shuffleRead.RECORDS_READ) ||
+ a.name == Some(input.RECORDS_READ)) {
+ val acc = a.asInstanceOf[LongAccumulator]
+ recordsRead += acc.value
+ } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+ val acc = a.asInstanceOf[LongAccumulator]
+ executorRunTime = acc.value
+ }
+ }
+ totalRecordsRead += recordsRead
+ totalExecutorRunTime += executorRunTime
+ if (totalRecordsRead > 0 && totalExecutorRunTime > 0) {
+ avgTaskProcessRate = sched.getTaskProcessRate(totalRecordsRead, totalExecutorRunTime)
+ }
+ runningTasksProcessRate.remove(taskId)
+ }
+
+ private[scheduler] def updateRunningTaskProcessRate(
+ taskId: Long,
+ taskProcessRate: Double): Unit = {
+ runningTasksProcessRate.put(taskId, taskProcessRate)
+ }
+ }
}
private[spark] object TaskSetManager {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index f21daa1aea6..c1c62ea4b85 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.scheduler
+import java.nio.ByteBuffer
import java.util.{Properties, Random}
import scala.collection.mutable
@@ -32,6 +33,7 @@ import org.scalatest.PrivateMethodTester
import org.scalatest.concurrent.Eventually
import org.apache.spark.{FakeSchedulerBackend => _, _}
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests.SKIP_VALIDATE_CORES_TESTING
@@ -203,6 +205,9 @@ class TaskSetManagerSuite
val LOCALITY_WAIT_MS = conf.get(config.LOCALITY_WAIT)
val MAX_TASK_FAILURES = 4
+ val SUBMISSION_TIME = 0L
+ val RUNTIME = 20 * 1000
+ val RECORDS_NUM = 10000L
var sched: FakeTaskScheduler = null
@@ -2245,6 +2250,233 @@ class TaskSetManagerSuite
assert(sched.speculativeTasks.size == 1)
}
+ private def createTaskMetrics(
+ taskSet: TaskSet,
+ inefficientTaskIds: Set[Int],
+ efficientMultiplier: Double = 0.6): Array[TaskMetrics] = {
+ taskSet.tasks.zipWithIndex.map { case (task, index) =>
+ val metrics = task.metrics
+ if (inefficientTaskIds.contains(index)) {
+ updateAndGetTaskMetrics(metrics, efficientMultiplier)
+ } else {
+ updateAndGetTaskMetrics(metrics, 1)
+ }
+ }
+ }
+
+ private def updateAndGetTaskMetrics(
+ taskMetrics: TaskMetrics,
+ efficientMultiplier: Double): TaskMetrics = {
+ taskMetrics.inputMetrics.setRecordsRead((efficientMultiplier * RECORDS_NUM).toLong)
+ taskMetrics.shuffleReadMetrics.setRecordsRead((efficientMultiplier * RECORDS_NUM).toLong)
+ taskMetrics.setExecutorRunTime(RUNTIME)
+ taskMetrics
+ }
+
+ test("SPARK-32170: test speculation for TaskSet with single task") {
+ val conf = new SparkConf()
+ .set(config.SPECULATION_ENABLED, true)
+ sc = new SparkContext("local", "test", conf)
+ Seq(0, 15).foreach { duration =>
+ sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, duration.toString)
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val numTasks = 1
+ val taskSet = FakeTask.createTaskSet(numTasks)
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+ for ((k, v) <- List("exec1" -> "host1")) {
+ val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+ assert(taskOption.isDefined)
+ val task = taskOption.get
+ sched.taskIdToTaskSetManager.put(task.taskId, manager)
+ }
+ clock.advance(RUNTIME)
+ // runtimeMs(20s) > 15s(1 * 15s)
+ if (duration <= 0) {
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set.empty)
+ } else {
+ assert(manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set(0))
+ }
+ }
+ }
+
+ test("SPARK-32170: test SPECULATION_MIN_THRESHOLD for speculating inefficient tasks") {
+ // set the speculation multiplier to be 0, so speculative tasks are launched based on
+ // minTimeToSpeculation parameter to checkSpeculatableTasks
+ val conf = new SparkConf()
+ .set(config.SPECULATION_MULTIPLIER, 0.0)
+ .set(config.SPECULATION_ENABLED, true)
+ sc = new SparkContext("local", "test", conf)
+ val ser = sc.env.closureSerializer.newInstance()
+ val speculativeAllDurations = Set(0)
+ val speculativeInefficientDurations = Set(10000)
+ val nonSpeculativeDurations = Set(50000)
+ (speculativeAllDurations ++ speculativeInefficientDurations
+ ++ nonSpeculativeDurations).foreach { minDuration =>
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val taskSet = FakeTask.createTaskSet(5)
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+ val taskMetricsByTask = createTaskMetrics(taskSet, Set(3), efficientMultiplier = 0.4)
+ val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+ // offer resources for 5 tasks to start
+ for ((k, v) <- List(
+ "exec1" -> "host1",
+ "exec1" -> "host1",
+ "exec1" -> "host1",
+ "exec2" -> "host2",
+ "exec2" -> "host2")) {
+ val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+ assert(taskOption.isDefined)
+ val task = taskOption.get
+ sched.taskIdToTaskSetManager.put(task.taskId, manager)
+ }
+ clock.advance(RUNTIME)
+ // complete the 3 tasks and leave 2 task in running
+ val task3Metrics: TaskMetrics =
+ ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+ sched.executorHeartbeatReceived("exec1", Array((3, task3Metrics.internalAccums)),
+ blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+
+ updateAndGetTaskMetrics(taskMetricsByTask(4), efficientMultiplier = 5)
+ val task4Metrics: TaskMetrics =
+ ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(4)).array()))
+ sched.executorHeartbeatReceived("exec1", Array((4, task4Metrics.internalAccums)),
+ blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+ for (id <- Set(0, 1, 2)) {
+ val resultBytes = ser.serialize(createTaskResult(id, taskMetricsByTask(id).internalAccums))
+ sched.statusUpdate(tid = id, state = TaskState.FINISHED, serializedData = resultBytes)
+ eventually(timeout(1.second), interval(10.milliseconds)) {
+ assert(sched.endedTasks(id) === Success)
+ }
+ }
+ // 1) when SPECULATION_MIN_THRESHOLD is equal 0s, both the task 3 and the task 4 will be
+ // speculated by previous strategy.
+ // 2) when SPECULATION_MIN_THRESHOLD is equal 10s and runtime(20s) is above 10s, the task 3
+ // will be evaluated an inefficient task to speculate, but the task 4 will not.
+ // 3) when SPECULATION_MIN_THRESHOLD is equal 50s, the task 3 and the task 4 runtime(20s) is
+ // less than (50s) and no needs to speculate at all.
+ if (speculativeAllDurations.contains(minDuration)) {
+ assert(manager.checkSpeculatableTasks(minDuration))
+ assert(sched.speculativeTasks.toSet === Set(3, 4))
+ } else if (speculativeInefficientDurations.contains(minDuration)) {
+ assert(manager.checkSpeculatableTasks(minDuration))
+ assert(sched.speculativeTasks.toSet === Set(3))
+ } else {
+ assert(!manager.checkSpeculatableTasks(minDuration))
+ assert(sched.speculativeTasks.toSet === Set.empty)
+ }
+ }
+ }
+
+ test("SPARK-32170: test SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER for " +
+ "speculating inefficient tasks") {
+ // set the speculation multiplier to be 0, so speculative tasks are launched based on
+ // minTimeToSpeculation parameter to checkSpeculatableTasks
+ val conf = new SparkConf()
+ .set(config.SPECULATION_MULTIPLIER, 0.0)
+ .set(config.SPECULATION_ENABLED, true)
+ sc = new SparkContext("local", "test", conf)
+ val ser = sc.env.closureSerializer.newInstance()
+ Seq(0.5, 0.8).foreach(processMultiplier => {
+ sc.conf.set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER, processMultiplier)
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val taskSet = FakeTask.createTaskSet(4)
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+ val taskMetricsByTask = createTaskMetrics(taskSet, Set(3), efficientMultiplier = 0.6)
+ val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+ // offer resources for 4 tasks to start
+ for ((k, v) <- List(
+ "exec1" -> "host1",
+ "exec1" -> "host1",
+ "exec2" -> "host2",
+ "exec2" -> "host2")) {
+ val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+ assert(taskOption.isDefined)
+ val task = taskOption.get
+ sched.taskIdToTaskSetManager.put(task.taskId, manager)
+ }
+ clock.advance(RUNTIME)
+ // complete the 3 tasks and leave 1 task in running
+ val taskMetrics: TaskMetrics =
+ ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+ sched.executorHeartbeatReceived("exec1", Array((3, taskMetrics.internalAccums)),
+ blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+ for (id <- Set(0, 1, 2)) {
+ val resultBytes = ser.serialize(createTaskResult(id, taskMetricsByTask(id).internalAccums))
+ sched.statusUpdate(tid = id, state = TaskState.FINISHED, serializedData = resultBytes)
+ eventually(timeout(1.second), interval(10.milliseconds)) {
+ assert(sched.endedTasks(id) === Success)
+ }
+ }
+ // 0.5 < 0.6 < 0.8
+ if (processMultiplier == 0.8) {
+ assert(manager.checkSpeculatableTasks(15000))
+ assert(sched.speculativeTasks.toSet === Set(3))
+ } else {
+ assert(!manager.checkSpeculatableTasks(15000))
+ assert(sched.speculativeTasks.toSet === Set.empty)
+ }
+ })
+ }
+
+ test("SPARK-32170: test SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR for " +
+ "speculating tasks") {
+ // set the speculation multiplier to be 0, so speculative tasks are launched based on
+ // minTimeToSpeculation parameter to checkSpeculatableTasks
+ val conf = new SparkConf()
+ .set(config.SPECULATION_MULTIPLIER, 0.0)
+ .set(config.SPECULATION_ENABLED, true)
+ .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER, 0.5)
+ sc = new SparkContext("local", "test", conf)
+ val ser = sc.env.closureSerializer.newInstance()
+ Seq(1.0, 2.0).foreach(factor => {
+ sc.conf.set(config.SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR, factor)
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val taskSet = FakeTask.createTaskSet(4)
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+ val taskMetricsByTask = createTaskMetrics(taskSet, Set(3), efficientMultiplier = 0.6)
+ val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+ // offer resources for 4 tasks to start
+ for ((k, v) <- List(
+ "exec1" -> "host1",
+ "exec1" -> "host1",
+ "exec2" -> "host2",
+ "exec2" -> "host2")) {
+ val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+ assert(taskOption.isDefined)
+ val task = taskOption.get
+ sched.taskIdToTaskSetManager.put(task.taskId, manager)
+ }
+ clock.advance(RUNTIME)
+ // complete the 3 tasks and leave 1 task in running
+ val taskMetrics: TaskMetrics =
+ ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+ sched.executorHeartbeatReceived("exec1", Array((3, taskMetrics.internalAccums)),
+ blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+ for (id <- Set(0, 1, 2)) {
+ val resultBytes = ser.serialize(createTaskResult(id, taskMetricsByTask(id).internalAccums))
+ sched.statusUpdate(tid = id, state = TaskState.FINISHED, serializedData = resultBytes)
+ eventually(timeout(1.second), interval(10.milliseconds)) {
+ assert(sched.endedTasks(id) === Success)
+ }
+ }
+ // runtimeMs(20s) > 15s(1 * 15s)
+ if (factor == 1.0) {
+ assert(manager.checkSpeculatableTasks(15000))
+ assert(sched.speculativeTasks.toSet === Set(3))
+ } else {
+ // runtimeMs(20s) < 30s(2 * 15s)
+ assert(!manager.checkSpeculatableTasks(15000))
+ assert(sched.speculativeTasks.toSet === Set.empty)
+ }
+ })
+ }
+
test("SPARK-37580: Reset numFailures when one of task attempts succeeds") {
sc = new SparkContext("local", "test")
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
diff --git a/docs/configuration.md b/docs/configuration.md
index a6d2a8b9d52..fd189aa88b6 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2476,6 +2476,41 @@ Apart from these, the following properties are also available, and may be useful
</td>
<td>3.0.0</td>
</tr>
+<tr>
+ <td><code>spark.speculation.efficiency.processRateMultiplier</code></td>
+ <td>0.75</td>
+ <td>
+ A multiplier that used when evaluating inefficient tasks. The higher the multiplier
+ is, the more tasks will be possibly considered as inefficient.
+ </td>
+ <td>3.4.0</td>
+</tr>
+<tr>
+ <td><code>spark.speculation.efficiency.longRunTaskFactor</code></td>
+ <td>2</td>
+ <td>
+ A task will be speculated anyway as long as its duration has exceeded the value of multiplying
+ the factor and the time threshold (either be <code>spark.speculation.multiplier</code>
+ * successfulTaskDurations.median or <code>spark.speculation.minTaskRuntime</code>) regardless
+ of it's data process rate is good or not. This avoids missing the inefficient tasks when task
+ slow isn't related to data process rate.
+ </td>
+ <td>3.4.0</td>
+</tr>
+<tr>
+ <td><code>spark.speculation.efficiency.enabled</code></td>
+ <td>true</td>
+ <td>
+ When set to true, spark will evaluate the efficiency of task processing through the stage task
+ metrics or its duration, and only need to speculate the inefficient tasks. A task is inefficient
+ when 1)its data process rate is less than the average data process rate of all successful tasks
+ in the stage multiplied by a multiplier or 2)its duration has exceeded the value of multiplying
+ <code>spark.speculation.efficiency.longRunTaskFactor</code> and the time threshold (either be
+ <code>spark.speculation.multiplier</code> * successfulTaskDurations.median or
+ <code>spark.speculation.minTaskRuntime</code>).
+ </td>
+ <td>3.4.0</td>
+</tr>
<tr>
<td><code>spark.task.cpus</code></td>
<td>1</td>