alamb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036145638
########## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ##########
@@ -0,0 +1,529 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+    datasource::object_store::ObjectStoreUrl,
+    logical_expr::Operator,
+    physical_plan::{
+        expressions::{BinaryExpr, Column, Literal},
+        PhysicalExpr,
+    },
+    scalar::ScalarValue,
+};
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_common::{internal_err, Result};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+    file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener,
+};
+use datafusion_expr::test::function_stub::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{
+    aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter_pushdown::{FilterPushdown, FilterPushdownResult};
+use datafusion_physical_plan::{
+    aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
+    coalesce_batches::CoalesceBatchesExec,
+    filter::FilterExec,
+    repartition::RepartitionExec,
+};
+use datafusion_physical_plan::{
+    displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan,
+};
+use object_store::ObjectStore;
+use std::sync::{Arc, OnceLock};
+use std::{
+    any::Any,
+    fmt::{Display, Formatter},
+};
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone, Default)]
+struct TestSource {
+    support: Option<FilterPushdown>,
+    predicate: Option<PhysicalExprRef>,
+    statistics: Option<Statistics>,
+}
+
+impl TestSource {
+    fn new(support: Option<FilterPushdown>) -> Self {
+        Self {
+            support,
+            predicate: None,
+            statistics: None,
+        }
+    }
+}
+
+impl FileSource for TestSource {
+    fn create_file_opener(
+        &self,
+        _object_store: Arc<dyn ObjectStore>,
+        _base_config: &FileScanConfig,
+        _partition: usize,
+    ) -> Arc<dyn FileOpener> {
+        todo!("should not be called")
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        todo!("should not be called")
+    }
+
+    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        Arc::new(TestSource {
+            statistics: Some(statistics),
+            ..self.clone()
+        })
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        todo!("should not be called")
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(self
+            .statistics
+            .as_ref()
+            .expect("statistics not set")
+            .clone())
+    }
+
+    fn file_type(&self) -> &str {
+        "test"
+    }
+
+    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let predicate_string = self
+                    .predicate
+                    .as_ref()
+                    .map(|p| format!(", predicate={p}"))
+                    .unwrap_or_default();
+
+                write!(f, "{}", predicate_string)
+            }
+            DisplayFormatType::TreeRender => {
+                if let Some(predicate) = &self.predicate {
+                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
+                }
+                Ok(())
+            }
+        }
+    }
+
+    fn try_pushdown_filters(
+        &self,
+        filters: &[PhysicalExprRef],
+        config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+        let support = match self.support {
+            Some(support) => support,
+            None => {
+                if config.execution.parquet.pushdown_filters {
+                    FilterPushdown::Exact
+                } else {
+                    FilterPushdown::Inexact
+                }
+            }
+        };
+        let new = Arc::new(TestSource {
+            support: self.support,
+            predicate: Some(conjunction(filters.iter().map(Arc::clone))),
+            statistics: self.statistics.clone(),
+        });
+        Ok(FilterPushdownResult::Pushed {
+            inner: new,
+            support: vec![support; filters.len()],
+        })
+    }
+}
+
+fn test_scan(support: Option<FilterPushdown>) -> Arc<dyn ExecutionPlan> {
+    let schema = schema();
+    let source = Arc::new(TestSource::new(support));
+    let base_config = FileScanConfigBuilder::new(
+        ObjectStoreUrl::parse("test://").unwrap(),
+        Arc::clone(schema),
+        source,
+    )
+    .build();
+    DataSourceExec::from_data_source(base_config)
+}
+
+#[test]
+fn test_pushdown_into_scan() {
+    let scan = test_scan(Some(FilterPushdown::Exact));
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap());
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo
+    "
+    );
+}
+
+/// Show that we can use config options to determine how to do pushdown.
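+///
+/// When `TestSource::support` is `None`, `try_pushdown_filters` (above) falls back to
+/// the `execution.parquet.pushdown_filters` config flag: `true` maps to
+/// `FilterPushdown::Exact` (the `FilterExec` can be removed entirely), while `false`
+/// maps to `FilterPushdown::Inexact` (the `FilterExec` must be kept above the scan).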
+#[test]
+fn test_pushdown_into_scan_with_config_options() {
+    let scan = test_scan(None);
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()) as _;
+
+    let mut cfg = ConfigOptions::default();
+    cfg.execution.parquet.pushdown_filters = false;
+    insta::assert_snapshot!(
+        OptimizationTest::new_with_config(
+            Arc::clone(&plan),
+            PushdownFilter {},
+            &cfg
+        ),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - FilterExec: a@0 = foo
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo
+    "
+    );
+
+    cfg.execution.parquet.pushdown_filters = true;
+    insta::assert_snapshot!(
+        OptimizationTest::new_with_config(
+            plan,
+            PushdownFilter {},
+            &cfg
+        ),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_filter_collapse() {
+    // filter should be pushed down into the parquet scan with two filters
+    let scan = test_scan(Some(FilterPushdown::Exact));
+    let predicate1 = col_lit_predicate("a", "foo", schema());
+    let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap());
+    let predicate2 = col_lit_predicate("b", "bar", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: b@1 = bar
+        - FilterExec: a@0 = foo
+        - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=b@1 = bar AND a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_filter_with_projection() {
+    let scan = test_scan(Some(FilterPushdown::Exact));
+    let projection = vec![1, 0];
+    let projected_schema = Arc::new(schema().project(&projection).unwrap());
+    let predicate = col_lit_predicate("a", "foo", &projected_schema);
+    let plan = Arc::new(
+        FilterExec::try_new(predicate, Arc::clone(&scan))
+            .unwrap()
+            .with_projection(Some(projection))
+            .unwrap(),
+    );
+    // expect the predicate to be pushed down into the DataSource but the FilterExec to be kept for its projection
+    // the pushed down filters should have their indices adjusted
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@1 = foo, projection=[b@1, a@0]
+        - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - FilterExec: true, projection=[b@1, a@0]
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo
+    ",
+    );
+
+    // add a test where the filter is on a column that isn't included in the output
+    let projection = vec![1];
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(
+        FilterExec::try_new(predicate, scan)
+            .unwrap()
+            .with_projection(Some(projection))
+            .unwrap(),
+    );
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo, projection=[b@1]
+        - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - FilterExec: true, projection=[b@1]
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_push_down_through_transparent_nodes() {
+    // expect the predicate to be pushed down into the DataSource
+    let scan = test_scan(Some(FilterPushdown::Exact));
+    let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 1));
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let filter = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap());
+    let repartition = Arc::new(
+        RepartitionExec::try_new(filter, Partitioning::RoundRobinBatch(1)).unwrap(),
+    );
+    let predicate = col_lit_predicate("a", "bar", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, repartition).unwrap());
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:

Review Comment:
   I think the initial use case for this rule is filters that we don't know about during logical planning (dynamic filters in particular) that are then pushed into the scans. However, I think this PR (rightly) sets us up for more sophisticated filter pushdowns (for example, it will allow filter pushdown after custom optimizer passes).
   
   So TL;DR: I think it is important to keep the future use cases in mind as well.
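For readers skimming this thread, here is a minimal, self-contained sketch of the per-filter contract that `TestSource::try_pushdown_filters` above exercises. This is not DataFusion's actual API; every name in it (`Support`, `PushdownResult`, `offer_filters`) is illustrative only:

```rust
/// Illustrative stand-in for `FilterPushdown`: how completely a source
/// promises to apply a filter it accepts.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Support {
    Exact,   // filter fully applied by the source; FilterExec can be removed
    Inexact, // filter only partially applied; FilterExec must be kept
}

/// Illustrative stand-in for `FilterPushdownResult::Pushed`: the rewritten
/// source plus one support entry per filter that was offered to it.
#[derive(Debug)]
struct PushdownResult<T> {
    inner: T,
    support: Vec<Support>,
}

/// Offer a batch of filters to a (toy) source. Mirrors the shape of
/// `support: vec![support; filters.len()]` in the test source above.
fn offer_filters(filters: Vec<String>, exact: bool) -> PushdownResult<Vec<String>> {
    let support = vec![
        if exact { Support::Exact } else { Support::Inexact };
        filters.len()
    ];
    PushdownResult { inner: filters, support }
}

fn main() {
    // With Exact support the optimizer may drop the FilterExec entirely
    // (the snapshots show it removed or degraded to `FilterExec: true`);
    // with Inexact it must keep filtering above the scan.
    let r = offer_filters(vec!["a = 'foo'".into(), "b = 'bar'".into()], true);
    assert!(r.support.iter().all(|s| *s == Support::Exact));
    println!("{r:?}");
}
```

Reporting support filter-by-filter, rather than one flag for the whole batch, is what lets the optimizer keep a `FilterExec` for only the predicates a source could not fully absorb.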