This is an automated email from the ASF dual-hosted git repository.

blaginin pushed a commit to branch annarose/dict-coercion
in repository https://gitbox.apache.org/repos/asf/datafusion-sandbox.git

commit b80bf2ca8ef74900fee96a1cc169bdedf53b36fc
Author: Huaijin <[email protected]>
AuthorDate: Wed Feb 4 05:26:51 2026 +0800

    fix: filter pushdown when merging filters (#20110)
    
    ## Which issue does this PR close?
    
    - Closes #20109
    
    ## Rationale for this change
    
    See issue #20109.
    
    ## What changes are included in this PR?
    
    1. Remap parent filter expressions: when a FilterExec has a projection,
    remap unsupported parent filter expressions from output-schema
    coordinates to input-schema coordinates using `reassign_expr_columns()`
    before combining them with the current filter's predicates (a minimal
    sketch of this remapping follows after this list).

    2. Preserve the projection: when creating the merged FilterExec, keep
    the original projection instead of discarding it.
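
    For illustration only, here is a minimal sketch of the remapping in
    change 1; it is not code from this PR. The schema, the `remap_example`
    function, and the sample expression are hypothetical, while
    `reassign_expr_columns` is the helper actually used in the diff below:

    ```rust
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion_common::Result;
    use datafusion_physical_expr::PhysicalExpr;
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr::utils::reassign_expr_columns;

    // Hypothetical helper, not part of the PR: shows how a parent filter written
    // against a FilterExec's projected output schema is rewritten against the
    // filter's input schema before being merged with the filter's own predicate.
    fn remap_example() -> Result<()> {
        // Input schema of the FilterExec: [time, event, size].
        let input_schema: SchemaRef = Arc::new(Schema::new(vec![
            Field::new("time", DataType::Int64, false),
            Field::new("event", DataType::Utf8, false),
            Field::new("size", DataType::Int64, false),
        ]));

        // A parent filter column written against the *output* schema of a
        // FilterExec with projection=[size]; there, `size` is column 0.
        let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("size", 0));

        // Remap by name so the column index matches the *input* schema,
        // where `size` is column 2.
        let remapped = reassign_expr_columns(parent_filter, &input_schema)?;
        println!("{remapped}"); // expected to print `size@2`
        Ok(())
    }
    ```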
    
    ## Are these changes tested?
    
    Yes, a regression test case is added.
    
    ## Are there any user-facing changes?
    
    ---------
    
    Co-authored-by: Adrian Garcia Badaracco <[email protected]>
---
 .../tests/physical_optimizer/filter_pushdown.rs    | 87 ++++++++++++++++++++++
 datafusion/physical-plan/src/filter.rs             | 30 ++++++--
 2 files changed, 110 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
index 3a0015068..31a21274a 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
@@ -3720,3 +3720,90 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
         );
     }
 }
+
+/// Regression test for https://github.com/apache/datafusion/issues/20109
+#[tokio::test]
+async fn test_filter_with_projection_pushdown() {
+    use arrow::array::{Int64Array, RecordBatch, StringArray};
+    use datafusion_physical_plan::collect;
+    use datafusion_physical_plan::filter::FilterExecBuilder;
+
+    // Create schema: [time, event, size]
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("time", DataType::Int64, false),
+        Field::new("event", DataType::Utf8, false),
+        Field::new("size", DataType::Int64, false),
+    ]));
+
+    // Create sample data
+    let timestamps = vec![100i64, 200, 300, 400, 500];
+    let events = vec!["Ingestion", "Ingestion", "Query", "Ingestion", "Query"];
+    let sizes = vec![10i64, 20, 30, 40, 50];
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int64Array::from(timestamps)),
+            Arc::new(StringArray::from(events)),
+            Arc::new(Int64Array::from(sizes)),
+        ],
+    )
+    .unwrap();
+
+    // Create data source
+    let memory_exec = datafusion_datasource::memory::MemorySourceConfig::try_new_exec(
+        &[vec![batch]],
+        schema.clone(),
+        None,
+    )
+    .unwrap();
+
+    // First FilterExec: time < 350 with projection=[event@1, size@2]
+    let time_col = col("time", &memory_exec.schema()).unwrap();
+    let time_filter = Arc::new(BinaryExpr::new(
+        time_col,
+        Operator::Lt,
+        Arc::new(Literal::new(ScalarValue::Int64(Some(350)))),
+    ));
+    let filter1 = Arc::new(
+        FilterExecBuilder::new(time_filter, memory_exec)
+            .apply_projection(Some(vec![1, 2]))
+            .unwrap()
+            .build()
+            .unwrap(),
+    );
+
+    // Second FilterExec: event = 'Ingestion' with projection=[size@1]
+    let event_col = col("event", &filter1.schema()).unwrap();
+    let event_filter = Arc::new(BinaryExpr::new(
+        event_col,
+        Operator::Eq,
+        Arc::new(Literal::new(ScalarValue::Utf8(Some(
+            "Ingestion".to_string(),
+        )))),
+    ));
+    let filter2 = Arc::new(
+        FilterExecBuilder::new(event_filter, filter1)
+            .apply_projection(Some(vec![1]))
+            .unwrap()
+            .build()
+            .unwrap(),
+    );
+
+    // Apply filter pushdown optimization
+    let config = ConfigOptions::default();
+    let optimized_plan = FilterPushdown::new()
+        .optimize(Arc::clone(&filter2) as Arc<dyn ExecutionPlan>, &config)
+        .unwrap();
+
+    // Execute the optimized plan - this should not error
+    let ctx = SessionContext::new();
+    let result = collect(optimized_plan, ctx.task_ctx()).await.unwrap();
+
+    // Verify results: should return rows where time < 350 AND event = 'Ingestion'
+    // That's rows with time=100,200 (both have event='Ingestion'), so sizes 10,20
+    let expected = [
+        "+------+", "| size |", "+------+", "| 10   |", "| 20   |", "+------+",
+    ];
+    assert_batches_eq!(expected, &result);
+}
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 50fae84b8..0acf419e6 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -57,7 +57,7 @@ use datafusion_expr::Operator;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
 use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
 use datafusion_physical_expr::intervals::utils::check_support;
-use datafusion_physical_expr::utils::collect_columns;
+use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
 use datafusion_physical_expr::{
     AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
     conjunction, split_conjunction,
@@ -623,10 +623,26 @@ impl ExecutionPlan for FilterExec {
             return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
         }
         // We absorb any parent filters that were not handled by our children
-        let unsupported_parent_filters =
-            child_pushdown_result.parent_filters.iter().filter_map(|f| {
-                matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
-            });
+        let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
+            child_pushdown_result
+                .parent_filters
+                .iter()
+                .filter_map(|f| {
+                    matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
+                })
+                .collect();
+
+        // If this FilterExec has a projection, the unsupported parent filters
+        // are in the output schema (after projection) coordinates. We need to
+        // remap them to the input schema coordinates before combining with self filters.
+        if self.projection.is_some() {
+            let input_schema = self.input().schema();
+            unsupported_parent_filters = unsupported_parent_filters
+                .into_iter()
+                .map(|expr| reassign_expr_columns(expr, &input_schema))
+                .collect::<Result<Vec<_>>>()?;
+        }
+
         let unsupported_self_filters = child_pushdown_result
             .self_filters
             .first()
@@ -674,7 +690,7 @@ impl ExecutionPlan for FilterExec {
             // The new predicate is the same as our current predicate
             None
         } else {
-            // Create a new FilterExec with the new predicate
+            // Create a new FilterExec with the new predicate, preserving the projection
             let new = FilterExec {
                 predicate: Arc::clone(&new_predicate),
                 input: Arc::clone(&filter_input),
@@ -686,7 +702,7 @@ impl ExecutionPlan for FilterExec {
                     self.default_selectivity,
                     self.projection.as_ref(),
                 )?,
-                projection: None,
+                projection: self.projection.clone(),
                 batch_size: self.batch_size,
                 fetch: self.fetch,
             };

