jinchengchenghh opened a new issue, #11808:
URL: https://github.com/apache/gluten/issues/11808

   ### Description
   
   Background
   
   In Spark, adaptive query execution (AQE) can adjust join strategies at stage 
boundaries. However, when a filter on the build side of a join is highly 
selective (it discards most rows), the static cost model often fails to predict 
the true size of the filtered build side. Consequently, the join strategy 
(e.g., SortMergeJoin vs. BroadcastHashJoin) is fixed before the filter runs, 
even though the data remaining after filtering could be small enough to benefit 
from a broadcast.
   
   This limitation is particularly relevant in Gluten because the actual 
filtering happens inside Velox, which is opaque to Spark’s optimizer. Spark 
cannot see the real data volume after filter pushdown, and AQE cannot 
re‑optimize the join once the build side has been processed.
   
   Goal
   
   Introduce a mechanism in Gluten that allows dynamic join strategy selection 
based on actual build‑side statistics collected after filter execution, rather 
than relying on static estimates.
   
   Proposed Approach
   
   The idea is to add a partial‑execution and plan‑rewriting step inside Gluten:
   
   1. Identify candidate joins
         During ColumnarRule processing, detect joins where the build side 
contains selective filters that may significantly reduce data volume.
   2. Execute the build‑side pipeline first
         Generate the Velox plan for the build side (including its filters) and 
execute it on its own, collecting runtime statistics:
      · numRows
      · numBytes
      · (Optional) cardinality distribution
   3. Dynamically decide join strategy
         In Gluten, compare the collected numBytes with 
spark.sql.autoBroadcastJoinThreshold, which is expressed in bytes:
      · If below threshold → rewrite the join as BroadcastHashJoin
      · Else → keep the original strategy (e.g., SortMergeJoin / 
ShuffledHashJoin)
   4. Continue execution
         Generate the new plan for the remaining operators (join + probe side) 
and execute it via Gluten/Velox.
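   A minimal Python sketch of steps 2 to 4 follows, under a drastically simplified model. Batch, BuildSideStats, JoinNode, collect_stats, choose_strategy and rewrite_join are hypothetical names, not Gluten or Spark APIs; a real implementation would read row and byte counts from Velox rather than from Python objects. The sketch shows statistics being folded batch by batch, so only two counters are kept instead of the build-side data, and a broadcast decision taken against the byte threshold.

```python
from dataclasses import dataclass, replace
from typing import Iterable

# Default value of spark.sql.autoBroadcastJoinThreshold: 10 MiB, in bytes.
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024


@dataclass(frozen=True)
class Batch:
    """Hypothetical stand-in for one batch emitted by the build-side Velox pipeline."""
    num_rows: int
    num_bytes: int


@dataclass(frozen=True)
class BuildSideStats:
    num_rows: int
    num_bytes: int


@dataclass(frozen=True)
class JoinNode:
    """Hypothetical, heavily simplified join node (not a Gluten or Spark class)."""
    strategy: str  # e.g. "SortMergeJoin", "ShuffledHashJoin", "BroadcastHashJoin"
    build_side: str
    probe_side: str


def collect_stats(batches: Iterable[Batch]) -> BuildSideStats:
    """Step 2: fold per-batch counters as batches stream past; batches are not kept."""
    num_rows = 0
    num_bytes = 0
    for batch in batches:
        num_rows += batch.num_rows
        num_bytes += batch.num_bytes
    return BuildSideStats(num_rows, num_bytes)


def choose_strategy(stats: BuildSideStats, current: str,
                    threshold: int = DEFAULT_BROADCAST_THRESHOLD) -> str:
    """Step 3: switch to a broadcast join only if the observed bytes fit the threshold.

    A negative threshold such as -1 (Spark's way of disabling broadcast) never matches.
    """
    if 0 <= stats.num_bytes <= threshold:
        return "BroadcastHashJoin"
    return current


def rewrite_join(join: JoinNode, stats: BuildSideStats,
                 threshold: int = DEFAULT_BROADCAST_THRESHOLD) -> JoinNode:
    """Step 4: build a new join node; the original node is left unchanged."""
    return replace(join, strategy=choose_strategy(stats, join.strategy, threshold))


if __name__ == "__main__":
    # 10,000 filtered customers rows arriving in three batches (illustrative numbers).
    stats = collect_stats([Batch(4096, 800_000), Batch(4096, 800_000), Batch(1808, 353_600)])
    join = JoinNode("SortMergeJoin", build_side="filter(customers)", probe_side="orders")
    print(stats.num_rows, rewrite_join(join, stats).strategy)  # 10000 BroadcastHashJoin
```

   Returning a new JoinNode instead of mutating the existing one keeps the original plan available, which is what a graceful fallback would rely on.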
   
   Benefits
   
   · Better runtime adaptability: Join strategy is decided after the actual 
filter effect is known, not based on stale or inaccurate statistics.
   · Improved performance: High‑selectivity filters can enable broadcast joins 
that would otherwise be missed.
   · Leverages Gluten’s visibility into Velox execution: Gluten can collect 
precise metrics from Velox without exposing low‑level details to Spark.
   · Remains within Gluten’s scope: This enhancement fits Gluten’s role as a 
plan‑translation and execution‑bridge layer; no changes to Spark core or Velox 
are required.
   
   Example Scenario
   
   ```sql
   SELECT * 
   FROM orders o 
   JOIN customers c ON o.cust_id = c.id
   WHERE c.is_vip = true
   ```
   
   · The customers table has 10 million rows, but is_vip = true matches only 10 
thousand of them.
   · Static estimation may choose SortMergeJoin, because the estimate is based 
on the size of the whole table.
   · After the filter is executed in Velox, Gluten sees the real row count 
(10k) and rewrites the join to BroadcastHashJoin. This avoids shuffling and 
sorting the orders side, which can significantly improve performance.
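   Because Spark expresses the broadcast threshold in bytes, the scenario can be checked with a back-of-the-envelope calculation. The average row width of 200 bytes is a made-up assumption for illustration only; 10 MiB is the default value of spark.sql.autoBroadcastJoinThreshold.

```python
# Default spark.sql.autoBroadcastJoinThreshold: 10 MiB, expressed in bytes.
THRESHOLD_BYTES = 10 * 1024 * 1024  # 10,485,760

# Hypothetical assumption: every customers row averages 200 bytes.
AVG_ROW_BYTES = 200


def estimated_bytes(num_rows: int, avg_row_bytes: int = AVG_ROW_BYTES) -> int:
    """Rough size of a relation: row count times average row width."""
    return num_rows * avg_row_bytes


full_table = estimated_bytes(10_000_000)  # 2,000,000,000 bytes (about 1.86 GiB)
vip_only = estimated_bytes(10_000)        # 2,000,000 bytes (about 1.9 MiB)

# Widest average row that still lets 10,000 rows fit under the threshold.
max_avg_row_bytes = THRESHOLD_BYTES // 10_000  # 1048

print(full_table > THRESHOLD_BYTES)  # True: unfiltered table is far too large to broadcast
print(vip_only <= THRESHOLD_BYTES)   # True: filtered side fits under the default threshold
```

   Under this assumption the filtered side stays broadcastable as long as the average row width does not exceed about 1 KB; wider rows, or a less selective filter, would push it back over the threshold.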
   
   Implementation Considerations
   
   · Partial execution overhead
       The build side may still be large in some cases. The optimization should 
only be triggered when the filter has high estimated selectivity or when 
existing statistics are known to be inaccurate.
   · Caching / re‑use
       The result of the build‑side execution can be cached and reused in the 
final plan to avoid double‑scanning.
   · Compatibility with existing AQE
       This approach does not conflict with Spark’s AQE; it fills a gap where 
AQE cannot see into the Velox‑executed filter. The rewritten plan can still be 
further optimized by AQE if needed.
   · Graceful fallback
       If partial execution fails or statistics collection is expensive, the 
original plan remains unchanged.
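   The gating, caching and fallback considerations above could fit together roughly as in the following Python sketch. Everything in it is hypothetical: the plan fingerprint string, the estimated_pass_fraction input (fraction of rows the filter is expected to keep), the max_pass_fraction knob (0.01 here, not an existing Gluten config) and the in-memory cache stand in for whatever Gluten would actually use, and batches are reduced to (num_rows, num_bytes) pairs.

```python
from typing import Callable, Dict, List, Tuple

# Stand-in for a build-side batch: (num_rows, num_bytes).
BatchInfo = Tuple[int, int]
THRESHOLD_BYTES = 10 * 1024 * 1024  # default spark.sql.autoBroadcastJoinThreshold


def should_attempt(estimated_pass_fraction: float, stats_known_inaccurate: bool,
                   max_pass_fraction: float = 0.01) -> bool:
    """Gate: only pay for partial execution if the filter looks very selective
    or the existing statistics are known to be unreliable (hypothetical knob)."""
    return stats_known_inaccurate or estimated_pass_fraction <= max_pass_fraction


class BuildSideCache:
    """Keeps build-side output so the final plan can reuse it instead of rescanning."""

    def __init__(self) -> None:
        self._entries: Dict[str, List[BatchInfo]] = {}
        self.executions = 0  # how many times a build side actually ran

    def get_or_run(self, fingerprint: str,
                   run: Callable[[], List[BatchInfo]]) -> List[BatchInfo]:
        if fingerprint not in self._entries:
            # If run() raises, nothing is stored and the counter is not bumped.
            self._entries[fingerprint] = run()
            self.executions += 1
        return self._entries[fingerprint]


def decide(original_strategy: str, fingerprint: str,
           run: Callable[[], List[BatchInfo]], cache: BuildSideCache,
           estimated_pass_fraction: float, stats_known_inaccurate: bool = False,
           threshold: int = THRESHOLD_BYTES) -> str:
    """Return the join strategy to use; any failure keeps the original strategy."""
    if not should_attempt(estimated_pass_fraction, stats_known_inaccurate):
        return original_strategy
    try:
        batches = cache.get_or_run(fingerprint, run)
    except Exception:
        # Graceful fallback: a failed partial execution leaves the plan unchanged.
        return original_strategy
    num_bytes = sum(size for _, size in batches)
    return "BroadcastHashJoin" if 0 <= num_bytes <= threshold else original_strategy
```

   A real cache would have to hold Velox vectors (or spill them) and be released when the query finishes; the sketch only illustrates the control flow: gate first, run each build side at most once per fingerprint, and return the original strategy on any failure.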
   
   Next Steps
   
   I’m interested in community feedback on this direction and would like to 
start with a proof‑of‑concept implementation. Areas where input would be 
especially helpful:
   
   · Best place to hook into Gluten’s existing planning/execution pipeline for 
partial execution
   · How to efficiently collect statistics from Velox without materializing the 
entire build side
   · Handling different join types and build‑side complexity
   
   Any thoughts or suggestions are welcome.
   
   ### Gluten version
   
   None


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

