[
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931009#comment-15931009
]
Kay Ousterhout commented on SPARK-18886:
----------------------------------------
Sorry for the slow response here! I realized this is the same issue as
SPARK-11460 (although that JIRA proposed a slightly different solution), which
stalled for reasons that are completely my fault (I neglected it because I
couldn't think of a practical way of solving it).
Imran, unfortunately I don't think your latest idea will quite work. Delay
scheduling was originally intended for situations where the number of slots
that a particular job could use was limited by a fairness policy. In that
case, it can be better to wait a bit for a "better" slot (i.e., one that
satisfies locality preferences). In particular, if you never wait, you end up
with this "sticky slot" issue where tasks for a job keep finishing up in a
"bad" slot (one with no locality preferences), and then they'll be re-offered
to the same job, which will again accept the bad slot. If the job just waited
a bit, it could get a better slot (e.g., as a result of tasks from another job
finishing). [1]
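The sticky-slot dynamic can be seen in a toy model (this is not Spark code; the function, numbers, and slot model here are invented purely for illustration):

```python
# Toy model of the "sticky slot" effect described above. This is not Spark
# code; the function, numbers, and slot model are invented for illustration.
def run_job(wait_s, n_tasks=10, good_slot_frees_after=2):
    """Run n_tasks; return how many ran in a slot with good locality.

    After each task, the job's own "bad" slot is immediately re-offered,
    while a preferred slot would free up good_slot_frees_after seconds later
    (e.g. because a task from another job finishes).
    """
    local = 0
    for _ in range(n_tasks):
        if wait_s > good_slot_frees_after:
            local += 1  # job waited long enough to get the preferred slot
        # else: job accepts the re-offered bad slot right away
    return local

# Never waiting keeps the job stuck in the bad slot for every task;
# a short wait lets it pick up a preferred slot each time.
assert run_job(wait_s=0) == 0
assert run_job(wait_s=3) == 10
```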
This relates to your idea because of the following situation: suppose you have
a cluster with 10 machines, the job has locality preferences for 5 of them
(with ids 1, 2, 3, 4, 5), and fairness dictates that the job can only use 3
slots at a time (e.g., it's sharing equally with 2 other jobs). Suppose that
for a long time, the job has been running tasks on slots 1, 2, and 3 (so local
slots). At this point, the times for machines 6, 7, 8, 9, and 10 will have
expired, because the job has been running for a while. But if the job is now
offered a slot on one of those non-local machines (e.g., 6), the job hasn't
been waiting long for non-local resources: until this point, it's been running
its full share of 3 slots at a time, and it's been doing so on machines that
satisfy locality preferences. So, we shouldn't accept that slot on machine 6
-- we should wait a bit to see if we can get a slot on 1, 2, 3, 4, or 5.
The solution I proposed (in a long PR comment) for the other JIRA is: if the
task set is using fewer than the number of slots it could be using (where “#
slots it could be using” is all of the slots in the cluster if the job is
running alone, or the job’s fair share, if it’s not) for some period of time,
increase the locality level. The problem with that solution is that I thought
it was completely impractical to determine the number of slots a TSM "should"
be allowed to use.
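In sketch form, the rule would look something like this (all names here are invented; this is not Spark's actual scheduler API, just the shape of the idea):

```python
# Sketch of the rule proposed above, with invented names (not Spark's
# actual API): relax the locality level only once the task set has been
# using fewer slots than its allowed share for longer than the wait.
class DelayTracker:
    def __init__(self, wait_s):
        self.wait_s = wait_s
        self.below_share_since = None  # time we first dropped below our share

    def update(self, slots_in_use, allowed_slots, now):
        if slots_in_use >= allowed_slots:
            self.below_share_since = None  # at full share: no reason to relax
        elif self.below_share_since is None:
            self.below_share_since = now

    def should_relax_locality(self, now):
        # Relax only after being under-utilized for the whole wait period.
        return (self.below_share_since is not None
                and now - self.below_share_since >= self.wait_s)

tracker = DelayTracker(wait_s=3)
tracker.update(slots_in_use=3, allowed_slots=3, now=0)   # running full share
assert not tracker.should_relax_locality(now=10)
tracker.update(slots_in_use=2, allowed_slots=3, now=10)  # dropped below share
assert not tracker.should_relax_locality(now=11)         # only 1 s below
assert tracker.should_relax_locality(now=14)             # 4 s >= 3 s wait
```

The hard part, as noted, is computing `allowed_slots`, which is why the ideas below focus on proxies for it.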
However, after thinking about this more today, I think we might be able to do
this in a practical way:
- First, I thought that we could use information about when offers are rejected
to determine this (e.g., if you've been rejecting offers for a while, then
you're not using your fair share). But the problem here is that it's not easy
to determine when you *are* using your fair / allowed share: accepting a single
offer doesn't necessarily mean that you're now using the allowed share. This
is precisely the problem with the current approach, hence this JIRA.
- v1: one possible proxy for this is if there are slots that are currently
available that haven't been accepted by any job. The TaskSchedulerImpl could
feasibly pass this information to each TaskSetManager, and the TSM could use it
to update its delay timer: something like only reset the delay timer to 0 if
(a) the TSM accepts an offer and (b) the flag passed by the TaskSchedulerImpl
indicates that
there are no other unused slots in the cluster. This fixes the problem
described in the JIRA: in that case, the flag would indicate that there *were*
other unused slots, even though a task got successfully scheduled with this
offer, so the delay timer wouldn't be reset, and would eventually correctly
expire.
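Concretely, the v1 reset rule is just a conjunction (hypothetical names, not Spark code):

```python
# Minimal sketch of the v1 idea (hypothetical names, not Spark code): the
# TaskSchedulerImpl passes each TSM a flag saying whether any slot in the
# cluster is still going unused; the TSM resets its delay timer only when
# it accepts an offer AND nothing else is idle.
def new_timer_start(accepted_offer, cluster_has_unused_slots,
                    timer_start, now):
    if accepted_offer and not cluster_has_unused_slots:
        return now          # genuinely at capacity: restart the wait
    return timer_start      # keep the timer running so it can expire

# The case this JIRA describes: a task gets scheduled while other slots
# sit idle. With this rule the timer is NOT reset, so it can expire.
assert new_timer_start(True, True, timer_start=0, now=5) == 0
# At capacity with no idle slots, resetting is the intended behaviour.
assert new_timer_start(True, False, timer_start=0, now=5) == 5
```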
- v2: The problem with v1 is that it doesn't correctly handle situations where
e.g., you have two jobs A and B with equal shares. B is "greedy" and will
accept any slot (e.g., it's a reduce stage), and A is doing delay scheduling.
In this case, A might have much less than its share, but the flag from the
TaskSchedulerImpl would indicate that there were no other free slots in the
cluster, so the delay timer wouldn't ever expire. I suspect we could handle
this (e.g., with some logic in the TaskSchedulerImpl to detect when a
particular TSM is getting starved: when it keeps rejecting offers that are
later accepted by someone else) but before thinking about this further, I
wanted to run the general idea by you to see what your thoughts are.
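One purely speculative shape for that starvation check (invented names; only meant to make the idea concrete):

```python
from collections import Counter

# Speculative sketch of the starvation check mentioned above (invented
# names): flag a task set that keeps rejecting offers which some other
# task set then accepts -- a sign it is getting less than its share.
def starved_task_sets(offer_log, threshold=3):
    """offer_log: (rejecting_tsm, accepting_tsm) pairs, one per offer."""
    counts = Counter(rejecting for rejecting, accepting in offer_log
                     if accepting is not None and accepting != rejecting)
    return {tsm for tsm, n in counts.items() if n >= threshold}

# "A" turned down three offers that "B" or "C" then took: A looks starved.
log = [("A", "B"), ("A", "B"), ("A", "C"), ("B", None)]
assert starved_task_sets(log) == {"A"}
```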
[1] There's a whole side question / discussion of how often this is useful for
Spark at all. It can be useful if you're running in a shared cluster where
e.g. Yarn might be assigning you more slots over time, and it's also useful
when a single Spark context is being shared across many jobs. But often for
Spark, you have one job running alone, in which case delay scheduling should
arguably be turned off altogether, as you suggested earlier, Imran. But let's
separate that discussion from this one, of how to make it work better.
> Delay scheduling should not delay some executors indefinitely if one task is
> scheduled before delay timeout
> -----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-18886
> URL: https://issues.apache.org/jira/browse/SPARK-18886
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Imran Rashid
>
> Delay scheduling can introduce an unbounded delay and underutilization of
> cluster resources under the following circumstances:
> 1. Tasks have locality preferences for a subset of available resources
> 2. Tasks finish in less time than the delay scheduling wait.
> Instead of having *one* delay to wait for resources with better locality,
> spark waits indefinitely.
> As an example, consider a cluster with 100 executors, and a taskset with 500
> tasks. Say all tasks have a preference for one executor, which is by itself
> on one host. Given the default locality wait of 3s per level, we end up with
> a 6s delay till we schedule on other hosts (process wait + host wait).
> If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks
> get scheduled on _only one_ executor. This means you're only using 1% of
> your cluster, and you get a ~100x slowdown. You'd actually be better off if
> tasks took 7 seconds.
> *WORKAROUNDS*:
> (1) You can change the locality wait times so that it is shorter than the
> task execution time. You need to take into account the sum of all wait times
> to use all the resources on your cluster. For example, if you have resources
> on different racks, this will include the sum of
> "spark.locality.wait.process" + "spark.locality.wait.node" +
> "spark.locality.wait.rack". Those each default to "3s". The simplest way to
> be to set "spark.locality.wait.process" to your desired wait interval, and
> set both "spark.locality.wait.node" and "spark.locality.wait.rack" to "0".
> For example, if your tasks take ~3 seconds on average, you might set
> "spark.locality.wait.process" to "1s". *NOTE*: due to SPARK-18967, avoid
> setting {{spark.locality.wait=0}} -- instead, use
> {{spark.locality.wait=1ms}}.
> Note that this workaround isn't perfect -- with less delay scheduling, you may
> not get as good resource locality. After this issue is fixed, you'd most
> likely want to undo these configuration changes.
> (2) The worst case here will only happen if your tasks have extreme skew in
> their locality preferences. Users may be able to modify their job to
> control the distribution of the original input data.
> (2a) A shuffle may end up with very skewed locality preferences, especially
> if you do a repartition starting from a small number of partitions. (Shuffle
> locality preference is assigned if any node has more than 20% of the shuffle
> input data -- by chance, you may have one node just above that threshold, and
> all other nodes just below it.) In this case, you can turn off locality
> preference for shuffle data by setting
> {{spark.shuffle.reduceLocality.enabled=false}}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)