zjuwangg opened a new issue, #8018: URL: https://github.com/apache/incubator-gluten/issues/8018
### Description

## Implement a stage-level ResourceProfile auto-adjust framework to avoid OOM

### Background

In our production environment, we suffer a lot from Gluten jobs occasionally throwing heap OOM exceptions. We have dug into these problems, and there are two major causes:

1. The stage contains a fallback operator, e.g. a UDAF or another operator or function that is not yet supported, which requires more heap memory than configured.
2. The stage contains no fallback operator but reads from a very heavy upstream exchange. Here `heavy` means the upstream exchange has a huge M * N shuffle status (M is the number of shuffle mappers and N the number of reducers). When this stage begins its shuffle read, the executor side must hold the entire `mapStatuses` of the upstream shuffle; when M * N is large, this is very likely to cause a heap OOM exception.

The root cause is that, at present, all stages in the same Spark application share the same task heap/off-heap memory configuration; the problem appears when different stages require different heap/off-heap fractions. Since https://github.com/apache/incubator-gluten/issues/4392 proposed a potential solution to this type of problem, we did some verification based on that idea.

### Design

* Introduce a ResourceProfile setter in `WholeStageTransformer` and `ColumnarShuffleExchangeExec`

  Since all underlying native computation is triggered from `WholeStageTransformer` or from the columnar shuffle, we can add

  ```scala
  @transient private var resourceProfile: Option[ResourceProfile] = None

  def withResources(rp: ResourceProfile): Unit = {
    this.resourceProfile = Some(rp)
  }
  ```

  in `WholeStageTransformer`, and when `doExecuteColumnar` is called, set the ResourceProfile on the RDD before it is returned:
  ```scala
  if (resourceProfile.isDefined) {
    logInfo(s"set resource profile ${resourceProfile.get} for child $child")
    rdd.withResources(resourceProfile.get)
  }
  rdd
  ```

* Introduce `GlutenDynamicAdjustStageRP` in `HeuristicApplier`

  When AQE is enabled, we can check all operators in the stage and collect its child query stages, if any. After collecting all plan nodes belonging to the stage, we know whether a fallback exists, and we can also calculate the shuffle status complexity to roughly estimate the memory occupied by `mapStatuses`. The rule works in the following steps:

  ```
  1. Collect all plan nodes belonging to this stage.
  2. Analyze the plan nodes in detail, gathering whether a fallback exists and whether there are child query stages.
  3. Generate a new resource profile:
     3.1 Get the default resource profile from sparkContext.resourceProfileManager and initialize task and executor resource requests based on it.
     3.2 Adjust the heap/off-heap memory requests.
  4. Handle the different scenarios for resource profile adjustment:
     Scenario 1: Fallback exists. If both existsC2RorR2C and existsGlutenOperator are true, try to apply the new resource profile to the collected plans.
     Scenario 2: Shuffle status consideration. Filter the collected plans down to the ShuffleQueryStageExec instances. If there are any, calculate the stage's shuffle status complexity from the number of mappers and reducers in each ShuffleQueryStageExec. If the calculated complexity meets or exceeds a threshold from the GlutenConfig, apply the new resource profile to the collected plans.
  5. Apply the new resource profile if needed.
  ```

We have completed a PoC of this design, and it really solved both types of OOM problems. We are refactoring the code and plan to contribute it to the community.
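The scenario-2 heuristic above can be sketched as follows. This is a minimal illustration, not the actual PoC code: the helper name `stageShuffleComplexity` and the config accessor `stageComplexityThreshold` are hypothetical, while `ShuffleQueryStageExec.shuffle` with its `numMappers`/`numPartitions` members is the real Spark AQE API.

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec

// Roughly estimate the mapStatuses footprint of the stage's upstream
// shuffles as the sum of M * N over all child ShuffleQueryStageExec nodes.
def stageShuffleComplexity(stagePlans: Seq[SparkPlan]): Long =
  stagePlans.collect { case s: ShuffleQueryStageExec =>
    s.shuffle.numMappers.toLong * s.shuffle.numPartitions.toLong // M * N
  }.sum

// Apply the adjusted profile only when the heuristic threshold is exceeded
// (the threshold name is illustrative; the real GlutenConfig key may differ).
// if (stageShuffleComplexity(stagePlans) >= glutenConfig.stageComplexityThreshold) {
//   stagePlans.foreach {
//     case w: WholeStageTransformer => w.withResources(newProfile)
//     case _ =>
//   }
// }
```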
### Requirements

* AQE must be enabled.
* The stage-level resource scheduling conditions must be met:
  * executor dynamic allocation is enabled (`spark.dynamicAllocation.enabled` must be true);
  * the underlying resource scheduler must support dynamically allocating executors.

### Other Potential Benefits

1. Provides a new way to specify other resources for a stage, e.g. GPUs.
2. External tuning systems can intervene through this mechanism.

### FAQ

1. What if a stage contains multiple `WholeStageTransformer`s — will the multiple resource profiles conflict with each other?
   Multiple resource profiles can be merged through Spark's built-in mechanism.
2. What if a stage falls back entirely, so there is no chance to set a ResourceProfile for it?
   Potential solutions: a) wrap the whole fallen-back plan in a wrapper node that exposes an interface for setting a ResourceProfile; b) set a default resource profile suitable for whole-stage-fallback stages, so nothing needs to be set on the plan.
3. Other questions? We'd love to hear more thoughts and receive more comments on this idea!
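To illustrate how a per-stage profile would be built, here is a minimal sketch using Spark's standard stage-level scheduling API (the memory sizes are placeholders, not recommendations). When several such profiles land on the same stage, Spark can merge them if `spark.scheduler.resource.profileMergeConflicts` is enabled, which is the mechanism FAQ #1 refers to.

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Build a profile that raises off-heap memory for a Gluten-heavy stage.
val execReqs = new ExecutorResourceRequests()
  .memory("4g")          // on-heap executor memory
  .offHeapMemory("8g")   // off-heap memory used by the native engine
val taskReqs = new TaskResourceRequests().cpus(1)

val profile = new ResourceProfileBuilder()
  .require(execReqs)
  .require(taskReqs)
  .build()

// rdd.withResources(profile) would then attach the profile to the stage's RDD.
```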
