andygrove opened a new pull request, #1912:
URL: https://github.com/apache/datafusion-ballista/pull/1912
> **Stacked on #1906** (DataFusion 54 upgrade). This branch is based on
> `upgrade/datafusion-54-fresh`, so until #1906 merges the diff here also
> includes its commits. Draft until #1906 lands; I'll rebase onto `main` for a
> clean diff and mark it ready then. Review the last commit
> (`feat: execute uncorrelated scalar subqueries first and inline the value`)
> for the changes specific to this PR.
# Which issue does this PR close?
Closes #1910.
# Rationale for this change
#1909 made uncorrelated scalar subqueries work in distributed execution by
disabling `datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery`,
which makes the optimizer rewrite them to joins (DataFusion 54's physical
`ScalarSubqueryExec` cannot be serialized across Ballista's stage
boundaries).
That is correct but inefficient: an uncorrelated scalar subquery produces a
single value, but rewriting it to a join turns it into a (cross / left) join
against the subquery's one-row result, adding shuffle/build/probe work for
what is logically a constant.
This PR runs the subquery first and substitutes its value, which is both
efficient and avoids the `ScalarSubqueryExec` serialization problem entirely.
# What changes are included in this PR?
- In the client query planner (`BallistaQueryPlanner::create_physical_plan`),
  before sending the plan to the scheduler:
  - detect **uncorrelated** `Expr::ScalarSubquery` nodes in the optimized
    logical plan (correlated subqueries are left to DataFusion's existing
    decorrelation rules);
  - run each subquery as its own distributed query and reduce its result to a
    single `ScalarValue` (0 rows → typed null, 1 row → the value, more than
    one row → an error);
  - substitute the value as a literal. Subqueries nested inside a subquery
    are materialized recursively, innermost first.
- Reverts the `enable_physical_uncorrelated_scalar_subquery = false` default
  added in #1909, since the subqueries are now materialized rather than
  decorrelated. This supersedes that workaround.
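
For reviewers who want the shape of the idea without reading the diff, here is a minimal, self-contained sketch of the reduce-and-substitute step. It is not the code in this PR: `Scalar`, `Expr`, `Query`, `reduce_to_scalar`, and `materialize` are toy stand-ins invented for illustration (they are not DataFusion's `Expr` or `ScalarValue`), and the `run` callback stands in for "execute this subquery as its own distributed query".

```rust
// Toy stand-ins for illustration only; NOT DataFusion's real types.
#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Null, // stands in for a typed null
    Int(i64),
}

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Literal(Scalar),
    Column(String),
    Eq(Box<Expr>, Box<Expr>),
    // Assumed to be uncorrelated: it references nothing from the outer query.
    ScalarSubquery(Box<Query>),
}

// Hypothetical single-column query; `filter` may itself contain subqueries.
#[derive(Debug, Clone, PartialEq)]
struct Query {
    name: String,
    filter: Option<Expr>,
}

// 0 rows -> null, 1 row -> the value, more than one row -> error.
fn reduce_to_scalar(rows: Vec<Scalar>) -> Result<Scalar, String> {
    let mut it = rows.into_iter();
    match (it.next(), it.next()) {
        (None, _) => Ok(Scalar::Null),
        (Some(v), None) => Ok(v),
        (Some(_), Some(_)) => {
            Err("scalar subquery returned more than one row".to_string())
        }
    }
}

// Replace every scalar subquery with a literal, innermost first.
fn materialize(
    expr: Expr,
    run: &mut dyn FnMut(&Query) -> Vec<Scalar>,
) -> Result<Expr, String> {
    match expr {
        Expr::ScalarSubquery(q) => {
            let Query { name, filter } = *q;
            // Materialize nested subqueries before running this one.
            let filter = match filter {
                Some(f) => Some(materialize(f, run)?),
                None => None,
            };
            let rows = run(&Query { name, filter });
            Ok(Expr::Literal(reduce_to_scalar(rows)?))
        }
        Expr::Eq(l, r) => {
            let l = materialize(*l, run)?;
            let r = materialize(*r, run)?;
            Ok(Expr::Eq(Box::new(l), Box::new(r)))
        }
        // Literals and columns contain no subqueries.
        other => Ok(other),
    }
}
```

In this sketch, a predicate such as `id = (subquery)` comes back as `id = <literal>`, and a subquery nested in another subquery's filter is executed before its parent, so the parent only ever sees a literal.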
# Are there any user-facing changes?
No API or SQL semantics change. Uncorrelated scalar subqueries (e.g. TPC-H
q11/q15/q22) now execute on a Ballista cluster by computing the subquery
first and inlining its value.
Tested with new unit tests for the detection/substitution helpers (including
that correlated subqueries are skipped) and a standalone + remote integration
test that runs `where id = (select max(id) from test)` end to end.
`cargo test`, `cargo fmt`, and `cargo clippy --all-targets -- -D warnings`
are clean.