zjuwangg opened a new issue, #8018:
URL: https://github.com/apache/incubator-gluten/issues/8018

   ### Description
   
   ## Implement a stage-level ResourceProfile auto-adjust framework to avoid OOM
   
   ### Background
   In our production environment, we suffer a lot from Gluten jobs occasionally throwing heap OOM exceptions.
   We have dug into these problems, and there are two major causes of our jobs throwing OOM:
   1. The stage contains a fallback operator, e.g. a UDAF or another not-yet-supported function or operator, which requires more heap memory than configured.
   2. The stage contains no fallback operator but reads from a very heavy upstream exchange. Here `heavy` means the upstream exchange has a huge M * N shuffle status (M is the number of shuffle mappers and N the number of reducers). When this stage begins its shuffle read, the executor side must keep the whole `mapStatuses` of the upstream shuffle; when M * N is large, this is very likely to cause a heap OOM exception.
   
![image](https://github.com/user-attachments/assets/8c795e7c-c192-48a4-bea0-8dcb705b76d6)
   
   The root cause is that, for now, all stages in the same Spark application share the same task heap/off-heap memory config, and the problem appears when different stages require different heap/off-heap fractions. Since https://github.com/apache/incubator-gluten/issues/4392 has proposed a potential solution to this type of problem, we did some verification based on this idea.
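
   To make the mapStatuses pressure concrete, here is a rough back-of-envelope sketch. It assumes Spark's compressed map status keeps roughly one byte of compressed block size per reducer for each mapper; the helper name is ours, not a Spark API:

   ```scala
   // Back-of-envelope only: with ~1 byte per reducer per mapper, the mapStatuses
   // footprint on an executor grows roughly with M * N.
   def approxMapStatusBytes(mappers: Long, reducers: Long): Long =
     mappers * reducers

   // e.g. 200,000 mappers x 10,000 reducers -> roughly 2 GB just for mapStatuses
   ```

   This is only an order-of-magnitude estimate (highly compressed map statuses use less), but it shows why a large M * N can blow past a fixed heap budget.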
   
   ### Design
   * Introduce a ResourceProfile setter in `WholeStageTransformer` and `ColumnarShuffleExchangeExec`
       Since all underlying native computation is triggered from WholeStageTransformer or from ColumnarShuffle, we can add
   ```scala
     @transient private var resourceProfile: Option[ResourceProfile] = None
   
     def withResources(rp: ResourceProfile): Unit = {
       this.resourceProfile = Some(rp)
     }
   ```
   in WholeStageTransformer, and when `doExecuteColumnar` gets called, set the resource profile on the RDD before it is returned:
   ```scala
       if (resourceProfile.isDefined) {
         logInfo(s"set resource profile ${resourceProfile.get} for child $child")
         rdd.withResources(resourceProfile.get)
       }
       rdd
   ```
   
   * Introduce `GlutenDynamicAdjustStageRP` in `HeuristicApplier`
       When AQE is enabled, we can check all operators in this stage and collect all child query stages, if any, that belong to this stage.
       After we have collected all plan nodes belonging to this stage, we know whether a fallback exists, and we can also calculate the shuffle status complexity to roughly estimate the mapStatus memory occupation. The rule works in the following steps:
   ```
   1. Collect all plan nodes belonging to this stage.

   2. Analyze the plan nodes in detail, gathering whether a fallback exists and whether child query stages exist.

   3. Generate a new resource profile.
        3.1 Get the default resource profile from the sparkContext.resourceProfileManager and initialize task and executor resource requests based on the default profile.
        3.2 Adjust the heap/off-heap memory requests.

   4. Handle the different scenarios for resource profile adjustment.

   Scenario 1: Fallback exists. If both existsC2RorR2C and existsGlutenOperator are true, try to apply the new resource profile to the detailed plans.

   Scenario 2: Shuffle status consideration. Filter the detailed plans to get only the ShuffleQueryStageExec instances. If there are any, calculate the complexity of the stage's shuffle status based on the number of mappers and reducers in each ShuffleQueryStageExec. If the calculated complexity meets or exceeds a threshold from the glutenConfig, apply the new resource profile to the detailed plans.

   5. Apply the new resource profile if needed.
   ```
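
   The scenario-2 check above can be sketched as a standalone heuristic. `ShuffleStageInfo` and the names below are illustrative stand-ins for the real ShuffleQueryStageExec fields and the Gluten config threshold, not actual APIs:

   ```scala
   // Standalone sketch of the scenario-2 heuristic; types and names are illustrative.
   case class ShuffleStageInfo(numMappers: Long, numReducers: Long)

   // Complexity = sum over the upstream shuffle stages of M * N
   def shuffleComplexity(stages: Seq[ShuffleStageInfo]): Long =
     stages.map(s => s.numMappers * s.numReducers).sum

   // Apply a new resource profile only when the complexity crosses the configured threshold
   def needsAdjustedProfile(stages: Seq[ShuffleStageInfo], threshold: Long): Boolean =
     stages.nonEmpty && shuffleComplexity(stages) >= threshold
   ```

   Summing M * N per upstream shuffle mirrors how the mapStatuses footprint accumulates when a stage reads from several exchanges at once.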
   
   We have completed a PoC of this design, and it really solved these two types of OOM problems; we are refactoring the code and plan to contribute it to the community.
   
   
   ### Requirements
   * AQE must be enabled
   * The stage-level resource scheduling conditions must be met
       * Executor dynamic allocation is enabled: `spark.dynamicAllocation.enabled` must be true
       * The underlying resource scheduler must support dynamically allocating executors
   
   ### Potential Other Benefits
   1. Provides a new way to specify other resources, e.g. GPUs, per stage.
   2. External tuning systems can intervene through this mechanism.
   
   ### FAQ
   1. What if a stage contains multiple WholeStageTransformers; will the multiple resource profiles conflict with each other?

   Multiple resource profiles can be merged through Spark's mechanism.
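
   For background, Spark can merge conflicting profiles per resource (controlled by `spark.scheduler.resource.profileMergeConflicts`) by taking the larger request for each resource. A simplified standalone model of that behavior, not Spark's actual implementation:

   ```scala
   // Simplified model of profile conflict merging: take the max request per
   // resource. Map[String, Long] stands in for Spark's real request types.
   def mergeResourceRequests(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
     (a.keySet ++ b.keySet).map { k =>
       k -> math.max(a.getOrElse(k, 0L), b.getOrElse(k, 0L))
     }.toMap
   ```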
   
   2. What if one stage falls back entirely, which means there is no chance to set a ResourceProfile for this stage?

      Potential solutions: a) Wrap the whole fallen-back plan with a wrapper node that has the interface and ability to set a ResourceProfile; b) Set a default resource profile suitable for whole-stage-fallback stages, so there is no need to set one on the plan for this stage.
   
   3. Other questions?
   
   
   
   We'd love to hear more thoughts and receive more comments about this idea!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

