[ 
https://issues.apache.org/jira/browse/SPARK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17245951#comment-17245951
 ] 

Thomas Graves commented on SPARK-27495:
---------------------------------------

the main functionality is all in, it probably makes more sense to resolve this 
in 3.1.0 and split the others off into followups, if not objections, I'll split 
it.

> SPIP: Support Stage level resource configuration and scheduling
> ---------------------------------------------------------------
>
>                 Key: SPARK-27495
>                 URL: https://issues.apache.org/jira/browse/SPARK-27495
>             Project: Spark
>          Issue Type: Epic
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Thomas Graves
>            Assignee: Thomas Graves
>            Priority: Major
>              Labels: SPIP
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
> Objectives:
>  # Allow users to specify task and executor resource requirements at the 
> stage level. 
>  # Spark will use the stage level requirements to acquire the necessary 
> resources/executors and schedule tasks based on the per stage requirements.
> Many times users have different resource requirements for different stages of 
> their application so they want to be able to configure resources at the stage 
> level. For instance, you have a single job that has 2 stages. The first stage 
> does some  ETL which requires a lot of tasks, each with a small amount of 
> memory and 1 core each. Then you have a second stage where you feed that ETL 
> data into an ML algorithm. The second stage only requires a few executors but 
> each executor needs a lot of memory, GPUs, and many cores.  This feature 
> allows the user to specify the task and executor resource requirements for 
> the ETL Stage and then change them for the ML stage of the job. 
> Resources include cpu, memory (on heap, overhead, pyspark, and off heap), and 
> extra Resources (GPU/FPGA/etc). It has the potential to allow for other 
> things like limiting the number of tasks per stage, specifying other 
> parameters for things like shuffle, etc. Initially I would propose we only 
> support resources as they are now. So Task resources would be cpu and other 
> resources (GPU, FPGA), that way we aren't adding in extra scheduling things 
> at this point.  Executor resources would be cpu, memory, and extra 
> resources(GPU,FPGA, etc). Changing the executor resources will rely on 
> dynamic allocation being enabled.
> Main use cases:
>  # ML use case where user does ETL and feeds it into an ML algorithm where 
> it’s using the RDD API. This should work with barrier scheduling as well once 
> it supports dynamic allocation.
>  # This adds the framework/api for Spark's own internal use.  In the future 
> (not covered by this SPIP), Catalyst could control the stage level resources 
> as it finds the need to change it between stages for different optimizations. 
> For instance, with the new columnar plugin to the query planner we can insert 
> stages into the plan that would change running something on the CPU in row 
> format to running it on the GPU in columnar format. This API would allow the 
> planner to make sure the stages that run on the GPU get the corresponding GPU 
> resources it needs to run. Another possible use case for catalyst is that it 
> would allow catalyst to add in more optimizations to where the user doesn’t 
> need to configure container sizes at all. If the optimizer/planner can handle 
> that for the user, everyone wins.
> This SPIP focuses on the RDD API but we don’t exclude the Dataset API. I 
> think the DataSet API will require more changes because it specifically hides 
> the RDD from the users via the plans and catalyst can optimize the plan and 
> insert things into the plan. The only way I’ve found to make this work with 
> the Dataset API would be modifying all the plans to be able to get the 
> resource requirements down into where it creates the RDDs, which I believe 
> would be a lot of change.  If other people know better options, it would be 
> great to hear them.
> *Q2.* What problem is this proposal NOT designed to solve?
> The initial implementation is not going to add Dataset APIs.
> We are starting with allowing users to specify a specific set of 
> task/executor resources and plan to design it to be extendable, but the first 
> implementation will not support changing generic SparkConf configs and only 
> specific limited resources.
> This initial version will have a programmatic API for specifying the resource 
> requirements per stage, we can add the ability to perhaps have profiles in 
> the configs later if its useful.
> *Q3.* How is it done today, and what are the limits of current practice?
> Currently this is either done by having multiple spark jobs or requesting 
> containers with the max resources needed for any part of the job.  To do this 
> today, you can break it into separate jobs where each job requests the 
> corresponding resources needed, but then you have to write the data out 
> somewhere and then read it back in between jobs.  This is going to take 
> longer as well as require that job coordination between those to make sure 
> everything works smoothly. Another option would be to request executors with 
> your largest need up front and potentially waste those resources when they 
> aren't being used, which in turn wastes money. For instance, for an ML 
> application where it does ETL first, many times people request containers 
> with GPUs and the GPUs sit idle while the ETL is happening. This is wasting 
> those GPU resources and in turn money because those GPUs could have been used 
> by other applications until they were really needed.  
> Note for the catalyst internal use, that can’t be done today.
> *Q4.* What is new in your approach and why do you think it will be successful?
> This is a new way for users to specify the per stage resource requirements.  
> This will give users and Spark a lot more flexibility within a job and get 
> better utilization of their hardware.
> *Q5.* Who cares? If you are successful, what difference will it make?
> Spark application developers, cluster admins, managers and companies who pay 
> the bills for running Spark. It has the potential to make a huge difference 
> in cost by utilizing resources better and saving developers time.
> I’ve talked to different people from different companies and all of them have 
> said this would be a useful feature for them.
> *Q6.* What are the risks?
> The scoping of the new API could cause some confusion to the user as to which 
> resources actually get used in a stage. If the user has specified different 
> resources in multiple RDDs that get combined into a single stage, the 
> scheduler will have to merge those and come up with a final container size to 
> request. We will have a specific algorithm for merging but if the user 
> doesn’t realize things get combined or that some RDD’s require shuffle, they 
> might get confused.  I don't know how to get around this other then to 
> document the way it works and try to make it obvious to the user what was 
> chosen. Another option here is to have it fail if it gets a conflict to make 
> sure the user is aware. We could have a config flag for this to have it fail 
> first and then they could allow it to by turning the config on. See the 
> design doc for options on scoping.
> The cluster managers (like YARN) and dynamic allocation manager have to track 
> everything at a ResourceProfile (specific set of resource requirements) level 
> rather than just a global cores or executors level, so it requires a bunch of 
> data structure changes to those.
> *Q7.* How long will it take?
> I suspect this will take multiple months because it’s a fairly large change.  
> I think we can do it in pieces fairly easily though. For instance, I think we 
> can do the dynamic allocation manager and scheduler, YARN cluster manager, 
> and then finally the RDD API.  We can do the backend pieces first where the 
> global resource configs apply and then once we add in the actual RDD API, it 
> will only be there and only at that point would the user actually see it.  I 
> have a rough prototype of those where I was investigating what would all need 
> to change.
> *Q8.* What are the mid-term and final “exams” to check for success?
> Success is for the user to specify the resources per stage with dynamic 
> allocation and everything to work with it. One stage would run with a set of 
> resources and when the next stage starts with different resources the first 
> stage containers are let go and new ones acquired.  The mid-term might be to 
> put in the changes for the allocation manager and cluster manager and 
> scheduler and have the normal global resource requirements continue to work 
> as expected.
> *Appendix A.* Proposed API Changes. Optional section defining APIs changes, 
> if any. Backward and forward compatibility must be taken into account.
> I split the appendices out into a google doc since it was getting big and to 
> allow inline comments, see link below
> *Appendix B.* Optional Design Sketch: How are the goals going to be 
> accomplished? Give sufficient technical detail to allow a contributor to 
> judge whether it’s likely to be feasible. Note that this is not a full design 
> document.
> I split the appendices out into a google doc since it was getting big and to 
> allow inline comments, see link below
> *Appendix C.* Optional Rejected Designs: What alternatives were considered? 
> Why were they rejected? If no alternatives have been considered, the problem 
> needs more thought.
> I split the appendices out into a google doc since it was getting big and to 
> allow inline comments, see link below



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to