berkaysynnada commented on code in PR #15955: URL: https://github.com/apache/datafusion/pull/15955#discussion_r2075024967
########## datafusion/core/tests/physical_optimizer/push_down_filter/mod.rs: ########## @@ -0,0 +1,514 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + any::Any, + sync::{Arc, LazyLock}, +}; + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ + logical_expr::Operator, + physical_plan::{ + expressions::{BinaryExpr, Column, Literal}, + PhysicalExpr, + }, + scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Result}; +use datafusion_functions_aggregate::count::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{aggregate::AggregateExprBuilder, Partitioning}; +use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; +use datafusion_physical_plan::{ + aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, + coalesce_batches::CoalesceBatchesExec, + filter::FilterExec, + filter_pushdown::{ + ChildPushdownResult, FilterDescription, FilterPushdownPropagation, + PredicateSupport, + }, + repartition::RepartitionExec, + DisplayAs, ExecutionPlan, PlanProperties, +}; + +use util::{OptimizationTest, TestScanBuilder}; + +mod util; + +#[test] +fn test_pushdown_into_scan() { + let scan = TestScanBuilder::new(schema()).with_support(true).build(); + let predicate = col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()); + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown{}, true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + " + ); +} + +/// Show that we can use config options to determine how to do pushdown. +#[test] +fn test_pushdown_into_scan_with_config_options() { + let scan = TestScanBuilder::new(schema()).with_support(true).build(); + let predicate = col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()) as _; + + let mut cfg = ConfigOptions::default(); + insta::assert_snapshot!( + OptimizationTest::new( + Arc::clone(&plan), + FilterPushdown {}, + false + ), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + " + ); + + cfg.execution.parquet.pushdown_filters = true; + insta::assert_snapshot!( + OptimizationTest::new( + plan, + FilterPushdown {}, + true + ), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + " + ); +} + +#[test] +fn test_filter_collapse() { + // filter should be pushed down into the parquet scan with two filters + let scan = TestScanBuilder::new(schema()).with_support(true).build(); + let predicate1 = col_lit_predicate("a", "foo", &schema()); + let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap()); + let predicate2 = col_lit_predicate("b", "bar", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown{}, true), + @r" + OptimizationTest: + input: + - FilterExec: b@1 = bar + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar + " + ); +} + +#[test] +fn test_filter_with_projection() { + let scan = TestScanBuilder::new(schema()).with_support(true).build(); + let projection = vec![1, 0]; + let predicate = col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new( + FilterExec::try_new(predicate, Arc::clone(&scan)) + .unwrap() + .with_projection(Some(projection)) + .unwrap(), + ); + + // expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown{}, true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo, projection=[b@1, a@0] + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - ProjectionExec: expr=[b@1 as b, a@0 as a] + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + ", + ); + + // add a test where the filter is on a column that isn't included in the output + let projection = vec![1]; + let predicate = col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new( + FilterExec::try_new(predicate, scan) + .unwrap() + .with_projection(Some(projection)) + .unwrap(), + ); + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown{},true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo, projection=[b@1] + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - ProjectionExec: expr=[b@1 as b] + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + " + ); +} + +#[test] +fn test_push_down_through_transparent_nodes() { + // expect the predicate to be pushed down into the DataSource + let scan = TestScanBuilder::new(schema()).with_support(true).build(); + let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 1)); + let predicate = col_lit_predicate("a", "foo", &schema()); + let filter = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap()); + let repartition = Arc::new( + RepartitionExec::try_new(filter, Partitioning::RoundRobinBatch(1)).unwrap(), + ); + let predicate = col_lit_predicate("b", "bar", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate, repartition).unwrap()); + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown{},true), + @r" + OptimizationTest: + input: + - FilterExec: b@1 = bar + - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1 + - FilterExec: a@0 = foo + - CoalesceBatchesExec: target_batch_size=1 + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1 + - CoalesceBatchesExec: target_batch_size=1 + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar + " + ); +} + +#[test] +fn test_no_pushdown_through_aggregates() { + // There are 2 important points here: + // 1. The outer filter **is not** pushed down at all because we haven't implemented pushdown support + // yet for AggregateExec. + // 2. The inner filter **is** pushed down into the DataSource. + let scan = TestScanBuilder::new(schema()).with_support(true).build(); + + let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 10)); + + let filter = Arc::new( + FilterExec::try_new(col_lit_predicate("a", "foo", &schema()), coalesce).unwrap(), + ); + + let aggregate_expr = + vec![ + AggregateExprBuilder::new(count_udaf(), vec![col("a", &schema()).unwrap()]) + .schema(schema()) + .alias("cnt") + .build() + .map(Arc::new) + .unwrap(), + ]; + let group_by = PhysicalGroupBy::new_single(vec![ + (col("a", &schema()).unwrap(), "a".to_string()), + (col("b", &schema()).unwrap(), "b".to_string()), + ]); + let aggregate = Arc::new( + AggregateExec::try_new( + AggregateMode::Final, + group_by, + aggregate_expr.clone(), + vec![None], + filter, + schema(), + ) + .unwrap(), + ); + + let coalesce = Arc::new(CoalesceBatchesExec::new(aggregate, 100)); + + let predicate = col_lit_predicate("b", "bar", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap()); + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown{}, true), + @r" + OptimizationTest: + input: + - FilterExec: b@1 = bar + - CoalesceBatchesExec: target_batch_size=100 + - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0]) + - FilterExec: a@0 = foo + - CoalesceBatchesExec: target_batch_size=10 + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: b@1 = bar + - CoalesceBatchesExec: target_batch_size=100 + - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt] + - CoalesceBatchesExec: target_batch_size=10 + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + " + ); +} + +/// Test various combinations of handling of child pushdown results +/// in an ExectionPlan in combination with support/not support in a DataSource. +#[test] +fn test_node_handles_child_pushdown_result() { + #[derive(Debug)] + struct TestNode { + inject_filter: bool, + input: Arc<dyn ExecutionPlan>, + predicate: Arc<dyn PhysicalExpr>, + } + + impl TestNode { + fn new( + inject_filter: bool, + input: Arc<dyn ExecutionPlan>, + predicate: Arc<dyn PhysicalExpr>, + ) -> Self { + Self { + inject_filter, + input, + predicate, + } + } + } + + impl DisplayAs for TestNode { + fn fmt_as( + &self, + _t: datafusion_physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!( + f, + "TestInsertExec {{ inject_filter: {} }}", + self.inject_filter + ) + } + } + + impl ExecutionPlan for TestNode { + fn name(&self) -> &str { + "TestInsertExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + self.input.properties() + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![&self.input] + } + + fn with_new_children( + self: Arc<Self>, + children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + assert!(children.len() == 1); + Ok(Arc::new(TestNode::new( + self.inject_filter, + children[0].clone(), + self.predicate.clone(), + ))) + } + + fn execute( + &self, + _partition: usize, + _context: Arc<datafusion_execution::TaskContext>, + ) -> Result<datafusion_execution::SendableRecordBatchStream> { + unimplemented!("TestInsertExec is a stub for testing.") + } + + fn gather_filters_for_pushdown( + &self, + parent_filters: Vec<Arc<dyn PhysicalExpr>>, + _config: &ConfigOptions, + ) -> Result<FilterDescription> { + Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters) + .with_self_filter(Arc::clone(&self.predicate))) + } + + fn handle_child_pushdown_result( + &self, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> { + if self.inject_filter { + // Add a FilterExec if our own filter was not handled by the child + + // We have 1 child + assert_eq!(child_pushdown_result.self_filters.len(), 1); + let self_pushdown_result = child_pushdown_result.self_filters[0].clone(); + // And pushed down 1 filter + assert_eq!(self_pushdown_result.len(), 1); + let self_pushdown_result = self_pushdown_result.into_inner(); + + match &self_pushdown_result[0] { + PredicateSupport::Unsupported(filter) => { + // We have a filter to push down + let new_child = FilterExec::try_new( Review Comment: I see, it'll be an exception. Then it makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org