milenkovicm opened a new pull request, #1752:
URL: https://github.com/apache/datafusion-ballista/pull/1752

   ## Overview
   
   This commit implements **dynamic join strategy selection** in Ballista's 
Adaptive Query Execution (AQE) pipeline. Instead of choosing a join strategy at 
planning time (when data sizes are unknown), the plan now inserts a placeholder 
`DynamicJoinSelectionExec` node that defers the decision until runtime 
statistics are available.
   
   
   ```text 
   Before (planning time)
     HashJoinExec / SortMergeJoinExec
         ↑ chosen by DataFusion optimizer (no runtime stats)
   
   After (AQE flow)
     Step 1 – plan creation:
       DelayJoinSelectionRule → DynamicJoinSelectionExec  (placeholder)
   
     Step 2 – stage completes, stats available:
       SelectJoinRule → CollectLeft | LateCollectLeft | Hash | Sort | Repartition
                             ↑ decided using actual byte / row counts
   ```
   
   The change means Ballista can now **broadcast small tables** or **swap join 
sides** based on real data sizes observed during execution, not just estimates 
from the logical plan.
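   
   The side-swap decision can be pictured with a minimal Rust sketch. The names `StageStats`, `BuildSide`, and `choose_build_side` are hypothetical and are not taken from this PR; the sketch only assumes that each completed input stage reports an optional byte count.
   
   ```rust
   /// Hypothetical runtime statistics reported by a completed stage.
   /// (Illustrative only; not a type from Ballista or DataFusion.)
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   pub struct StageStats {
       pub total_bytes: Option<u64>,
       pub num_rows: Option<u64>,
   }
   
   /// Which join input is used as the build (collected) side.
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   pub enum BuildSide {
       Left,
       Right,
   }
   
   /// Pick the smaller input as the build side.
   /// Assumption: when either size is unknown, keep the planned order (left).
   pub fn choose_build_side(left: &StageStats, right: &StageStats) -> BuildSide {
       match (left.total_bytes, right.total_bytes) {
           // Swap only when the right side is strictly smaller.
           (Some(l), Some(r)) if r < l => BuildSide::Right,
           _ => BuildSide::Left,
       }
   }
   ```
   
   In this sketch a swap happens only when both sizes are known, so a missing statistic never changes the plan produced at planning time.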
   
   The new planner rules consult a few configuration values to decide the join type:
   
   - `datafusion.optimizer.hash_join_single_partition_threshold_rows`: row-count threshold used to decide whether to select a broadcast join
   - `datafusion.optimizer.hash_join_single_partition_threshold`: byte-size threshold used to decide whether to select a broadcast join
   - `datafusion.optimizer.prefer_hash_join`: selects the preferred join implementation when no broadcast join is chosen
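   
   As a rough illustration of how the three options listed above could combine, here is a minimal Rust sketch. `JoinSelectionConfig`, `JoinChoice`, and `select_join` are hypothetical names, not the PR's API. The precedence (byte size first, row count only when the byte size is unknown) is an assumption, and so is the fallback to a sort-based join when `prefer_hash_join` is false.
   
   ```rust
   /// Hypothetical mirror of the three DataFusion options listed above.
   #[derive(Debug, Clone, Copy)]
   pub struct JoinSelectionConfig {
       /// datafusion.optimizer.hash_join_single_partition_threshold (bytes)
       pub threshold_bytes: u64,
       /// datafusion.optimizer.hash_join_single_partition_threshold_rows
       pub threshold_rows: u64,
       /// datafusion.optimizer.prefer_hash_join
       pub prefer_hash_join: bool,
   }
   
   /// Simplified set of outcomes (illustrative; the PR lists more variants).
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   pub enum JoinChoice {
       Broadcast,
       PartitionedHash,
       SortMerge,
   }
   
   /// Decide a join strategy from the observed size of the build side.
   /// Assumption: bytes take precedence over rows; unknown stats never broadcast.
   pub fn select_join(
       build_bytes: Option<u64>,
       build_rows: Option<u64>,
       cfg: &JoinSelectionConfig,
   ) -> JoinChoice {
       let small_enough = match (build_bytes, build_rows) {
           (Some(bytes), _) => bytes > 0 && bytes < cfg.threshold_bytes,
           (None, Some(rows)) => rows > 0 && rows < cfg.threshold_rows,
           (None, None) => false,
       };
       if small_enough {
           JoinChoice::Broadcast
       } else if cfg.prefer_hash_join {
           JoinChoice::PartitionedHash
       } else {
           JoinChoice::SortMerge
       }
   }
   ```
   
   With illustrative thresholds of 1 MiB and 131072 rows, a 1000-byte build side would be broadcast, while a 10 MB build side would fall through to whichever implementation `prefer_hash_join` selects.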
   
   With dynamic join selection, some TPC-H jobs create fewer stages:
   
   <img width="1697" height="634" alt="Screenshot 2026-05-23 at 20 51 03" src="https://github.com/user-attachments/assets/bc9ff52b-3ec2-4879-a223-8ef13b99d379" />
   
   compared to static scheduling:
   
   <img width="1707" height="630" alt="Screenshot 2026-05-23 at 20 55 20" src="https://github.com/user-attachments/assets/dfb037ee-e3b7-4bfe-a077-e97832f7c4d3" />
   
   
   ## TODO 
   
   - [ ] configuration to disable rule `ballista.planner.adaptive.join`
   - [ ] test with `sort join`
   - [ ] broadcast join gives bad results for TPC-H queries 2, 13, and 16
   - [ ] decide whether to stick with `ballista.optimizer.broadcast_join_threshold_bytes` or use the DataFusion configuration values
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

