[
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15830530#comment-15830530
]
Imran Rashid commented on SPARK-18886:
--------------------------------------
I had another idea for how to fix this. In addition to tracking the last time
any task was launched, TaskScheduler also tracks the last time it didn't
schedule anything due to locality constraints, on *each resource*. Then when a
new offer comes in, you are allowed to schedule if either the overall locality
timer is up, or if the timer is up for that particular resource.
On the plus side -- I think this keeps all the properties we want. You avoid
an indefinite delay just because *one* resource is local; but you also keep
the delay if those resources get used up by another task set.
The downside -- significantly more complex. It adds to the memory usage of the
TaskScheduler (a pretty nominal increase, in the scheme of things), and it will
also make the code significantly more complicated.
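To make the idea concrete, here's a minimal sketch in Python (not Spark's actual Scala code -- the class and method names here are made up for illustration) of the "either timer" check:

```python
import time

class DelayTracker:
    """Sketch of per-resource delay-scheduling timers.

    In addition to the overall timer (last time any task was launched),
    we track, per resource, the last time an offer on that resource was
    rejected due to locality constraints.
    """

    def __init__(self, locality_wait):
        self.locality_wait = locality_wait
        self.last_launch = time.time()
        self.last_reject = {}  # resource id -> last rejection timestamp

    def record_launch(self, now=None):
        self.last_launch = now if now is not None else time.time()

    def record_reject(self, resource, now=None):
        self.last_reject[resource] = now if now is not None else time.time()

    def can_relax_locality(self, resource, now=None):
        """Allow non-local scheduling if EITHER the overall timer is up,
        OR the timer for this particular resource is up."""
        now = now if now is not None else time.time()
        overall_up = now - self.last_launch >= self.locality_wait
        resource_up = (resource in self.last_reject and
                       now - self.last_reject[resource] >= self.locality_wait)
        return overall_up or resource_up
```

With this, a resource that keeps getting rejected eventually qualifies for non-local scheduling even while local launches keep resetting the overall timer -- which is exactly the indefinite-delay scenario in the description.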
Aside: There is also a weird relationship between taskset priority and locality
scheduling. Assuming all tasksets have cleared their locality wait timeouts,
we favor taskset priority over locality. But if the tasksets haven't cleared
those timeouts, things get strange. It really depends on the current locality
levels in each taskset. In the simple case, you end up favoring locality, by
limiting the max locality of each taskset. But a very low priority taskset can
easily "steal" resources from a high priority one if it has no locality
preferences. We should probably figure out what the desired behavior is so we
can make it a little more consistent (or at least document it).
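A toy illustration of that "stealing" case (a hypothetical simplification, not the real TaskSchedulerImpl logic): tasksets are offered a resource in priority order, but a taskset whose locality wait hasn't expired declines a non-matching host, so a lower priority taskset with no locality preferences takes it.

```python
def schedule_offer(offer_host, tasksets):
    """Offer one resource to tasksets in priority order (lower number =
    higher priority). A taskset declines a host it has no preference for
    until its locality wait has expired."""
    for ts in sorted(tasksets, key=lambda t: t["priority"]):
        prefs = ts["preferred_hosts"]
        if not prefs or offer_host in prefs or ts["wait_expired"]:
            return ts["name"]
    return None

tasksets = [
    # high priority, prefers host-A, still within its locality wait
    {"name": "high", "priority": 0, "preferred_hosts": {"host-A"},
     "wait_expired": False},
    # low priority, no locality preferences at all
    {"name": "low", "priority": 1, "preferred_hosts": set(),
     "wait_expired": False},
]
```

An offer for host-B goes to the low priority taskset (the high priority one declines it), while an offer for host-A still goes to the high priority one.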
> Delay scheduling should not delay some executors indefinitely if one task is
> scheduled before delay timeout
> -----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-18886
> URL: https://issues.apache.org/jira/browse/SPARK-18886
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Imran Rashid
>
> Delay scheduling can introduce an unbounded delay and underutilization of
> cluster resources under the following circumstances:
> 1. Tasks have locality preferences for a subset of available resources
> 2. Tasks finish in less time than the delay scheduling wait.
> Instead of waiting just *one* delay for resources with better locality,
> Spark waits indefinitely.
> As an example, consider a cluster with 100 executors, and a taskset with 500
> tasks. Say all tasks have a preference for one executor, which is by itself
> on one host. Given the default locality wait of 3s per level, we end up with
> a 6s delay till we schedule on other hosts (process wait + host wait).
> If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks
> get scheduled on _only one_ executor. This means you're only using 1% of
> your cluster, and you get a ~100x slowdown. You'd actually be better off if
> tasks took 7 seconds.
> *WORKAROUNDS*:
> (1) You can change the locality wait times so that it is shorter than the
> task execution time. You need to take into account the sum of all wait times
> to use all the resources on your cluster. For example, if you have resources
> on different racks, this will include the sum of
> "spark.locality.wait.process" + "spark.locality.wait.node" +
> "spark.locality.wait.rack". Those each default to "3s". The simplest way to
> be to set "spark.locality.wait.process" to your desired wait interval, and
> set both "spark.locality.wait.node" and "spark.locality.wait.rack" to "0".
> For example, if your tasks take ~3 seconds on average, you might set
> "spark.locality.wait.process" to "1s". *NOTE*: due to SPARK-18967, avoid
> setting {{spark.locality.wait=0}} -- instead, use
> {{spark.locality.wait=1ms}}.
> Note that this workaround isn't perfect -- with less delay scheduling, you may
> not get as good resource locality. After this issue is fixed, you'd most
> likely want to undo these configuration changes.
> (2) The worst case here will only happen if your tasks have extreme skew in
> their locality preferences. Users may be able to modify their job to
> control the distribution of the original input data.
> (2a) A shuffle may end up with very skewed locality preferences, especially
> if you do a repartition starting from a small number of partitions. (Shuffle
> locality preference is assigned if any node has more than 20% of the shuffle
> input data -- by chance, you may have one node just above that threshold, and
> all other nodes just below it.) In this case, you can turn off locality
> preference for shuffle data by setting
> {{spark.shuffle.reduceLocality.enabled=false}}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]