[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15752451#comment-15752451 ]

Imran Rashid commented on SPARK-18886:
--------------------------------------

[~markhamstra] I'm not really sure yet; I can't really decide exactly what the 
right behavior should be.  My initial thought was that the TSM should track the 
last time that anything has been scheduled at *each* locality level, rather 
than just one overall {{lastLaunchTime}}, so that it realizes that some 
resources have been waiting a long time at a higher locality level.  But I 
wasn't exactly sure what should happen after you schedule one task at a higher 
locality level.  Do you reset the timer?  Or do you just keep scheduling at that 
locality level for the rest of the task set?  If you reset the timer, then you 
will only schedule one task on the other resources before adding another 
locality delay, so that doesn't work.  But if you keep scheduling at that new 
locality level, then you've thrown away delay scheduling for the rest of the 
task set.  Is that OK?
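To make the per-level tracking idea concrete, here is a minimal sketch (Python, for illustration only; the class and all names are mine, not the real TaskSetManager's) of one possible shape, where launching a task resets only that level's own timer:

```python
# Simplified locality levels, ordered best (most local) to worst.
LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]


class PerLevelDelay:
    """Toy model: one delay timer per locality level (not Spark's real TSM)."""

    def __init__(self, start_ms=0):
        # Last time anything was launched at each level, in ms.
        self.last_launch_at = {level: start_ms for level in LEVELS}

    def record_launch(self, level, now_ms):
        # Deliberately resets only this level's timer, so the wait already
        # accrued at worse levels survives a local launch.  Whether worse
        # levels should also be reset is the open question discussed above.
        self.last_launch_at[level] = now_ms

    def max_allowed_level(self, now_ms, wait_ms):
        # Allow the worst level whose own timer has expired, regardless of
        # recent launches at better levels.
        for level in reversed(LEVELS):
            if now_ms - self.last_launch_at[level] >= wait_ms:
                return level
        return LEVELS[0]
```

Under this sketch, with a 3s wait, a task launched PROCESS_LOCAL at t=2.9s no longer blocks the fallback: the ANY timer has been running since t=0, so ANY is already allowed at t=3.1s.  With a single shared {{lastLaunchTime}} that launch would have reset the clock for everything.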

To put that last question a different way -- what is the point of delay 
scheduling anyway?  What are we hoping will happen in that delay window?
1) the tasks run so fast that the preferred localities make it through all of 
the tasks in the entire task set before the delay is up
2) other tasksets are concurrently submitted with different locality 
preferences, so we can submit those tasks to the remaining executors (rather 
than sitting idle)
3) new resources will be spun up with the desired locality preferences.  (E.g., 
we've requested a bunch of resources from dynamic allocation, and the resources 
which have become available so far don't have the preferred localities, but 
more are still getting spun up.)

Under all the scenarios I can think of, you might as well turn off delay 
scheduling for the rest of the taskset.  But to be honest I don't feel like 
I've got good justification for delay scheduling in the first place, so I feel 
like I may be missing something.


Also I did a bit more digging into the case I had, and it's happening because of 
a repartition from a small number (~10) of partitions to a much larger number.  
The shuffle map stage ends up running two tasks on one host, and with a very 
small amount of skew, it turns out that one host has > 20% of the shuffle 
output, while none of the other hosts do.   I feel like there is also something 
else we can do here to improve the shuffle locality preferences, but I don't 
have any concrete ideas on what that improvement should be.
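To make that 20% effect concrete, here is a toy model (illustrative only; the names are mine, not Spark's actual shuffle-locality code) of how a slightly skewed host can become the sole preferred location:

```python
THRESHOLD = 0.2  # a host must hold more than this fraction of shuffle output


def preferred_hosts(bytes_by_host):
    """Hosts holding strictly more than THRESHOLD of the total shuffle output."""
    total = sum(bytes_by_host.values())
    return [host for host, size in bytes_by_host.items()
            if size / total > THRESHOLD]


# Ten roughly equal map outputs across five hosts.  The host that ran two
# slightly skewed map tasks lands just above 20%, every other host just
# below, so it becomes the only preferred location for all the reducers.
sizes = {"hostA": 21, "hostB": 20, "hostC": 20, "hostD": 20, "hostE": 19}
```

Here {{preferred_hosts(sizes)}} picks out only hostA, even though the outputs are nearly uniform, which is the kind of accidental skew described above.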

> Delay scheduling should not delay some executors indefinitely if one task is 
> scheduled before delay timeout
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18886
>                 URL: https://issues.apache.org/jira/browse/SPARK-18886
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Imran Rashid
>
> Delay scheduling can introduce an unbounded delay and underutilization of 
> cluster resources under the following circumstances:
> 1. Tasks have locality preferences for a subset of available resources
> 2. Tasks finish in less time than the delay scheduling wait.
> Instead of having *one* delay to wait for resources with better locality, 
> Spark waits indefinitely.
> As an example, consider a cluster with 100 executors, and a taskset with 500 
> tasks.  Say all tasks have a preference for one executor, which is by itself 
> on one host.  Given the default locality wait of 3s per level, we end up with 
> a 6s delay till we schedule on other hosts (process wait + host wait).
> If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks 
> get scheduled on _only one_ executor.  This means you're only using 1% of 
> your cluster, and you get a ~100x slowdown.  You'd actually be better off if 
> tasks took 7 seconds.
> *WORKAROUNDS*: 
> (1) You can change the locality wait times so that it is shorter than the 
> task execution time.  You need to take into account the sum of all wait times 
> to use all the resources on your cluster.  For example, if you have resources 
> on different racks, this will include the sum of 
> "spark.locality.wait.process" + "spark.locality.wait.node" + 
> "spark.locality.wait.rack".  Those each default to "3s".  The simplest way would 
> be to set "spark.locality.wait.process" to your desired wait interval, and set 
> both "spark.locality.wait.node" and "spark.locality.wait.rack" to "0".  For 
> example, if your tasks take ~3 seconds on average, you might set 
> "spark.locality.wait.process" to "1s".
> Note that this workaround isn't perfect -- with less delay scheduling, you may 
> not get as good resource locality.  After this issue is fixed, you'd most 
> likely want to undo these configuration changes.
> (2) The worst case here will only happen if your tasks have extreme skew in 
> their locality preferences.  Users may be able to modify their job to 
> control the distribution of the original input data.
> (2a) A shuffle may end up with very skewed locality preferences, especially 
> if you do a repartition starting from a small number of partitions.  (Shuffle 
> locality preference is assigned if any node has more than 20% of the shuffle 
> input data -- by chance, you may have one node just above that threshold, and 
> all other nodes just below it.)  In this case, you can turn off locality 
> preference for shuffle data by setting 
> {{spark.shuffle.reduceLocality.enabled=false}}.
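Workaround (1) from the description above can be applied at submit time; a hypothetical invocation (the application jar name and the 1s value are placeholders, the latter assuming tasks that average ~3s as in the example):

```shell
# Make the process-local wait shorter than typical task duration, and zero
# out the node and rack waits so the per-level delays don't stack.
spark-submit \
  --conf spark.locality.wait.process=1s \
  --conf spark.locality.wait.node=0 \
  --conf spark.locality.wait.rack=0 \
  my-app.jar
```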



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
