[
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15751956#comment-15751956
]
Imran Rashid commented on SPARK-18886:
--------------------------------------
Here's a failing test case: (you can check it out directly here:
https://github.com/squito/spark/tree/delay_sched-SPARK-18886)
{code}
test("Delay scheduling checks utilization at each locality level") {
  // Create a cluster with 101 executors (exec0 plus exec1 to exec100), and submit 500 tasks,
  // but each task would prefer to be on the same node in the cluster. We should not wait to
  // schedule each task on the one executor.
  val conf = new SparkConf().set("spark.locality.wait", "1s")
  sc = new SparkContext("local", "test", conf)
  val execs = Seq(("exec0", "host0")) ++ (1 to 100).map { x => (s"exec$x", s"host$x") }
  val sched = new FakeTaskScheduler(sc, execs: _*)
  val tasks = FakeTask.createTaskSet(500, (1 to 500).map { _ =>
    Seq(TaskLocation(TaskLocation.executorLocationTag + "host0_exec0"))}: _*)
  val clock = new ManualClock
  val manager = new TaskSetManager(sched, tasks, MAX_TASK_FAILURES, clock)
  logInfo("initial locality levels = " + manager.myLocalityLevels.mkString(","))
  assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))

  // initially, the locality preferences should lead us to only schedule tasks on one executor
  logInfo(s"trying to schedule first task at ${clock.getTimeMillis()}")
  val firstScheduledTask = execs.flatMap { case (exec, host) =>
    val schedTaskOpt = manager.resourceOffer(execId = exec, host = host, ANY)
    assert(schedTaskOpt.isDefined === (exec == "exec0"))
    schedTaskOpt
  }.head

  // without advancing the clock, no matter how many times we make offers on the *other*
  // executors, nothing should get scheduled
  (0 until 50).foreach { _ =>
    execs.foreach { case (exec, host) =>
      if (exec != "exec0") {
        assert(manager.resourceOffer(execId = exec, host = host, ANY).isEmpty)
      }
    }
  }

  // now we advance the clock till just *before* the locality delay is up, and we finish the
  // first task
  val processWait = sc.getConf.getTimeAsMs("spark.locality.wait.process", "3s")
  val nodeWait = sc.getConf.getTimeAsMs("spark.locality.wait.node", "3s")
  clock.advance(processWait + nodeWait - 1)
  logInfo(s"finishing first task at ${clock.getTimeMillis()}")
  manager.handleSuccessfulTask(firstScheduledTask.taskId,
    createTaskResult(firstScheduledTask.index))

  // if we offer all the resources again, still we should only schedule on one executor
  logInfo(s"trying to schedule second task at ${clock.getTimeMillis()}")
  val secondScheduledTask = execs.flatMap { case (exec, host) =>
    val schedTaskOpt = manager.resourceOffer(execId = exec, host = host, ANY)
    assert(schedTaskOpt.isDefined === (exec == "exec0"))
    schedTaskOpt
  }.head

  // Now let's advance the clock further, so that all of our other executors have been sitting
  // idle for longer than the locality wait time. We have managed to schedule *something* at a
  // lower locality level within the time, but regardless, we *should* still schedule on all
  // the other resources by this point
  clock.advance(2000)
  // this would pass if we advanced the clock by this much instead
  // clock.advance(processWait + nodeWait + 10)
  logInfo(s"trying to schedule everything at ${clock.getTimeMillis()}")
  execs.foreach { case (exec, host) =>
    if (exec != "exec0") {
      withClue(s"trying to schedule on $exec:$host at time ${clock.getTimeMillis()}") {
        assert(manager.resourceOffer(execId = exec, host = host, ANY).isDefined)
      }
    }
  }
}
{code}
> Delay scheduling should not delay some executors indefinitely if one task is
> scheduled before delay timeout
> -----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-18886
> URL: https://issues.apache.org/jira/browse/SPARK-18886
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Imran Rashid
>
> Delay scheduling can introduce an unbounded delay and underutilization of
> cluster resources under the following circumstances:
> 1. Tasks have locality preferences for a subset of available resources
> 2. Tasks finish in less time than the delay scheduling wait.
> Instead of having *one* delay to wait for resources with better locality,
> Spark waits indefinitely.
> As an example, consider a cluster with 100 executors, and a taskset with 500
> tasks. Say all tasks have a preference for one executor, which is by itself
> on one host. Given the default locality wait of 3s per level, we end up with
> a 6s delay till we schedule on other hosts (process wait + node wait).
> If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks
> get scheduled on _only one_ executor. This means you're using only 1% of
> your cluster, and you get a ~100x slowdown. You'd actually be better off if
> tasks took 7 seconds.
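
The numbers in the example above can be checked with a quick back-of-the-envelope calculation. The sketch below is only an arithmetic model of that scenario, not scheduler code; the object and value names are hypothetical, and it assumes tasks would otherwise spread evenly across all 100 executors.

```scala
// Arithmetic model of the example: 500 tasks, 100 executors, 5s per task.
// All names here are hypothetical; this does not use or mimic Spark's scheduler.
object DelaySchedulingExample {
  val numTasks = 500
  val numExecutors = 100
  val taskSeconds = 5.0

  // Every task finishes before the 6s wait (3s process + 3s node) expires,
  // so all tasks run one after another on the single preferred executor.
  def serialRuntime: Double = numTasks * taskSeconds

  // Idealized runtime if the tasks were spread evenly over all executors.
  def parallelRuntime: Double =
    math.ceil(numTasks.toDouble / numExecutors) * taskSeconds

  def slowdown: Double = serialRuntime / parallelRuntime

  def main(args: Array[String]): Unit = {
    println(s"one executor:  $serialRuntime s")
    println(s"all executors: $parallelRuntime s")
    println(s"slowdown:      ${slowdown}x")
  }
}
```

Under these assumptions the single-executor run takes 2500 seconds against 25 seconds for an even spread, which is where the ~100x figure comes from.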
> *WORKAROUNDS*:
> (1) You can change the locality wait times so that they are shorter than the
> task execution time. You need to take into account the sum of all wait times
> to use all the resources on your cluster. For example, if you have resources
> on different racks, this will include the sum of
> "spark.locality.wait.process" + "spark.locality.wait.node" +
> "spark.locality.wait.rack". Those each default to "3s". The simplest way is
> to set "spark.locality.wait.process" to your desired wait interval, and set
> both "spark.locality.wait.node" and "spark.locality.wait.rack" to "0". For
> example, if your tasks take ~3 seconds on average, you might set
> "spark.locality.wait.process" to "1s".
> Note that this workaround isn't perfect: with less delay scheduling, you may
> not get as good resource locality. After this issue is fixed, you'd most
> likely want to undo these configuration changes.
> (2) The worst case here will only happen if your tasks have extreme skew in
> their locality preferences. Users may be able to modify their job to
> control the distribution of the original input data.
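
Workaround (1) could be applied roughly as follows. This is a minimal sketch that assumes Spark is on the classpath and that tasks take about 3 seconds on average, as in the description; the "1s" value is only the illustrative figure given there and should be tuned per job.

```scala
import org.apache.spark.SparkConf

// Keep a single short wait at the process level and disable the node and
// rack waits, so the total delay before using other executors is about 1s.
// The values are illustrative and should be revisited once the issue is fixed.
val conf = new SparkConf()
  .set("spark.locality.wait.process", "1s")
  .set("spark.locality.wait.node", "0")
  .set("spark.locality.wait.rack", "0")
```

The same three settings can be passed on the command line with `--conf` instead of being set in code.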