This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 48cc4c8af3 Optimize planning for projected nested union (#18713)
48cc4c8af3 is described below
commit 48cc4c8af3a5ad500a44c8625ea44d6b4827af1e
Author: Vedic Chawla <[email protected]>
AuthorDate: Tue Nov 25 19:39:26 2025 +0530
Optimize planning for projected nested union (#18713)
## Which issue does this PR close?
- Part of #17261.
## Rationale for this change
As explained in
https://github.com/apache/datafusion/issues/17261#issuecomment-3529432027
and in the subsequent discussion on that issue.
## What changes are included in this PR?
A nested union wrapped in a projection is now merged with its parent union: the
projection is pushed down onto each child of the inner union so the two unions
can be flattened into one.
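The same shape is exercised by the new unit test added in this commit. The
standalone sketch below (not part of the change itself) illustrates it; the
`schema()` helper with `id`/`key`/`value` columns is made up for illustration,
and the `table_scan` helper is assumed to be the public builder function from
`datafusion_expr`:
```
// A sketch of the plan shape handled by this change, mirroring the new
// `eliminate_nested_union_in_projection` test below. The `schema()` helper
// and its id/key/value columns are made up for illustration.
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::{col, logical_plan::builder::table_scan, LogicalPlan};

fn schema() -> Schema {
    Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Float64, false),
    ])
}

/// Builds: Union { Projection { Union { scan, scan } }, scan }
fn nested_union_under_projection() -> Result<LogicalPlan> {
    let plan_builder = table_scan(Some("table"), &schema(), None)?;
    plan_builder
        .clone()
        .union(plan_builder.clone().build()?)?
        .project(vec![col("id").alias("table_id"), col("key"), col("value")])?
        .union(plan_builder.build()?)?
        .build()
}

// After the rewrite, the projection is pushed onto each child of the inner
// union and the unions are merged into one:
//
//   Union
//     Projection: id AS table_id, key, value
//       TableScan: table
//     Projection: id AS table_id, key, value
//       TableScan: table
//     TableScan: table
```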
## Are these changes tested?
Tested via GitHub CI; the commit also adds a unit test covering the new
rewrite. It works for the scenario discussed in the above-mentioned issue:
note the single flattened UnionExec directly under the top-level
SortPreservingMergeExec in the plan below.
```
> explain analyze (SELECT c0, 1 as c1, 2 as c2, 3 as c3, 4 as c4, 5 as c5,
6 as c6, 7 as c7, 8 as c8, 9 as c9 FROM t1 ORDER BY c0)
UNION ALL
(SELECT 0 as c0, c1, 2 as c2, 3 as c3, 4 as c4, 5 as c5, 6 as c6, 7 as c7,
8 as c8, 9 as c9 FROM t1 ORDER BY c1)
UNION ALL
(SELECT 0 as c0, 1 as c1, c2, 3 as c3, 4 as c4, 5 as c5, 6 as c6, 7 as c7,
8 as c8, 9 as c9 FROM t1 ORDER BY c2)
UNION ALL
(SELECT 0 as c0, 1 as c1, 2 as c2, c3, 4 as c4, 5 as c5, 6 as c6, 7 as c7,
8 as c8, 9 as c9 FROM t1 ORDER BY c3)
UNION ALL
(SELECT 0 as c0, 1 as c1, 2 as c2, 3 as c3, c4, 5 as c5, 6 as c6, 7 as c7,
8 as c8, 9 as c9 FROM t1 ORDER BY c4)
UNION ALL
(SELECT 0 as c0, 1 as c1, 2 as c2, 3 as c3, 4 as c4, c5, 6 as c6, 7 as c7,
8 as c8, 9 as c9 FROM t1 ORDER BY c5)
UNION ALL
(SELECT 0 as c0, 1 as c1, 2 as c2, 3 as c3, 4 as c4, 5 as c5, c6, 7 as c7,
8 as c8, 9 as c9 FROM t1 ORDER BY c6)
UNION ALL
(SELECT 0 as c0, 1 as c1, 2 as c2, 3 as c3, 4 as c4, 5 as c5, 6 as c6, c7,
8 as c8, 9 as c9 FROM t1 ORDER BY c7)
UNION ALL
(SELECT 0 as c0, 1 as c1, 2 as c2, 3 as c3, 4 as c4, 5 as c5, 6 as c6, 7 as
c7, c8, 9 as c9 FROM t1 ORDER BY c8)
UNION ALL
(SELECT 0 as c0, 1 as c1, 2 as c2, 3 as c3, 4 as c4, 5 as c5, 6 as c6, 7 as
c7, 8 as c8, c9 FROM t1 ORDER BY c9)
ORDER BY c0, c1, c2, c3, c4, c5, c6, c7, c8, c9;
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| plan_type | plan
[...]
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| Plan with Metrics | SortPreservingMergeExec: [c0@0 ASC NULLS LAST, c1@1
ASC NULLS LAST, c2@2 ASC NULLS LAST, c3@3 ASC NULLS LAST, c4@4 ASC NULLS LAST,
c5@5 ASC NULLS LAST, c6@6 ASC NULLS LAST, c7@7 ASC NULLS LAST, c8@8 ASC NULLS
LAST, c9@9 ASC NULLS LAST], metrics=[output_rows=0, elapsed_compute=3.817µs,
output_bytes=0.0 B, output_batches=0]
[...]
| | UnionExec, metrics=[output_rows=0,
elapsed_compute=345.775µs, output_bytes=0.0 B, output_batches=0]
[...]
| | SortExec: expr=[c0@0 ASC NULLS LAST],
preserve_partitioning=[false], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B,
spilled_rows=0, batches_split=0]
[...]
| | ProjectionExec: expr=[CAST(c0@0 AS
Decimal128(20, 0)) as c0, CAST(1 AS Decimal128(20, 0)) as c1, CAST(2 AS
Decimal128(20, 0)) as c2, CAST(3 AS Decimal128(20, 0)) as c3, CAST(4 AS
Decimal128(20, 0)) as c4, CAST(5 AS Decimal128(20, 0)) as c5, CAST(6 AS
Decimal128(20, 0)) as c6, CAST(7 AS Decimal128(20, 0)) as c7, CAST(8 AS
Decimal128(20, 0)) as c8, CAST(9 AS Decimal128(20, 0)) as c9],
metrics=[output_rows=0, elapsed_compute=1ns, output_bytes=0.0 B, output_bat
[...]
| | DataSourceExec: partitions=1,
partition_sizes=[0], metrics=[]
[...]
| | SortExec: expr=[c1@1 ASC NULLS LAST],
preserve_partitioning=[false], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B,
spilled_rows=0, batches_split=0]
[...]
| | ProjectionExec: expr=[CAST(0 AS Decimal128(20,
0)) as c0, CAST(c1@0 AS Decimal128(20, 0)) as c1, CAST(2 AS Decimal128(20, 0))
as c2, CAST(3 AS Decimal128(20, 0)) as c3, CAST(4 AS Decimal128(20, 0)) as c4,
CAST(5 AS Decimal128(20, 0)) as c5, CAST(6 AS Decimal128(20, 0)) as c6, CAST(7
AS Decimal128(20, 0)) as c7, CAST(8 AS Decimal128(20, 0)) as c8, CAST(9 AS
Decimal128(20, 0)) as c9], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_bat [...]
| | DataSourceExec: partitions=1,
partition_sizes=[0], metrics=[]
[...]
| | SortExec: expr=[c2@2 ASC NULLS LAST],
preserve_partitioning=[false], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B,
spilled_rows=0, batches_split=0]
[...]
| | ProjectionExec: expr=[CAST(0 AS Decimal128(20,
0)) as c0, CAST(1 AS Decimal128(20, 0)) as c1, CAST(c2@0 AS Decimal128(20, 0))
as c2, CAST(3 AS Decimal128(20, 0)) as c3, CAST(4 AS Decimal128(20, 0)) as c4,
CAST(5 AS Decimal128(20, 0)) as c5, CAST(6 AS Decimal128(20, 0)) as c6, CAST(7
AS Decimal128(20, 0)) as c7, CAST(8 AS Decimal128(20, 0)) as c8, CAST(9 AS
Decimal128(20, 0)) as c9], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_bat [...]
| | DataSourceExec: partitions=1,
partition_sizes=[0], metrics=[]
[...]
| | SortExec: expr=[c3@3 ASC NULLS LAST],
preserve_partitioning=[false], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B,
spilled_rows=0, batches_split=0]
[...]
| | ProjectionExec: expr=[CAST(0 AS Decimal128(20,
0)) as c0, CAST(1 AS Decimal128(20, 0)) as c1, CAST(2 AS Decimal128(20, 0)) as
c2, CAST(c3@0 AS Decimal128(20, 0)) as c3, CAST(4 AS Decimal128(20, 0)) as c4,
CAST(5 AS Decimal128(20, 0)) as c5, CAST(6 AS Decimal128(20, 0)) as c6, CAST(7
AS Decimal128(20, 0)) as c7, CAST(8 AS Decimal128(20, 0)) as c8, CAST(9 AS
Decimal128(20, 0)) as c9], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_bat [...]
| | DataSourceExec: partitions=1,
partition_sizes=[0], metrics=[]
[...]
| | SortExec: expr=[c4@4 ASC NULLS LAST],
preserve_partitioning=[false], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B,
spilled_rows=0, batches_split=0]
[...]
| | ProjectionExec: expr=[CAST(0 AS Decimal128(20,
0)) as c0, CAST(1 AS Decimal128(20, 0)) as c1, CAST(2 AS Decimal128(20, 0)) as
c2, CAST(3 AS Decimal128(20, 0)) as c3, CAST(c4@0 AS Decimal128(20, 0)) as c4,
CAST(5 AS Decimal128(20, 0)) as c5, CAST(6 AS Decimal128(20, 0)) as c6, CAST(7
AS Decimal128(20, 0)) as c7, CAST(8 AS Decimal128(20, 0)) as c8, CAST(9 AS
Decimal128(20, 0)) as c9], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_bat [...]
| | DataSourceExec: partitions=1,
partition_sizes=[0], metrics=[]
[...]
| | SortExec: expr=[c5@5 ASC NULLS LAST],
preserve_partitioning=[false], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B,
spilled_rows=0, batches_split=0]
[...]
| | ProjectionExec: expr=[CAST(0 AS Decimal128(20,
0)) as c0, CAST(1 AS Decimal128(20, 0)) as c1, CAST(2 AS Decimal128(20, 0)) as
c2, CAST(3 AS Decimal128(20, 0)) as c3, CAST(4 AS Decimal128(20, 0)) as c4,
CAST(c5@0 AS Decimal128(20, 0)) as c5, CAST(6 AS Decimal128(20, 0)) as c6,
CAST(7 AS Decimal128(20, 0)) as c7, CAST(8 AS Decimal128(20, 0)) as c8, CAST(9
AS Decimal128(20, 0)) as c9], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_bat [...]
| | DataSourceExec: partitions=1,
partition_sizes=[0], metrics=[]
[...]
| | SortExec: expr=[c6@6 ASC NULLS LAST],
preserve_partitioning=[false], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B,
spilled_rows=0, batches_split=0]
[...]
| | ProjectionExec: expr=[CAST(0 AS Decimal128(20,
0)) as c0, CAST(1 AS Decimal128(20, 0)) as c1, CAST(2 AS Decimal128(20, 0)) as
c2, CAST(3 AS Decimal128(20, 0)) as c3, CAST(4 AS Decimal128(20, 0)) as c4,
CAST(5 AS Decimal128(20, 0)) as c5, CAST(c6@0 AS Decimal128(20, 0)) as c6,
CAST(7 AS Decimal128(20, 0)) as c7, CAST(8 AS Decimal128(20, 0)) as c8, CAST(9
AS Decimal128(20, 0)) as c9], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_bat [...]
| | DataSourceExec: partitions=1,
partition_sizes=[0], metrics=[]
[...]
| | SortExec: expr=[c7@7 ASC NULLS LAST],
preserve_partitioning=[false], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B,
spilled_rows=0, batches_split=0]
[...]
| | ProjectionExec: expr=[CAST(0 AS Decimal128(20,
0)) as c0, CAST(1 AS Decimal128(20, 0)) as c1, CAST(2 AS Decimal128(20, 0)) as
c2, CAST(3 AS Decimal128(20, 0)) as c3, CAST(4 AS Decimal128(20, 0)) as c4,
CAST(5 AS Decimal128(20, 0)) as c5, CAST(6 AS Decimal128(20, 0)) as c6,
CAST(c7@0 AS Decimal128(20, 0)) as c7, CAST(8 AS Decimal128(20, 0)) as c8,
CAST(9 AS Decimal128(20, 0)) as c9], metrics=[output_rows=0,
elapsed_compute=1ns, output_bytes=0.0 B, output_bat [...]
| | DataSourceExec: partitions=1,
partition_sizes=[0], metrics=[]
[...]
| | SortExec: expr=[c8@8 ASC NULLS LAST],
preserve_partitioning=[false], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B,
spilled_rows=0, batches_split=0]
[...]
| | ProjectionExec: expr=[CAST(0 AS Decimal128(20,
0)) as c0, CAST(1 AS Decimal128(20, 0)) as c1, CAST(2 AS Decimal128(20, 0)) as
c2, CAST(3 AS Decimal128(20, 0)) as c3, CAST(4 AS Decimal128(20, 0)) as c4,
CAST(5 AS Decimal128(20, 0)) as c5, CAST(6 AS Decimal128(20, 0)) as c6, CAST(7
AS Decimal128(20, 0)) as c7, CAST(c8@0 AS Decimal128(20, 0)) as c8, CAST(9 AS
Decimal128(20, 0)) as c9], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_bat [...]
| | DataSourceExec: partitions=1,
partition_sizes=[0], metrics=[]
[...]
| | SortExec: expr=[c9@9 ASC NULLS LAST],
preserve_partitioning=[false], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B,
spilled_rows=0, batches_split=0]
[...]
| | ProjectionExec: expr=[CAST(0 AS Decimal128(20,
0)) as c0, CAST(1 AS Decimal128(20, 0)) as c1, CAST(2 AS Decimal128(20, 0)) as
c2, CAST(3 AS Decimal128(20, 0)) as c3, CAST(4 AS Decimal128(20, 0)) as c4,
CAST(5 AS Decimal128(20, 0)) as c5, CAST(6 AS Decimal128(20, 0)) as c6, CAST(7
AS Decimal128(20, 0)) as c7, CAST(8 AS Decimal128(20, 0)) as c8, CAST(c9@0 AS
Decimal128(20, 0)) as c9], metrics=[output_rows=0, elapsed_compute=1ns,
output_bytes=0.0 B, output_bat [...]
| | DataSourceExec: partitions=1,
partition_sizes=[0], metrics=[]
[...]
| |
[...]
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
```
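To reproduce a run like the one above outside datafusion-cli, here is a
minimal sketch (not part of this commit) using the DataFrame API. The empty
in-memory `t1` with `Decimal128(20, 0)` columns and the abbreviated
three-branch query are assumptions standing in for the setup from the issue;
three branches are already enough to produce a projected nested union during
planning:
```
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Register an empty in-memory t1(c0 .. c9) so the query can be planned.
    let fields: Vec<Field> = (0..10)
        .map(|i| Field::new(format!("c{i}"), DataType::Decimal128(20, 0), true))
        .collect();
    let batch = RecordBatch::new_empty(Arc::new(Schema::new(fields)));
    ctx.register_batch("t1", batch)?;

    // Abbreviated three-branch version of the query above.
    let sql = "EXPLAIN ANALYZE \
        (SELECT c0, 1 AS c1, 2 AS c2 FROM t1 ORDER BY c0) \
        UNION ALL \
        (SELECT 0 AS c0, c1, 2 AS c2 FROM t1 ORDER BY c1) \
        UNION ALL \
        (SELECT 0 AS c0, 1 AS c1, c2 FROM t1 ORDER BY c2) \
        ORDER BY c0, c1, c2";
    ctx.sql(sql).await?.show().await?;
    Ok(())
}
```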
## Are there any user-facing changes?
Yes: logical plans for some queries will be different (and better) now.
There are no syntax changes.
---
datafusion/optimizer/src/optimize_unions.rs | 55 ++++++++++++++++++++++++++++-
1 file changed, 54 insertions(+), 1 deletion(-)
diff --git a/datafusion/optimizer/src/optimize_unions.rs b/datafusion/optimizer/src/optimize_unions.rs
index cfabd512b4..23a6fe95e5 100644
--- a/datafusion/optimizer/src/optimize_unions.rs
+++ b/datafusion/optimizer/src/optimize_unions.rs
@@ -21,7 +21,7 @@ use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::tree_node::Transformed;
use datafusion_common::Result;
use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema;
-use datafusion_expr::{Distinct, LogicalPlan, Union};
+use datafusion_expr::{Distinct, LogicalPlan, Projection, Union};
use itertools::Itertools;
use std::sync::Arc;
@@ -105,6 +105,38 @@ fn extract_plans_from_union(plan: Arc<LogicalPlan>) -> Vec<LogicalPlan> {
.into_iter()
.map(Arc::unwrap_or_clone)
.collect::<Vec<_>>(),
+ // While unnesting, unwrap a Projection whose input is a nested Union,
+ // flatten the inner Union, and push the same Projection down onto
+ // each of the nested Union’s children.
+ //
+ // Example:
+ // Union { Projection { Union { plan1, plan2 } }, plan3 }
+ // => Union { Projection { plan1 }, Projection { plan2 }, plan3 }
+ LogicalPlan::Projection(Projection {
+ expr,
+ input,
+ schema,
+ ..
+ }) => match Arc::unwrap_or_clone(input) {
+ LogicalPlan::Union(Union { inputs, .. }) => inputs
+ .into_iter()
+ .map(Arc::unwrap_or_clone)
+ .map(|plan| {
+ LogicalPlan::Projection(
+ Projection::try_new_with_schema(
+ expr.clone(),
+ Arc::new(plan),
+ Arc::clone(&schema),
+ )
+ .unwrap(),
+ )
+ })
+ .collect::<Vec<_>>(),
+
+ plan => vec![LogicalPlan::Projection(
+ Projection::try_new_with_schema(expr, Arc::new(plan), schema).unwrap(),
+ )],
+ },
plan => vec![plan],
}
}
@@ -331,6 +363,27 @@ mod tests {
")
}
+ #[test]
+ fn eliminate_nested_union_in_projection() -> Result<()> {
+ let plan_builder = table_scan(Some("table"), &schema(), None)?;
+
+ let plan = plan_builder
+ .clone()
+ .union(plan_builder.clone().build()?)?
+ .project(vec![col("id").alias("table_id"), col("key"), col("value")])?
+ .union(plan_builder.build()?)?
+ .build()?;
+
+ assert_optimized_plan_equal!(plan, @r"
+ Union
+ Projection: id AS table_id, key, value
+ TableScan: table
+ Projection: id AS table_id, key, value
+ TableScan: table
+ TableScan: table
+ ")
+ }
+
#[test]
fn eliminate_nested_union_with_type_cast_projection() -> Result<()> {
let table_1 = table_scan(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]