geoffreyclaude commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036042823


##########
datafusion/expr/src/filter_pushdown.rs:
##########
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Result of attempting to push down a filter/predicate expression.
+///
+/// This is used by:
+/// * `FilterPushdownResult` in `ExecutionPlan` to do predicate pushdown at the physical plan level
+///   (e.g. pushing down dynamic fitlers from a hash join into scans).

Review Comment:
   ```suggestion
   ///   (e.g. pushing down dynamic filters from a hash join into scans).
   ```
   



##########
datafusion/expr/src/filter_pushdown.rs:
##########
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Result of attempting to push down a filter/predicate expression.
+///
+/// This is used by:
+/// * `FilterPushdownResult` in `ExecutionPlan` to do predicate pushdown at the physical plan level
+///   (e.g. pushing down dynamic fitlers from a hash join into scans).
+/// * `TableProvider` to do predicate pushdown at planning time (e.g. pruning partitions).
+///
+/// There are three possible outcomes of a filter pushdown:
+/// * [`FilterPushdown::Unsupported`] - the filter could not be applied / is not understood.
+/// * [`FilterPushdown::Inexact`] - the filter could be applied, but it may not be exact.

Review Comment:
   `Inexact` kind of hints that the filter might prune too much. How about `Partial`, or some other word that makes it clearer that only false positives (rows that should have been filtered out but weren't) are possible? In this context it's obvious, of course, but with less context it's easy to misread — e.g. for someone implementing a custom exec node.
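   For illustration, here's a minimal standalone sketch of the contract as I read it (a local mirror of the enum, not the PR's actual module): whatever the middle variant ends up being called, only `Exact` lets the parent drop the filter — `Inexact` can only introduce false positives, so the parent re-applies the predicate exactly as it would for `Unsupported`:

   ```rust
   /// Illustrative mirror of the three pushdown outcomes discussed above
   /// (the real enum lives in datafusion/expr/src/filter_pushdown.rs).
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   enum FilterPushdown {
       Unsupported,
       Inexact,
       Exact,
   }

   /// Whether the parent plan must still apply the filter itself.
   /// `Inexact` may let false positives through (rows that should have
   /// been filtered out but weren't), so the parent treats it just like
   /// `Unsupported`; only `Exact` allows dropping the filter entirely.
   fn parent_must_filter(support: FilterPushdown) -> bool {
       !matches!(support, FilterPushdown::Exact)
   }

   fn main() {
       assert!(parent_must_filter(FilterPushdown::Unsupported));
       assert!(parent_must_filter(FilterPushdown::Inexact));
       assert!(!parent_must_filter(FilterPushdown::Exact));
   }
   ```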



##########
datafusion/expr/src/filter_pushdown.rs:
##########
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Result of attempting to push down a filter/predicate expression.
+///
+/// This is used by:
+/// * `FilterPushdownResult` in `ExecutionPlan` to do predicate pushdown at the physical plan level
+///   (e.g. pushing down dynamic fitlers from a hash join into scans).
+/// * `TableProvider` to do predicate pushdown at planning time (e.g. pruning partitions).
+///
+/// There are three possible outcomes of a filter pushdown:
+/// * [`FilterPushdown::Unsupported`] - the filter could not be applied / is not understood.
+/// * [`FilterPushdown::Inexact`] - the filter could be applied, but it may not be exact.
+///   The caller should treat this the same as [`FilterPushdown::Unsupported`] for the most part
+///   and must not assume that any pruning / filter was done since there may be false positives.
+/// * [`FilterPushdown::Exact`] - the filter was absorbed by the child plan and it promises
+///   to apply the filter correctly.
+///   The parent plan can drop the filter and assume that the child plan will apply it correctly.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum FilterPushdown {
+    /// No child plan was able to absorb the filter.
+    /// In this case the parent **must** behave as if the filter was not pushed down
+    /// and must apply the filter itself.
+    Unsupported,
+    /// A child plan may be able to partially apply the filter or a less selective version of it,
+    /// but it might return false positives (but no false negatives).
+    /// In this case the parent **must** behave as if the filter was not pushed down
+    /// and must apply the filter itself.
+    Inexact,
+    /// Filter was pushed down to the child plan and the child plan promises that
+    /// it will apply the filter correctly with no false positives or false negatives.

Review Comment:
   ```suggestion
       /// Filter was pushed down to the child plan and the child plan guarantees that
       /// it will apply the filter correctly.
   ```
   `no false positives or false negatives` is redundant with `correctly`. 



##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -0,0 +1,529 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+    datasource::object_store::ObjectStoreUrl,
+    logical_expr::Operator,
+    physical_plan::{
+        expressions::{BinaryExpr, Column, Literal},
+        PhysicalExpr,
+    },
+    scalar::ScalarValue,
+};
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_common::{internal_err, Result};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+    file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener,
+};
+use datafusion_expr::test::function_stub::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{
+    aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter_pushdown::{FilterPushdown, FilterPushdownResult};
+use datafusion_physical_plan::{
+    aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
+    coalesce_batches::CoalesceBatchesExec,
+    filter::FilterExec,
+    repartition::RepartitionExec,
+};
+use datafusion_physical_plan::{
+    displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan,
+};
+use object_store::ObjectStore;
+use std::sync::{Arc, OnceLock};
+use std::{
+    any::Any,
+    fmt::{Display, Formatter},
+};
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone, Default)]
+struct TestSource {
+    support: Option<FilterPushdown>,
+    predicate: Option<PhysicalExprRef>,
+    statistics: Option<Statistics>,
+}
+
+impl TestSource {
+    fn new(support: Option<FilterPushdown>) -> Self {
+        Self {
+            support,
+            predicate: None,
+            statistics: None,
+        }
+    }
+}
+
+impl FileSource for TestSource {
+    fn create_file_opener(
+        &self,
+        _object_store: Arc<dyn ObjectStore>,
+        _base_config: &FileScanConfig,
+        _partition: usize,
+    ) -> Arc<dyn FileOpener> {
+        todo!("should not be called")
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        todo!("should not be called")
+    }
+
+    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        Arc::new(TestSource {
+            statistics: Some(statistics),
+            ..self.clone()
+        })
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        todo!("should not be called")
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(self
+            .statistics
+            .as_ref()
+            .expect("statistics not set")
+            .clone())
+    }
+
+    fn file_type(&self) -> &str {
+        "test"
+    }
+
+    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let predicate_string = self
+                    .predicate
+                    .as_ref()
+                    .map(|p| format!(", predicate={p}"))
+                    .unwrap_or_default();
+
+                write!(f, "{}", predicate_string)
+            }
+            DisplayFormatType::TreeRender => {
+                if let Some(predicate) = &self.predicate {
+                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
+                }
+                Ok(())
+            }
+        }
+    }
+
+    fn try_pushdown_filters(
+        &self,
+        parent_filters: &[PhysicalExprRef],
+        config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+        let support = match self.support {
+            Some(support) => support,
+            None => {
+                if config.execution.parquet.pushdown_filters {
+                    FilterPushdown::Exact
+                } else {
+                    FilterPushdown::Inexact
+                }
+            }
+        };
+        let new = Arc::new(TestSource {
+            support: self.support,
+            predicate: Some(conjunction(parent_filters.iter().map(Arc::clone))),
+            statistics: self.statistics.clone(),
+        });
+        Ok(FilterPushdownResult::Pushed {
+            updated: new,
+            support: vec![support; parent_filters.len()],
+        })
+    }
+}
+
+fn test_scan(support: Option<FilterPushdown>) -> Arc<dyn ExecutionPlan> {
+    let schema = schema();
+    let source = Arc::new(TestSource::new(support));
+    let base_config = FileScanConfigBuilder::new(
+        ObjectStoreUrl::parse("test://").unwrap(),
+        Arc::clone(schema),
+        source,
+    )
+    .build();
+    DataSourceExec::from_data_source(base_config)
+}
+
+#[test]
+fn test_pushdown_into_scan() {
+    let scan = test_scan(Some(FilterPushdown::Exact));
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap());
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo
+    "
+    );
+}
+
+/// Show that we can use config options to determine how to do pushdown.
+#[test]
+fn test_pushdown_into_scan_with_config_options() {
+    let scan = test_scan(None);
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()) as _;
+
+    let mut cfg = ConfigOptions::default();
+    cfg.execution.parquet.pushdown_filters = false;
+    insta::assert_snapshot!(
+        OptimizationTest::new_with_config(
+            Arc::clone(&plan),
+            PushdownFilter {},
+            &cfg
+        ),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - FilterExec: a@0 = foo
+          -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo
+    "
+    );
+
+    cfg.execution.parquet.pushdown_filters = true;
+    insta::assert_snapshot!(
+        OptimizationTest::new_with_config(
+            plan,
+            PushdownFilter {},
+            &cfg
+        ),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_filter_collapse() {
+    // filter should be pushed down into the parquet scan with two filters
+    let scan = test_scan(Some(FilterPushdown::Exact));
+    let predicate1 = col_lit_predicate("a", "foo", schema());
+    let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap());
+    let predicate2 = col_lit_predicate("b", "bar", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: b@1 = bar
+        -   FilterExec: a@0 = foo
+        -     DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=b@1 = bar AND a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_filter_with_projection() {
+    let scan = test_scan(Some(FilterPushdown::Exact));
+    let projection = vec![1, 0];
+    let projected_schema = Arc::new(schema().project(&projection).unwrap());
+    let predicate = col_lit_predicate("a", "foo", &projected_schema);
+    let plan = Arc::new(
+        FilterExec::try_new(predicate, Arc::clone(&scan))
+            .unwrap()
+            .with_projection(Some(projection))
+            .unwrap(),
+    );
+    // expect the predicate to be pushed down into the DataSource but the FilterExec to be kept for its projection
+    // the pushed down filters should have their indices adjusted
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@1 = foo, projection=[b@1, a@0]
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - FilterExec: true, projection=[b@1, a@0]
+          -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo
+    ",
+    );
+
+    // add a test where the filter is on a column that isn't included in the output
+    let projection = vec![1];
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(
+        FilterExec::try_new(predicate, scan)
+            .unwrap()
+            .with_projection(Some(projection))
+            .unwrap(),
+    );
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo, projection=[b@1]
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - FilterExec: true, projection=[b@1]
+          -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_push_down_through_transparent_nodes() {
+    // expect the predicate to be pushed down into the DataSource
+    let scan = test_scan(Some(FilterPushdown::Exact));
+    let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 1));
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let filter = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap());
+    let repartition = Arc::new(
+        RepartitionExec::try_new(filter, Partitioning::RoundRobinBatch(1)).unwrap(),
+    );
+    let predicate = col_lit_predicate("a", "bar", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, repartition).unwrap());
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = bar
+        -   RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=0
+        -     FilterExec: a@0 = foo
+        -       CoalesceBatchesExec: target_batch_size=1
+        -         DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=0
+          -   CoalesceBatchesExec: target_batch_size=1
+          -     DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = bar AND a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_no_pushdown_through_aggregates() {
+    // There are 2 important points here:
+    // 1. The outer filter is not pushed down into the aggregate because we haven't
+    //    implemented that yet.
+    // 2. The inner filter **is** pushed down into the DataSource.
+    let scan = test_scan(Some(FilterPushdown::Exact));
+    let filter = Arc::new(
+        FilterExec::try_new(col_lit_predicate("a", "foo", schema()), scan.clone())
+            .unwrap(),
+    );
+    let aggregate_expr =
+        vec![
+            AggregateExprBuilder::new(count_udaf(), vec![col("a", schema()).unwrap()])
+                .schema(Arc::clone(schema()))
+                .alias("cnt")
+                .build()
+                .map(Arc::new)
+                .unwrap(),
+        ];
+    let group_by = PhysicalGroupBy::new_single(vec![
+        (col("a", schema()).unwrap(), "a".to_string()),
+        (col("b", schema()).unwrap(), "b".to_string()),
+    ]);
+    let aggregate = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Final,
+            group_by,
+            aggregate_expr.clone(),
+            vec![None],
+            filter,
+            Arc::clone(schema()),
+        )
+        .unwrap(),
+    );
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0])
+        -     FilterExec: a@0 = foo
+        -       DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
+      output:
+        Ok:
+          - FilterExec: a@0 = foo
+          -   AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt]
+          -     DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo
+    "
+    );
+}
+
+/// Schema:
+/// a: String
+/// b: String
+/// c: f64
+static TEST_SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+
+fn schema() -> &'static SchemaRef {
+    TEST_SCHEMA.get_or_init(|| {
+        let fields = vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("b", DataType::Utf8, false),
+            Field::new("c", DataType::Float64, false),
+        ];
+        Arc::new(Schema::new(fields))
+    })
+}
+
+/// Returns a predicate that is a binary expression col = lit
+fn col_lit_predicate(
+    column_name: &str,
+    scalar_value: impl Into<ScalarValue>,
+    schema: &Schema,
+) -> Arc<dyn PhysicalExpr> {
+    let scalar_value = scalar_value.into();
+    Arc::new(BinaryExpr::new(
+        Arc::new(Column::new_with_schema(column_name, schema).unwrap()),
+        Operator::Eq,
+        Arc::new(Literal::new(scalar_value)),
+    ))
+}
+
+/// A harness for testing physical optimizers.
+///
+/// You can use this to test the output of a physical optimizer rule using insta snapshots
+#[derive(Debug)]
+pub struct OptimizationTest {
+    input: Vec<String>,
+    output: Result<Vec<String>, String>,
+}
+
+impl OptimizationTest {
+    pub fn new<O>(input_plan: Arc<dyn ExecutionPlan>, opt: O) -> Self
+    where
+        O: PhysicalOptimizerRule,
+    {
+        Self::new_with_config(input_plan, opt, &ConfigOptions::default())
+    }
+
+    pub fn new_with_config<O>(
+        input_plan: Arc<dyn ExecutionPlan>,
+        opt: O,
+        config: &ConfigOptions,
+    ) -> Self
+    where
+        O: PhysicalOptimizerRule,
+    {
+        let input = format_execution_plan(&input_plan);
+
+        let input_schema = input_plan.schema();
+
+        let output_result = opt.optimize(input_plan, config);
+        let output = output_result
+            .and_then(|plan| {
+                if opt.schema_check() && (plan.schema() != input_schema) {
+                    internal_err!(
+                        "Schema mismatch:\n\nBefore:\n{:?}\n\nAfter:\n{:?}",
+                        input_schema,
+                        plan.schema()
+                    )
+                } else {
+                    Ok(plan)
+                }
+            })
+            .map(|plan| format_execution_plan(&plan))
+            .map_err(|e| e.to_string());
+
+        Self { input, output }
+    }
+}
+
+impl Display for OptimizationTest {

Review Comment:
   This output format is super clear and legible!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

