This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b80bf2ca8e fix: filter pushdown when merge filter (#20110)
b80bf2ca8e is described below
commit b80bf2ca8ef74900fee96a1cc169bdedf53b36fc
Author: Huaijin <[email protected]>
AuthorDate: Wed Feb 4 05:26:51 2026 +0800
fix: filter pushdown when merge filter (#20110)
## Which issue does this PR close?
- Closes #20109
## Rationale for this change
see issue #20109
## What changes are included in this PR?
1. Remap parent filter expressions: When a FilterExec has a projection,
remap unsupported parent filter expressions from output schema
coordinates to input schema coordinates using `reassign_expr_columns()`
before combining them with the current filter's predicates.
2. Preserve projection: When creating the merged FilterExec, preserve
the original projection instead of discarding it.
## Are these changes tested?
Yes, added some test cases.
## Are there any user-facing changes?
---------
Co-authored-by: Adrian Garcia Badaracco <[email protected]>
---
.../tests/physical_optimizer/filter_pushdown.rs | 87 ++++++++++++++++++++++
datafusion/physical-plan/src/filter.rs | 30 ++++++--
2 files changed, 110 insertions(+), 7 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
index 3a00150685..31a21274ad 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
@@ -3720,3 +3720,90 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
);
}
}
+
+/// Regression test for https://github.com/apache/datafusion/issues/20109
+#[tokio::test]
+async fn test_filter_with_projection_pushdown() {
+ use arrow::array::{Int64Array, RecordBatch, StringArray};
+ use datafusion_physical_plan::collect;
+ use datafusion_physical_plan::filter::FilterExecBuilder;
+
+ // Create schema: [time, event, size]
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("time", DataType::Int64, false),
+ Field::new("event", DataType::Utf8, false),
+ Field::new("size", DataType::Int64, false),
+ ]));
+
+ // Create sample data
+ let timestamps = vec![100i64, 200, 300, 400, 500];
+ let events = vec!["Ingestion", "Ingestion", "Query", "Ingestion", "Query"];
+ let sizes = vec![10i64, 20, 30, 40, 50];
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(Int64Array::from(timestamps)),
+ Arc::new(StringArray::from(events)),
+ Arc::new(Int64Array::from(sizes)),
+ ],
+ )
+ .unwrap();
+
+ // Create data source
+ let memory_exec = datafusion_datasource::memory::MemorySourceConfig::try_new_exec(
+ &[vec![batch]],
+ schema.clone(),
+ None,
+ )
+ .unwrap();
+
+ // First FilterExec: time < 350 with projection=[event@1, size@2]
+ let time_col = col("time", &memory_exec.schema()).unwrap();
+ let time_filter = Arc::new(BinaryExpr::new(
+ time_col,
+ Operator::Lt,
+ Arc::new(Literal::new(ScalarValue::Int64(Some(350)))),
+ ));
+ let filter1 = Arc::new(
+ FilterExecBuilder::new(time_filter, memory_exec)
+ .apply_projection(Some(vec![1, 2]))
+ .unwrap()
+ .build()
+ .unwrap(),
+ );
+
+ // Second FilterExec: event = 'Ingestion' with projection=[size@1]
+ let event_col = col("event", &filter1.schema()).unwrap();
+ let event_filter = Arc::new(BinaryExpr::new(
+ event_col,
+ Operator::Eq,
+ Arc::new(Literal::new(ScalarValue::Utf8(Some(
+ "Ingestion".to_string(),
+ )))),
+ ));
+ let filter2 = Arc::new(
+ FilterExecBuilder::new(event_filter, filter1)
+ .apply_projection(Some(vec![1]))
+ .unwrap()
+ .build()
+ .unwrap(),
+ );
+
+ // Apply filter pushdown optimization
+ let config = ConfigOptions::default();
+ let optimized_plan = FilterPushdown::new()
+ .optimize(Arc::clone(&filter2) as Arc<dyn ExecutionPlan>, &config)
+ .unwrap();
+
+ // Execute the optimized plan - this should not error
+ let ctx = SessionContext::new();
+ let result = collect(optimized_plan, ctx.task_ctx()).await.unwrap();
+
+ // Verify results: should return rows where time < 350 AND event = 'Ingestion'
+ // That's rows with time=100,200 (both have event='Ingestion'), so sizes 10,20
+ let expected = [
+ "+------+", "| size |", "+------+", "| 10 |", "| 20 |", "+------+",
+ ];
+ assert_batches_eq!(expected, &result);
+}
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 50fae84b85..0acf419e67 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -57,7 +57,7 @@ use datafusion_expr::Operator;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
use datafusion_physical_expr::intervals::utils::check_support;
-use datafusion_physical_expr::utils::collect_columns;
+use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
use datafusion_physical_expr::{
AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries,
PhysicalExpr, analyze,
conjunction, split_conjunction,
@@ -623,10 +623,26 @@ impl ExecutionPlan for FilterExec {
return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
}
// We absorb any parent filters that were not handled by our children
- let unsupported_parent_filters =
- child_pushdown_result.parent_filters.iter().filter_map(|f| {
- matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
- });
+ let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
+ child_pushdown_result
+ .parent_filters
+ .iter()
+ .filter_map(|f| {
+ matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
+ })
+ .collect();
+
+ // If this FilterExec has a projection, the unsupported parent filters
+ // are in the output schema (after projection) coordinates. We need to
+ // remap them to the input schema coordinates before combining with self filters.
+ if self.projection.is_some() {
+ let input_schema = self.input().schema();
+ unsupported_parent_filters = unsupported_parent_filters
+ .into_iter()
+ .map(|expr| reassign_expr_columns(expr, &input_schema))
+ .collect::<Result<Vec<_>>>()?;
+ }
+
let unsupported_self_filters = child_pushdown_result
.self_filters
.first()
@@ -674,7 +690,7 @@ impl ExecutionPlan for FilterExec {
// The new predicate is the same as our current predicate
None
} else {
- // Create a new FilterExec with the new predicate
+ // Create a new FilterExec with the new predicate, preserving the projection
let new = FilterExec {
predicate: Arc::clone(&new_predicate),
input: Arc::clone(&filter_input),
@@ -686,7 +702,7 @@ impl ExecutionPlan for FilterExec {
self.default_selectivity,
self.projection.as_ref(),
)?,
- projection: None,
+ projection: self.projection.clone(),
batch_size: self.batch_size,
fetch: self.fetch,
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]