This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new dc0c21bbc8 feat: Pushdown filters through `UnionExec` nodes (#20145)
dc0c21bbc8 is described below

commit dc0c21bbc83be6af1d041c5d1c313ba442036dc5
Author: Huaijin <[email protected]>
AuthorDate: Tue Feb 10 00:18:00 2026 +0800

    feat: Pushdown filters through `UnionExec` nodes (#20145)
    
    ## Which issue does this PR close?
    
    - Closes #20144
    
    ## Rationale for this change
    
    see #20144
    
    ## What changes are included in this PR?
    
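    This PR implements `handle_child_pushdown_result` for `UnionExec`. In the
    `Pre` pushdown phase, `UnionExec` now always reports parent filters as
    handled: each child either absorbs the filter itself or is wrapped in its
    own `FilterExec`.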
    
    UnionExec needs specialized filter pushdown handling when children have
    heterogeneous pushdown support. Without this, when some children support
    pushdown and others don't, the default behavior would leave FilterExec
    above UnionExec, re-applying filters to outputs of all
    children—including those that already applied the filters via pushdown.
    This specialized implementation adds FilterExec only to children that
    don't support pushdown, avoiding redundant filtering and improving
    performance.
    
    ```
    Example: Given Child1 (no pushdown support) and Child2 (has pushdown support)
    Default behavior:          This implementation:
    FilterExec                 UnionExec
      UnionExec                  FilterExec
        Child1                     Child1
        Child2(filter)           Child2(filter)
    ```
    
    ## Are these changes tested?
    
    Yes: two unit tests in `filter_pushdown.rs` and a sqllogictest case in
    `parquet_filter_pushdown.slt`.
    
    ## Are there any user-facing changes?
---
 .../tests/physical_optimizer/filter_pushdown.rs    | 61 +++++++++++++++
 datafusion/physical-plan/src/union.rs              | 87 +++++++++++++++++++++-
 .../test_files/parquet_filter_pushdown.slt         | 63 ++++++++++++++++
 3 files changed, 209 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs 
b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
index 31a21274ad..b3ed8d9653 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
@@ -1808,6 +1808,67 @@ fn test_filter_pushdown_through_union() {
     );
 }
 
+#[test]
+fn test_filter_pushdown_through_union_mixed_support() {
+    // scan1 supports pushdown and absorbs the predicate; only scan2 (no support) gets a FilterExec
+    let scan1 = TestScanBuilder::new(schema()).with_support(true).build();
+    let scan2 = TestScanBuilder::new(schema()).with_support(false).build();
+
+    let union = UnionExec::try_new(vec![scan1, scan2]).unwrap();
+
+    let predicate = col_lit_predicate("a", "foo", &schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap());
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   UnionExec
+        -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+        -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=false
+      output:
+        Ok:
+          - UnionExec
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = 
foo
+          -   FilterExec: a@0 = foo
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=false
+    "
+    );
+}
+
+#[test]
+fn test_filter_pushdown_through_union_does_not_support() {
+    // Neither child supports filter pushdown, so each child gets its own FilterExec
+    let scan1 = TestScanBuilder::new(schema()).with_support(false).build();
+    let scan2 = TestScanBuilder::new(schema()).with_support(false).build();
+
+    let union = UnionExec::try_new(vec![scan1, scan2]).unwrap();
+
+    let predicate = col_lit_predicate("a", "foo", &schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap());
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
+        @"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   UnionExec
+        -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=false
+        -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=false
+      output:
+        Ok:
+          - UnionExec
+          -   FilterExec: a@0 = foo
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=false
+          -   FilterExec: a@0 = foo
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=false
+    "
+    );
+}
+
 /// Schema:
 /// a: String
 /// b: String
diff --git a/datafusion/physical-plan/src/union.rs 
b/datafusion/physical-plan/src/union.rs
index b6f943886e..4ebb8910fa 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -36,7 +36,11 @@ use crate::execution_plan::{
     InvariantLevel, boundedness_from_children, check_default_invariants,
     emission_type_from_children,
 };
-use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
+use crate::filter::FilterExec;
+use crate::filter_pushdown::{
+    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
+    FilterPushdownPropagation, PushedDown,
+};
 use crate::metrics::BaselineMetrics;
 use crate::projection::{ProjectionExec, make_with_child};
 use crate::stream::ObservedStream;
@@ -49,7 +53,9 @@ use datafusion_common::{
     Result, assert_or_internal_err, exec_err, internal_datafusion_err,
 };
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, 
calculate_union};
+use datafusion_physical_expr::{
+    EquivalenceProperties, PhysicalExpr, calculate_union, conjunction,
+};
 
 use futures::Stream;
 use itertools::Itertools;
@@ -370,6 +376,83 @@ impl ExecutionPlan for UnionExec {
     ) -> Result<FilterDescription> {
         FilterDescription::from_children(parent_filters, &self.children())
     }
+
+    fn handle_child_pushdown_result(
+        &self,
+        phase: FilterPushdownPhase,
+        child_pushdown_result: ChildPushdownResult,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        // Pre phase: handle heterogeneous pushdown by wrapping individual
+        // children with FilterExec and reporting all filters as handled.
+        // Post phase: use default behavior to let the filter creator decide 
how to handle
+        // filters that weren't fully pushed down.
+        if !matches!(phase, FilterPushdownPhase::Pre) {
+            return 
Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
+        }
+
+        // UnionExec needs specialized filter pushdown handling when children 
have
+        // heterogeneous pushdown support. Without this, when some children 
support
+        // pushdown and others don't, the default behavior would leave 
FilterExec
+        // above UnionExec, re-applying filters to outputs of all 
children—including
+        // those that already applied the filters via pushdown. This 
specialized
+        // implementation adds FilterExec only to children that don't support
+        // pushdown, avoiding redundant filtering and improving performance.
+        //
+        // Example: Given Child1 (no pushdown support) and Child2 (has 
pushdown support)
+        //   Default behavior:          This implementation:
+        //   FilterExec                 UnionExec
+        //     UnionExec                  FilterExec
+        //       Child1                     Child1
+        //       Child2(filter)           Child2(filter)
+
+        // For each child index, gather the parent filters it reported as PushedDown::No
+        let mut unsupported_filters_per_child = vec![Vec::new(); 
self.inputs.len()];
+        for parent_filter_result in 
child_pushdown_result.parent_filters.iter() {
+            for (child_idx, &child_result) in
+                parent_filter_result.child_results.iter().enumerate()
+            {
+                if matches!(child_result, PushedDown::No) {
+                    unsupported_filters_per_child[child_idx]
+                        .push(Arc::clone(&parent_filter_result.filter));
+                }
+            }
+        }
+
+        // Wrap each such child in one FilterExec over the AND of its unsupported filters
+        let mut new_children = self.inputs.clone();
+        for (child_idx, unsupported_filters) in
+            unsupported_filters_per_child.iter().enumerate()
+        {
+            if !unsupported_filters.is_empty() {
+                let combined_filter = conjunction(unsupported_filters.clone());
+                new_children[child_idx] = Arc::new(FilterExec::try_new(
+                    combined_filter,
+                    Arc::clone(&self.inputs[child_idx]),
+                )?);
+            }
+        }
+
+        // Unwrapped children are still the same Arcs as self.inputs, so ptr_eq spots wrapping
+        let children_modified = new_children
+            .iter()
+            .zip(self.inputs.iter())
+            .any(|(new, old)| !Arc::ptr_eq(new, old));
+
+        let all_filters_pushed =
+            vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()];
+        let propagation = if children_modified {
+            let updated_node = UnionExec::try_new(new_children)?;
+            
FilterPushdownPropagation::with_parent_pushdown_result(all_filters_pushed)
+                .with_updated_node(updated_node)
+        } else {
+            
FilterPushdownPropagation::with_parent_pushdown_result(all_filters_pushed)
+        };
+
+        // Report all parent filters as supported since we've ensured they're 
applied
+        // on all children (either pushed down or via FilterExec)
+        Ok(propagation)
+    }
 }
 
 /// Combines multiple input streams by interleaving them.
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt 
b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index aa94e2e2f2..e2473ee328 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -674,3 +674,66 @@ logical_plan
 physical_plan
 01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]},
 projection=[id, tags], file_type=parquet, predicate=id@0 > 1 AND 
array_has(tags@1, rust), pruning_predicate=id_null_count@1 != row_count@2 AND 
id_max@0 > 1, required_guarantees=[]
+
+###
+# Test filter pushdown through UNION with mixed support
+# This tests the case where one child supports filter pushdown (parquet) and 
one doesn't (memory table)
+###
+
+# Enable parquet pushdown; max_passes = 0 (below) keeps the Filter above the Union in the logical plan
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+statement ok
+set datafusion.optimizer.max_passes = 0;
+
+# Memory table (a VARCHAR, b BIGINT); its scan is expected to keep a FilterExec above it
+statement ok
+CREATE TABLE t_union_mem(a VARCHAR, b BIGINT) AS VALUES ('qux', 4), ('quux', 
5);
+
+# Parquet table with the same schema; the predicate is expected to be pushed into this scan
+statement ok
+CREATE EXTERNAL TABLE t_union_parquet(a VARCHAR, b BIGINT) STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet';
+
+# b > 2 keeps 4 and 5 from the memory table and 3 and 50 from the parquet file
+query I rowsort
+SELECT b FROM (
+  SELECT a, b FROM t_union_mem
+  UNION ALL
+  SELECT a, b FROM t_union_parquet
+) WHERE b > 2;
+----
+3
+4
+5
+50
+
+# Explain the union query - filter should be pushed to parquet but not memory 
table
+query TT
+EXPLAIN SELECT b FROM (
+  SELECT a, b FROM t_union_mem
+  UNION ALL
+  SELECT a, b FROM t_union_parquet
+) WHERE b > 2;
+----
+logical_plan
+01)Projection: b
+02)--Filter: b > Int64(2)
+03)----Union
+04)------Projection: t_union_mem.a, t_union_mem.b
+05)--------TableScan: t_union_mem
+06)------Projection: t_union_parquet.a, t_union_parquet.b
+07)--------TableScan: t_union_parquet
+physical_plan
+01)UnionExec
+02)--FilterExec: b@0 > 2
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet]]},
 projection=[b], file_type=parquet, predicate=b@1 > 2, 
pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, 
required_guarantees=[]
+
+# Clean up union test tables (the session settings changed above are not reset here)
+statement ok
+DROP TABLE t_union_mem;
+
+statement ok
+DROP TABLE t_union_parquet;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to