This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new dc0c21bbc8 feat: Pushdown filters through `UnionExec` nodes (#20145)
dc0c21bbc8 is described below
commit dc0c21bbc83be6af1d041c5d1c313ba442036dc5
Author: Huaijin <[email protected]>
AuthorDate: Tue Feb 10 00:18:00 2026 +0800
feat: Pushdown filters through `UnionExec` nodes (#20145)
## Which issue does this PR close?
- Closes #20144
## Rationale for this change
see #20144
## What changes are included in this PR?
This PR implements `handle_child_pushdown_result` for `UnionExec`; in all
cases, the filters will always be pushed down through `UnionExec`.
UnionExec needs specialized filter pushdown handling when children have
heterogeneous pushdown support. Without this, when some children support
pushdown and others don't, the default behavior would leave FilterExec
above UnionExec, re-applying filters to outputs of all
children—including those that already applied the filters via pushdown.
This specialized implementation adds FilterExec only to children that
don't support pushdown, avoiding redundant filtering and improving
performance.
```
Example: Given Child1 (no pushdown support) and Child2 (has pushdown
support)
Default behavior: This implementation:
FilterExec UnionExec
UnionExec FilterExec
Child1 Child1
Child2(filter) Child2(filter)
```
## Are these changes tested?
Yes, two test cases are added.
## Are there any user-facing changes?
---
.../tests/physical_optimizer/filter_pushdown.rs | 61 +++++++++++++++
datafusion/physical-plan/src/union.rs | 87 +++++++++++++++++++++-
.../test_files/parquet_filter_pushdown.slt | 63 ++++++++++++++++
3 files changed, 209 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
index 31a21274ad..b3ed8d9653 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
@@ -1808,6 +1808,67 @@ fn test_filter_pushdown_through_union() {
);
}
+#[test]
+fn test_filter_pushdown_through_union_mixed_support() {
+ // Test case where one child supports filter pushdown and one doesn't
+ let scan1 = TestScanBuilder::new(schema()).with_support(true).build();
+ let scan2 = TestScanBuilder::new(schema()).with_support(false).build();
+
+ let union = UnionExec::try_new(vec![scan1, scan2]).unwrap();
+
+ let predicate = col_lit_predicate("a", "foo", &schema());
+ let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap());
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: a@0 = foo
+ - UnionExec
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=false
+ output:
+ Ok:
+ - UnionExec
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 =
foo
+ - FilterExec: a@0 = foo
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=false
+ "
+ );
+}
+
+#[test]
+fn test_filter_pushdown_through_union_does_not_support() {
+ // Test case where neither child supports filter pushdown
+ let scan1 = TestScanBuilder::new(schema()).with_support(false).build();
+ let scan2 = TestScanBuilder::new(schema()).with_support(false).build();
+
+ let union = UnionExec::try_new(vec![scan1, scan2]).unwrap();
+
+ let predicate = col_lit_predicate("a", "foo", &schema());
+ let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap());
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, FilterPushdown::new(), true),
+ @"
+ OptimizationTest:
+ input:
+ - FilterExec: a@0 = foo
+ - UnionExec
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=false
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=false
+ output:
+ Ok:
+ - UnionExec
+ - FilterExec: a@0 = foo
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=false
+ - FilterExec: a@0 = foo
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=false
+ "
+ );
+}
+
/// Schema:
/// a: String
/// b: String
diff --git a/datafusion/physical-plan/src/union.rs
b/datafusion/physical-plan/src/union.rs
index b6f943886e..4ebb8910fa 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -36,7 +36,11 @@ use crate::execution_plan::{
InvariantLevel, boundedness_from_children, check_default_invariants,
emission_type_from_children,
};
-use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
+use crate::filter::FilterExec;
+use crate::filter_pushdown::{
+ ChildPushdownResult, FilterDescription, FilterPushdownPhase,
+ FilterPushdownPropagation, PushedDown,
+};
use crate::metrics::BaselineMetrics;
use crate::projection::{ProjectionExec, make_with_child};
use crate::stream::ObservedStream;
@@ -49,7 +53,9 @@ use datafusion_common::{
Result, assert_or_internal_err, exec_err, internal_datafusion_err,
};
use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr,
calculate_union};
+use datafusion_physical_expr::{
+ EquivalenceProperties, PhysicalExpr, calculate_union, conjunction,
+};
use futures::Stream;
use itertools::Itertools;
@@ -370,6 +376,83 @@ impl ExecutionPlan for UnionExec {
) -> Result<FilterDescription> {
FilterDescription::from_children(parent_filters, &self.children())
}
+
+ fn handle_child_pushdown_result(
+ &self,
+ phase: FilterPushdownPhase,
+ child_pushdown_result: ChildPushdownResult,
+ _config: &ConfigOptions,
+ ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+ // Pre phase: handle heterogeneous pushdown by wrapping individual
+ // children with FilterExec and reporting all filters as handled.
+ // Post phase: use default behavior to let the filter creator decide
how to handle
+ // filters that weren't fully pushed down.
+ if !matches!(phase, FilterPushdownPhase::Pre) {
+ return
Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
+ }
+
+ // UnionExec needs specialized filter pushdown handling when children
have
+ // heterogeneous pushdown support. Without this, when some children
support
+ // pushdown and others don't, the default behavior would leave
FilterExec
+ // above UnionExec, re-applying filters to outputs of all
children—including
+ // those that already applied the filters via pushdown. This
specialized
+ // implementation adds FilterExec only to children that don't support
+ // pushdown, avoiding redundant filtering and improving performance.
+ //
+ // Example: Given Child1 (no pushdown support) and Child2 (has
pushdown support)
+ // Default behavior: This implementation:
+ // FilterExec UnionExec
+ // UnionExec FilterExec
+ // Child1 Child1
+ // Child2(filter) Child2(filter)
+
+ // Collect unsupported filters for each child
+ let mut unsupported_filters_per_child = vec![Vec::new();
self.inputs.len()];
+ for parent_filter_result in
child_pushdown_result.parent_filters.iter() {
+ for (child_idx, &child_result) in
+ parent_filter_result.child_results.iter().enumerate()
+ {
+ if matches!(child_result, PushedDown::No) {
+ unsupported_filters_per_child[child_idx]
+ .push(Arc::clone(&parent_filter_result.filter));
+ }
+ }
+ }
+
+ // Wrap children that have unsupported filters with FilterExec
+ let mut new_children = self.inputs.clone();
+ for (child_idx, unsupported_filters) in
+ unsupported_filters_per_child.iter().enumerate()
+ {
+ if !unsupported_filters.is_empty() {
+ let combined_filter = conjunction(unsupported_filters.clone());
+ new_children[child_idx] = Arc::new(FilterExec::try_new(
+ combined_filter,
+ Arc::clone(&self.inputs[child_idx]),
+ )?);
+ }
+ }
+
+ // Check if any children were modified
+ let children_modified = new_children
+ .iter()
+ .zip(self.inputs.iter())
+ .any(|(new, old)| !Arc::ptr_eq(new, old));
+
+ let all_filters_pushed =
+ vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()];
+ let propagation = if children_modified {
+ let updated_node = UnionExec::try_new(new_children)?;
+
FilterPushdownPropagation::with_parent_pushdown_result(all_filters_pushed)
+ .with_updated_node(updated_node)
+ } else {
+
FilterPushdownPropagation::with_parent_pushdown_result(all_filters_pushed)
+ };
+
+ // Report all parent filters as supported since we've ensured they're
applied
+ // on all children (either pushed down or via FilterExec)
+ Ok(propagation)
+ }
}
/// Combines multiple input streams by interleaving them.
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index aa94e2e2f2..e2473ee328 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -674,3 +674,66 @@ logical_plan
physical_plan
01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]},
projection=[id, tags], file_type=parquet, predicate=id@0 > 1 AND
array_has(tags@1, rust), pruning_predicate=id_null_count@1 != row_count@2 AND
id_max@0 > 1, required_guarantees=[]
+
+###
+# Test filter pushdown through UNION with mixed support
+# This tests the case where one child supports filter pushdown (parquet) and
one doesn't (memory table)
+###
+
+# enable filter pushdown
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+statement ok
+set datafusion.optimizer.max_passes = 0;
+
+# Create memory table with matching schema (a: VARCHAR, b: BIGINT)
+statement ok
+CREATE TABLE t_union_mem(a VARCHAR, b BIGINT) AS VALUES ('qux', 4), ('quux',
5);
+
+# Create parquet table with matching schema
+statement ok
+CREATE EXTERNAL TABLE t_union_parquet(a VARCHAR, b BIGINT) STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet';
+
+# Query results combining memory table and Parquet with filter
+query I rowsort
+SELECT b FROM (
+ SELECT a, b FROM t_union_mem
+ UNION ALL
+ SELECT a, b FROM t_union_parquet
+) WHERE b > 2;
+----
+3
+4
+5
+50
+
+# Explain the union query - filter should be pushed to parquet but not memory
table
+query TT
+EXPLAIN SELECT b FROM (
+ SELECT a, b FROM t_union_mem
+ UNION ALL
+ SELECT a, b FROM t_union_parquet
+) WHERE b > 2;
+----
+logical_plan
+01)Projection: b
+02)--Filter: b > Int64(2)
+03)----Union
+04)------Projection: t_union_mem.a, t_union_mem.b
+05)--------TableScan: t_union_mem
+06)------Projection: t_union_parquet.a, t_union_parquet.b
+07)--------TableScan: t_union_parquet
+physical_plan
+01)UnionExec
+02)--FilterExec: b@0 > 2
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet]]},
projection=[b], file_type=parquet, predicate=b@1 > 2,
pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2,
required_guarantees=[]
+
+# Clean up union test tables
+statement ok
+DROP TABLE t_union_mem;
+
+statement ok
+DROP TABLE t_union_parquet;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]