jinchengchenghh opened a new issue, #11808:
URL: https://github.com/apache/gluten/issues/11808
### Description
Background
In Spark, adaptive query execution (AQE) can adjust join strategies at stage
boundaries. However, when a filter on the build side of a join is highly
selective (it keeps only a small fraction of the rows), the static cost model
often fails to predict the true size of the filtered build side. Consequently,
the join strategy (e.g., SortMergeJoin vs. BroadcastHashJoin) is fixed before
the filter is executed, even though the actual data size after filtering could
be small enough to benefit from a broadcast.
This limitation is particularly relevant in Gluten because the actual
filtering happens inside Velox, which is opaque to Spark’s optimizer. Spark
cannot see the real data volume after filter pushdown, and AQE cannot
re‑optimize the join once the build side has been processed.
Goal
Introduce a mechanism in Gluten that allows dynamic join strategy selection
based on actual build‑side statistics collected after filter execution, rather
than relying on static estimates.
Proposed Approach
The idea is to add a partial‑execution and plan‑rewriting step inside Gluten:
1. Identify candidate joins
During ColumnarRule processing, detect joins where the build side
contains selective filters that may significantly reduce data volume.
2. Execute the build‑side pipeline first
Generate and execute the Velox plan for the build side (including
filters) independently, and collect runtime statistics:
· numRows
· numBytes
· (Optional) cardinality distribution
3. Dynamically decide join strategy
In Gluten, compare the collected numBytes with `spark.sql.autoBroadcastJoinThreshold`
(the threshold is a size in bytes, not a row count):
· If below threshold → rewrite the join as BroadcastHashJoin
· Else → keep the original strategy (e.g., SortMergeJoin /
ShuffledHashJoin)
4. Continue execution
Generate the new plan for the remaining operators (join + probe side)
and execute it via Gluten/Velox.
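The decision in step 3 can be sketched as follows. This is a minimal illustration written in Python, not Gluten's actual API: `BuildSideStats`, `choose_join_strategy` and the strategy-name constants are hypothetical. It assumes the statistics from step 2 arrive as plain numbers and compares the byte size against `spark.sql.autoBroadcastJoinThreshold`, which Spark interprets in bytes and which disables broadcasting when set to -1.

```python
from dataclasses import dataclass
from typing import Optional

SORT_MERGE_JOIN = "SortMergeJoin"
SHUFFLED_HASH_JOIN = "ShuffledHashJoin"
BROADCAST_HASH_JOIN = "BroadcastHashJoin"


@dataclass(frozen=True)
class BuildSideStats:
    """Hypothetical holder for the statistics collected in step 2."""
    num_rows: int
    num_bytes: int
    # Optional cardinality signal; not used by the size check below.
    distinct_keys: Optional[int] = None


def choose_join_strategy(stats: BuildSideStats,
                         auto_broadcast_threshold: int,
                         original_strategy: str) -> str:
    """Step 3: decide the join strategy from the measured build-side size.

    auto_broadcast_threshold is the value of
    spark.sql.autoBroadcastJoinThreshold, in bytes; a negative value
    (e.g. -1) disables broadcasting.
    """
    if auto_broadcast_threshold < 0:
        return original_strategy  # broadcasting is disabled by configuration
    if stats.num_bytes <= auto_broadcast_threshold:
        return BROADCAST_HASH_JOIN  # filtered build side is small enough
    return original_strategy  # keep SortMergeJoin / ShuffledHashJoin


# Example: a 2 MB filtered build side against the default 10 MB threshold.
TEN_MB = 10 * 1024 * 1024
print(choose_join_strategy(BuildSideStats(10_000, 2_000_000), TEN_MB, SORT_MERGE_JOIN))
# prints: BroadcastHashJoin
```

Comparing bytes rather than rows keeps the check consistent with how Spark interprets the threshold. A real implementation would also need to confirm that the filtered side can legally serve as the broadcast build side for the given join type (for example, a left outer join can only broadcast its right side).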
Benefits
· Better runtime adaptability: Join strategy is decided after the actual
filter effect is known, not based on stale or inaccurate statistics.
· Improved performance: Highly selective filters can enable broadcast joins
that would otherwise be missed.
· Leverages Gluten's visibility into Velox execution: Gluten can collect precise
runtime metrics from Velox without exposing low-level details to Spark.
· Remains within Gluten’s scope: This enhancement fits Gluten’s role as a
plan‑translation and execution‑bridge layer; no changes to Spark core or Velox
are required.
Example Scenario
```sql
SELECT *
FROM orders o
JOIN customers c ON o.cust_id = c.id
WHERE c.is_vip = true
```
· The customers table has 10 million rows, but is_vip = true matches only 10
thousand of them.
· Static estimation may choose SortMergeJoin (treating the build side as if it
were the whole, large table).
· After the filter is executed in Velox, Gluten sees the real row count
(10k) and rewrites the join to BroadcastHashJoin, which can significantly
improve performance.
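The arithmetic behind this scenario can be checked with a back-of-the-envelope calculation. The sketch assumes a hypothetical average row width of 200 bytes for `customers` (the issue does not give one) and uses Spark's default `spark.sql.autoBroadcastJoinThreshold` of 10 MB (10,485,760 bytes).

```python
# Default value of spark.sql.autoBroadcastJoinThreshold (10 MB).
THRESHOLD_BYTES = 10 * 1024 * 1024  # 10_485_760

# Assumed average row width for `customers`; hypothetical, not from the issue.
AVG_ROW_BYTES = 200


def estimated_bytes(num_rows: int, avg_row_bytes: int = AVG_ROW_BYTES) -> int:
    """Rough build-side size: row count times average row width."""
    return num_rows * avg_row_bytes


before_filter = estimated_bytes(10_000_000)  # 2_000_000_000 bytes (~1.86 GiB)
after_filter = estimated_bytes(10_000)       # 2_000_000 bytes (~1.9 MiB)

print(before_filter <= THRESHOLD_BYTES)  # False: too large to broadcast
print(after_filter <= THRESHOLD_BYTES)   # True: fits under the threshold
```

Under these assumptions the unfiltered table is roughly 190 times the threshold, while the filtered result is about a fifth of it. That gap is what the runtime statistics would expose.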
Implementation Considerations
· Partial execution overhead
The build side may still be large in some cases. The optimization should
only be triggered when the filter is estimated to be highly selective or when
existing statistics are known to be inaccurate.
· Caching / re‑use
The result of the build‑side execution can be cached and reused in the
final plan to avoid double‑scanning.
· Compatibility with existing AQE
This approach does not conflict with Spark’s AQE; it fills a gap where
AQE cannot see into the Velox‑executed filter. The rewritten plan can still be
further optimized by AQE if needed.
· Graceful fallback
If partial execution fails or statistics collection turns out to be too
expensive, the original plan should be left unchanged.
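The considerations above (trigger only when worthwhile, reuse the build-side result, fall back on failure) could fit together roughly as below. This is a hedged Python sketch, not a proposed Gluten interface. `plan_join`, `should_try_partial_execution`, `DynamicJoinDecision`, the `execute_build_side` callback and the 1% selectivity cutoff are all hypothetical. "Selectivity" here means the fraction of rows the filter is expected to keep.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple


@dataclass(frozen=True)
class BuildSideStats:
    num_rows: int
    num_bytes: int


@dataclass(frozen=True)
class DynamicJoinDecision:
    strategy: str
    # Materialized build-side output kept for reuse; None if never executed.
    cached_build_result: Optional[Any]


def should_try_partial_execution(estimated_selectivity: Optional[float],
                                 stats_known_inaccurate: bool,
                                 max_selectivity: float = 0.01) -> bool:
    """Gate the optimization: run the build side early only when the filter is
    expected to keep few rows, or when existing statistics are unreliable."""
    if stats_known_inaccurate:
        return True
    return estimated_selectivity is not None and estimated_selectivity <= max_selectivity


def plan_join(original_strategy: str,
              threshold_bytes: int,
              estimated_selectivity: Optional[float],
              stats_known_inaccurate: bool,
              execute_build_side: Callable[[], Tuple[Any, BuildSideStats]],
              ) -> DynamicJoinDecision:
    if not should_try_partial_execution(estimated_selectivity, stats_known_inaccurate):
        # Not worth the overhead: keep the static plan as-is.
        return DynamicJoinDecision(original_strategy, None)
    try:
        build_result, stats = execute_build_side()
    except Exception:
        # Graceful fallback: a failed partial execution leaves the plan unchanged.
        return DynamicJoinDecision(original_strategy, None)
    if threshold_bytes >= 0 and stats.num_bytes <= threshold_bytes:
        # Small enough to broadcast; reuse the materialized build side.
        return DynamicJoinDecision("BroadcastHashJoin", build_result)
    # Too large: keep the original strategy, but still hand back the result so
    # the final plan can avoid scanning the build side a second time.
    return DynamicJoinDecision(original_strategy, build_result)
```

Returning the materialized result in both executed branches reflects the caching/re-use point. Whichever strategy is chosen, the final plan could consume that result instead of re-scanning the build side.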
Next Steps
I’m interested in community feedback on this direction and would like to
start with a proof‑of‑concept implementation. Areas where input would be
especially helpful:
· Best place to hook into Gluten’s existing planning/execution pipeline for
partial execution
· How to efficiently collect statistics from Velox without materializing the
entire build side
· Handling different join types and build‑side complexity
Any thoughts or suggestions are welcome.
### Gluten version
None