andygrove opened a new issue, #1910: URL: https://github.com/apache/datafusion-ballista/issues/1910
## Is your feature request related to a problem or challenge?

#1909 made uncorrelated scalar subqueries work in distributed execution by setting `datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery = false`, which makes the optimizer rewrite them to joins (the previous physical `ScalarSubqueryExec` cannot be serialized across Ballista's stage boundaries).

That fix is **correct but not efficient**. An uncorrelated scalar subquery produces exactly one value, but rewriting it to a join turns it into a (cross / left) join against the subquery's single-row result. Compared to using the value as a constant, the join adds extra shuffle/build/probe work, for example in TPC-H q11/q15/q22, where the subquery is a single aggregate that is then compared (`=`, `>`) against the outer rows.

## Describe the solution you'd like

Execute the uncorrelated scalar subquery **first**, then substitute its value into the original plan:

1. Detect uncorrelated scalar subqueries during distributed planning.
2. Run each subquery's plan as its own job/stage(s) and collect its single scalar result (erroring if it returns more than one row, matching scalar-subquery semantics).
3. Replace the `ScalarSubquery` / `ScalarSubqueryExpr` in the main plan with the materialized value as a literal.
4. Plan and execute the (now subquery-free) main plan.

This matches how the value is logically used (as a constant) and avoids the join overhead entirely. It also removes the need to disable `enable_physical_uncorrelated_scalar_subquery`, since there would no longer be a `ScalarSubqueryExec` to serialize.

This is essentially sequential/dependent subquery execution in the scheduler: the main query depends on the subquery's result, so the subquery's stages run to completion first and feed a constant into the dependent plan.

## Describe alternatives you've considered

- **Decorrelate to a join (current behavior, #1909):** correct but adds join work for what is logically a constant.
- **Keep `ScalarSubqueryExec` and serialize it across stages:** would require teaching `datafusion-proto` / the scheduler to carry the shared in-process results container across processes, which isn't possible: the value has to be materialized and transmitted regardless.

## Additional context

Follow-on to #1909. Once this lands, the `enable_physical_uncorrelated_scalar_subquery = false` default set in #1909 could be revisited.
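
To make the four proposed steps concrete, here is a minimal, dependency-free sketch of the "materialize, then substitute" idea. It does not use the DataFusion or Ballista APIs: the `Expr` enum, `collect_scalar`, `materialize`, and the `run` callback (standing in for "execute the subquery as its own job and collect its rows") are all hypothetical names for illustration. Treating an empty result as NULL is an assumption based on standard SQL scalar-subquery semantics; the issue itself only specifies the more-than-one-row error.

```rust
use std::collections::HashMap;

/// Toy expression tree (hypothetical, not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    /// `None` models SQL NULL.
    Literal(Option<i64>),
    /// An uncorrelated scalar subquery, identified by a hypothetical id.
    ScalarSubquery(usize),
    Gt(Box<Expr>, Box<Expr>),
}

/// Step 2: reduce the rows a subquery returned to a single scalar.
/// Zero rows become NULL (assumed); more than one row is an error.
fn collect_scalar(rows: &[i64]) -> Result<Option<i64>, String> {
    match rows {
        [] => Ok(None),
        [v] => Ok(Some(*v)),
        _ => Err(format!("scalar subquery returned {} rows", rows.len())),
    }
}

/// Steps 1 and 3: find each `ScalarSubquery`, run it at most once via `run`,
/// and replace it with the materialized literal.
fn materialize<F>(
    expr: &Expr,
    run: &mut F,
    cache: &mut HashMap<usize, Option<i64>>,
) -> Result<Expr, String>
where
    F: FnMut(usize) -> Result<Vec<i64>, String>,
{
    Ok(match expr {
        Expr::ScalarSubquery(id) => {
            let value = match cache.get(id).copied() {
                Some(v) => v,
                None => {
                    let v = collect_scalar(&run(*id)?)?;
                    cache.insert(*id, v);
                    v
                }
            };
            Expr::Literal(value)
        }
        Expr::Gt(l, r) => Expr::Gt(
            Box::new(materialize(l, run, cache)?),
            Box::new(materialize(r, run, cache)?),
        ),
        // Columns and literals contain no subqueries; keep them unchanged.
        other => other.clone(),
    })
}
```

The expression returned by `materialize` contains no `ScalarSubquery` nodes, so it corresponds to the subquery-free main plan of step 4. The cache only shows that a subquery referenced more than once needs to run once; whether the scheduler would deduplicate this way is left open here.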
