[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292855#comment-16292855
 ] 

Thomas Graves commented on SPARK-22683:
---------------------------------------

Thanks for the clarification; a few of those I misread, thinking in terms of cores 
per executor rather than tasks per slot.  I think this proposal makes more sense 
to me now.
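
For my own sanity, here is the computation I now understand the proposal to be 
making, as a rough sketch (illustrative Scala; the names are mine, not the actual 
patch):

    // Rough sketch of how I read the proposal; names are illustrative only.
    val executorCores    = 5       // spark.executor.cores
    val taskCpus         = 1       // spark.task.cpus
    val slotsPerExecutor = executorCores / taskCpus   // 5 task slots per executor
    val pendingTasks     = 1000    // tasks in the current stage
    // today: dynamic allocation targets one task slot per pending task
    val currentTarget  = math.ceil(pendingTasks.toDouble / slotsPerExecutor).toInt                   // 200
    // proposal: each slot is expected to run tasksPerSlot tasks over the life of the stage
    val tasksPerSlot   = 6
    val proposedTarget = math.ceil(pendingTasks.toDouble / (slotsPerExecutor * tasksPerSlot)).toInt  // 34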

| I don't think asking upfront vs exponential has any effect over how Yarn 
yields containers.

This is not necessarily true.  There is time between heartbeats, and it depends 
on how much capacity the cluster has and how often the NodeManagers heartbeat. 
YARN schedules on NodeManager heartbeats, and it schedules based on your asks: 
you ask once, then you have to check again to see whether it allocated them.  If 
I ask for everything up front and YARN can give it to me before I heartbeat 
again, I can launch all the executors immediately.  If I ramp up exponentially, 
I ask for 1, wait 5+ seconds, check and ask for more, wait, and so on.  Spark 
ramps its asks up quickly, but you have still wasted some time, and if your 
tasks are really small and your launch time is quick it can make a difference. 
You can also see more of the skew you referenced, where a bunch of tasks end up 
running on a few executors while waiting for the ramp-up.  You can mitigate this 
by asking more often, and it is also affected by the overall cluster 
utilization.  In my experience I haven't seen the exponential ramp-up help much, 
so I don't see a reason not to just ask for everything up front.  But that 
reflects the use cases I see; at the time the ramp-up was written I believe it 
did help some of their jobs.  This goes back to the point above: this works well 
for your workload, but the question is how it applies to others in general.
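
To put rough numbers on it, here is a toy back-of-the-envelope comparison; the 5 
second allocation round and the assumption that YARN can satisfy each ask within 
a round are both just assumptions for illustration:

    // Toy comparison only; assumes a fixed 5s allocation round and an idle cluster.
    val neededExecutors = 64
    val roundSeconds    = 5
    // all up front: the full ask is outstanding after a single round
    val upFrontSeconds = roundSeconds
    // exponential ramp-up: the outstanding ask roughly doubles each round (1, 2, 4, ...)
    var outstanding = 1
    var rounds      = 1
    while (outstanding < neededExecutors) { outstanding *= 2; rounds += 1 }
    val rampUpSeconds = rounds * roundSeconds   // 7 rounds * 5s = 35s before the full ask is out

For a long stage that difference is noise, but with small tasks and fast launch 
times it shows up.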

Thinking about this more, this could really be orthogonal to the allocation 
policy (SPARK-16158), since it is more about dynamic configuration of the max 
number of executors.  It could apply to different allocation policies (the 
existing exponential one, or the all-up-front approach I mentioned).  There is a 
JIRA out there to add the ability to limit the number of concurrent tasks for a 
stage/job, which is in line with this, but the intention there is different 
(limiting DoS of other services like HBase from Spark), and it would be harder 
to configure that to solve this problem.

[~srowen] I know you had some reservations; were all your concerns addressed?  I 
can see where the existing configs don't cover this sufficiently, or at least 
not ideally.  You can set maxExecutors, but that applies to the entire job, and 
different stages can have greatly different numbers of tasks.  I can also see 
schedulerBacklogTimeout not being optimal for this, as it could adversely affect 
run time, and the experiments here seem to confirm that.

The thing I'm thinking about is what exactly the config should be so that it 
makes sense to the user.  tasksPerSlot could be misinterpreted as limiting the 
number of tasks run on each slot, and we don't really use "slot" anywhere else. 
I am thinking more along the lines of 
spark.dynamicAllocation.dynamicMaxExecutorsPerStagePercent=[max number of 
executors, based on a percentage of the number of tasks required for that 
stage].  I need to think about this some more though.

I think we would also need to define its interaction with 
spark.dynamicAllocation.maxExecutors, as well as how it behaves as the number of 
running/to-be-run tasks changes.  For instance, I assume the number of executors 
here doesn't change until the number of running/to-be-run tasks drops below this 
limit, so this really just applies to the initial max executors.
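
Concretely, and purely as a strawman (neither the name nor the semantics are 
settled), I picture the per-stage cap being computed along these lines:

    // Strawman only: the config name and exact semantics are open questions.
    val stageTasks       = 1000
    val slotsPerExecutor = 5       // spark.executor.cores / spark.task.cpus
    val maxExecutors     = 500     // spark.dynamicAllocation.maxExecutors
    val stagePercent     = 0.5     // hypothetical dynamicMaxExecutorsPerStagePercent
    // executors needed to run every task of the stage at once (one slot per task)
    val fullParallelism = math.ceil(stageTasks.toDouble / slotsPerExecutor).toInt           // 200
    // per-stage cap: a fraction of full parallelism, still bounded by maxExecutors
    val stageCap = math.min(maxExecutors, math.ceil(fullParallelism * stagePercent).toInt)  // 100

Under that reading, maxExecutors stays the hard upper bound and the new setting 
only scales the per-stage ask down from full parallelism, which matches my 
assumption above that it mostly affects the initial max.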








> DynamicAllocation wastes resources by allocating containers that will barely 
> be used
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-22683
>                 URL: https://issues.apache.org/jira/browse/SPARK-22683
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Julien Cuquemelle
>              Labels: pull-request-available
>
> While migrating a series of jobs from MR to Spark using dynamicAllocation, 
> I've noticed almost a doubling (+114% exactly) of resource consumption of 
> Spark w.r.t MR, for a wall clock time gain of 43%
> About the context: 
> - resource usage stands for vcore-hours allocation for the whole job, as seen 
> by YARN
> - I'm talking about a series of jobs because we provide our users with a way 
> to define experiments (via UI / DSL) that automatically get translated to 
> Spark / MR jobs and submitted on the cluster
> - we submit around 500 of such jobs each day
> - these jobs are usually one shot, and the amount of processing can vary a 
> lot between jobs, and as such finding an efficient number of executors for 
> each job is difficult to get right, which is the reason I took the path of 
> dynamic allocation.  
> - Some of the tests have been scheduled on an idle queue, some on a full 
> queue.
> - experiments have been conducted with spark.executor.cores = 5 and 10; only 
> results for 5 cores have been reported because efficiency was overall better 
> than with 10 cores
> - the figures I give are averaged over a representative sample of those jobs 
> (about 600 jobs) ranging from tens to thousands splits in the data 
> partitioning and between 400 to 9000 seconds of wall clock time.
> - executor idle timeout is set to 30s;
>  
> Definition: 
> - let's say an executor has spark.executor.cores / spark.task.cpus taskSlots, 
> which represent the max number of tasks an executor will process in parallel.
> - the current behaviour of the dynamic allocation is to allocate enough 
> containers to have one taskSlot per task, which minimizes latency, but wastes 
> resources when tasks are small relative to the executor allocation and idling 
> overhead.
> The results using the proposal (described below) over the job sample (600 
> jobs):
> - by using 2 tasks per taskSlot, we get a 5% (against -114%) reduction in 
> resource usage, for a 37% (against 43%) reduction in wall clock time for 
> Spark w.r.t MR
> - by trying to minimize the average resource consumption, I ended up with 6 
> tasks per core, with a 30% resource usage reduction, for a similar wall clock 
> time w.r.t. MR
> What did I try to solve the issue with existing parameters (summing up a few 
> points mentioned in the comments) ?
> - change dynamicAllocation.maxExecutors: this would need to be adapted for 
> each job (tens to thousands splits can occur), and essentially remove the 
> interest of using the dynamic allocation.
> - use dynamicAllocation.backlogTimeout: 
>     - setting this parameter right to avoid creating unused executors is very 
> dependent on wall clock time. One basically needs to solve the exponential 
> ramp up for the target time. So this is not an option for my use case where I 
> don't want a per-job tuning. 
>     - I've still done a series of experiments, details in the comments. 
> Result is that after manual tuning, the best I could get was a similar 
> resource consumption at the expense of 20% more wall clock time, or a similar 
> wall clock time at the expense of 60% more resource consumption than what I 
> got using my proposal @ 6 tasks per slot (this value being optimized over a 
> much larger range of jobs as already stated)
> - as mentioned in another comment, tampering with the exponential ramp up 
> might yield task imbalance, and as such old executors could become contention 
> points for other executors trying to remotely access blocks on those old 
> executors (not witnessed in the jobs I'm talking about, but we did see this 
> behavior in other jobs)
> Proposal: 
> Simply add a tasksPerExecutorSlot parameter, which makes it possible to 
> specify how many tasks a single taskSlot should ideally execute to mitigate 
> the overhead of executor allocation.
> PR: https://github.com/apache/spark/pull/19881



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
