This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new 8aaa274aae [branch-52] fix: filter pushdown when merge filter (#20110)
(#20289)
8aaa274aae is described below
commit 8aaa274aae0dbeffb1ef51064afda878871c2f66
Author: Huaijin <[email protected]>
AuthorDate: Thu Feb 12 08:36:17 2026 +0800
[branch-52] fix: filter pushdown when merge filter (#20110) (#20289)
## Which issue does this PR close?
- related to https://github.com/apache/datafusion/issues/20287
## Rationale for this change
See issue #20109 for details.
## What changes are included in this PR?
1. Remap parent filter expressions: When a FilterExec has a projection,
remap unsupported parent filter expressions from output schema
coordinates to input schema coordinates using `reassign_expr_columns()`
before combining them with the current filter's predicates.
2. Preserve projection: When creating the merged FilterExec, preserve
the original projection instead of discarding it.
## Are these changes tested?
Yes, added some test cases.
## Are there any user-facing changes?
---------
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Closes #.
## Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
## What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
## Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
---------
Co-authored-by: Adrian Garcia Badaracco
<[email protected]>
---
.../physical_optimizer/filter_pushdown/mod.rs | 85 ++++++++++++++++++++++
datafusion/physical-plan/src/filter.rs | 30 ++++++--
2 files changed, 108 insertions(+), 7 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index d6357fdf6b..ad0d22e24a 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -3600,3 +3600,88 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used()
{
);
}
}
+
+/// Regression test for https://github.com/apache/datafusion/issues/20109
+#[tokio::test]
+async fn test_filter_with_projection_pushdown() {
+ use arrow::array::{Int64Array, RecordBatch, StringArray};
+ use datafusion_physical_plan::collect;
+ use datafusion_physical_plan::filter::FilterExec;
+
+ // Create schema: [time, event, size]
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("time", DataType::Int64, false),
+ Field::new("event", DataType::Utf8, false),
+ Field::new("size", DataType::Int64, false),
+ ]));
+
+ // Create sample data
+ let timestamps = vec![100i64, 200, 300, 400, 500];
+ let events = vec!["Ingestion", "Ingestion", "Query", "Ingestion", "Query"];
+ let sizes = vec![10i64, 20, 30, 40, 50];
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(Int64Array::from(timestamps)),
+ Arc::new(StringArray::from(events)),
+ Arc::new(Int64Array::from(sizes)),
+ ],
+ )
+ .unwrap();
+
+ // Create data source
+ let memory_exec =
datafusion_datasource::memory::MemorySourceConfig::try_new_exec(
+ &[vec![batch]],
+ schema.clone(),
+ None,
+ )
+ .unwrap();
+
+ // First FilterExec: time < 350 with projection=[event@1, size@2]
+ let time_col = col("time", &memory_exec.schema()).unwrap();
+ let time_filter = Arc::new(BinaryExpr::new(
+ time_col,
+ Operator::Lt,
+ Arc::new(Literal::new(ScalarValue::Int64(Some(350)))),
+ ));
+ let filter1 = Arc::new(
+ FilterExec::try_new(time_filter, memory_exec)
+ .unwrap()
+ .with_projection(Some(vec![1, 2]))
+ .unwrap(),
+ );
+
+ // Second FilterExec: event = 'Ingestion' with projection=[size@1]
+ let event_col = col("event", &filter1.schema()).unwrap();
+ let event_filter = Arc::new(BinaryExpr::new(
+ event_col,
+ Operator::Eq,
+ Arc::new(Literal::new(ScalarValue::Utf8(Some(
+ "Ingestion".to_string(),
+ )))),
+ ));
+ let filter2 = Arc::new(
+ FilterExec::try_new(event_filter, filter1)
+ .unwrap()
+ .with_projection(Some(vec![1]))
+ .unwrap(),
+ );
+
+ // Apply filter pushdown optimization
+ let config = ConfigOptions::default();
+ let optimized_plan = FilterPushdown::new()
+ .optimize(Arc::clone(&filter2) as Arc<dyn ExecutionPlan>, &config)
+ .unwrap();
+
+ // Execute the optimized plan - this should not error
+ let ctx = SessionContext::new();
+ let result = collect(optimized_plan, ctx.task_ctx()).await.unwrap();
+
+ // Verify results: should return rows where time < 350 AND event =
'Ingestion'
+ // That's rows with time=100,200 (both have event='Ingestion'), so sizes
10,20
+ let expected = [
+ "+------+", "| size |", "+------+", "| 10 |", "| 20 |", "+------+",
+ ];
+ assert_batches_eq!(expected, &result);
+}
diff --git a/datafusion/physical-plan/src/filter.rs
b/datafusion/physical-plan/src/filter.rs
index e724cdad64..82db5c7371 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -57,7 +57,7 @@ use datafusion_expr::Operator;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
use datafusion_physical_expr::intervals::utils::check_support;
-use datafusion_physical_expr::utils::collect_columns;
+use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
use datafusion_physical_expr::{
AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries,
PhysicalExpr, analyze,
conjunction, split_conjunction,
@@ -526,10 +526,26 @@ impl ExecutionPlan for FilterExec {
return
Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
}
// We absorb any parent filters that were not handled by our children
- let unsupported_parent_filters =
- child_pushdown_result.parent_filters.iter().filter_map(|f| {
- matches!(f.all(),
PushedDown::No).then_some(Arc::clone(&f.filter))
- });
+ let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
+ child_pushdown_result
+ .parent_filters
+ .iter()
+ .filter_map(|f| {
+ matches!(f.all(),
PushedDown::No).then_some(Arc::clone(&f.filter))
+ })
+ .collect();
+
+ // If this FilterExec has a projection, the unsupported parent filters
+ // are in the output schema (after projection) coordinates. We need to
+ // remap them to the input schema coordinates before combining with
self filters.
+ if self.projection.is_some() {
+ let input_schema = self.input().schema();
+ unsupported_parent_filters = unsupported_parent_filters
+ .into_iter()
+ .map(|expr| reassign_expr_columns(expr, &input_schema))
+ .collect::<Result<Vec<_>>>()?;
+ }
+
let unsupported_self_filters = child_pushdown_result
.self_filters
.first()
@@ -577,7 +593,7 @@ impl ExecutionPlan for FilterExec {
// The new predicate is the same as our current predicate
None
} else {
- // Create a new FilterExec with the new predicate
+ // Create a new FilterExec with the new predicate, preserving the
projection
let new = FilterExec {
predicate: Arc::clone(&new_predicate),
input: Arc::clone(&filter_input),
@@ -589,7 +605,7 @@ impl ExecutionPlan for FilterExec {
self.default_selectivity,
self.projection.as_ref(),
)?,
- projection: None,
+ projection: self.projection.clone(),
batch_size: self.batch_size,
fetch: self.fetch,
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]