alamb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2031897452


##########
datafusion/physical-optimizer/src/filter_pushdown.rs:
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion_common::{config::ConfigOptions, DataFusionError, Result};
+use datafusion_physical_plan::{
+    execution_plan::ExecutionPlanFilterPushdownResult, ExecutionPlan,
+};
+
+use crate::PhysicalOptimizerRule;
+
+/// A physical optimizer rule that pushes down filters in the execution plan.
+/// See [`ExecutionPlan::try_pushdown_filters`] for a detailed description of 
the algorithm.
+#[derive(Debug)]
+pub struct PushdownFilter {}

Review Comment:
   this rule is a thing of beauty



##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -0,0 +1,508 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+    datasource::object_store::ObjectStoreUrl,
+    logical_expr::Operator,
+    physical_plan::{
+        expressions::{BinaryExpr, Column, Literal},
+        PhysicalExpr,
+    },
+    scalar::ScalarValue,
+};
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_common::{internal_err, Result};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+    file::{FileSource, FileSourceFilterPushdownResult},
+    file_scan_config::FileScanConfig,
+    file_stream::FileOpener,
+};
+use datafusion_expr::test::function_stub::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{
+    aggregate::AggregateExprBuilder, conjunction, Partitioning, 
PhysicalExprRef,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::{
+    aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
+    coalesce_batches::CoalesceBatchesExec,
+    filter::FilterExec,
+    repartition::RepartitionExec,
+};
+use datafusion_physical_plan::{
+    displayable, filter_pushdown::FilterPushdownSupport,
+    metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan,
+};
+use object_store::ObjectStore;
+use std::sync::{Arc, OnceLock};
+use std::{
+    any::Any,
+    fmt::{Display, Formatter},
+};
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone, Default)]
+struct TestSource {
+    support: Option<FilterPushdownSupport>,
+    predicate: Option<PhysicalExprRef>,
+    statistics: Option<Statistics>,
+}
+
+impl TestSource {
+    fn new(support: Option<FilterPushdownSupport>) -> Self {
+        Self {
+            support,
+            predicate: None,
+            statistics: None,
+        }
+    }
+}
+
+impl FileSource for TestSource {
+    fn create_file_opener(
+        &self,
+        _object_store: Arc<dyn ObjectStore>,
+        _base_config: &FileScanConfig,
+        _partition: usize,
+    ) -> Arc<dyn FileOpener> {
+        todo!("should not be called")
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        todo!("should not be called")
+    }
+
+    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> 
{
+        todo!("should not be called")
+    }
+
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        Arc::new(TestSource {
+            statistics: Some(statistics),
+            ..self.clone()
+        })
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        todo!("should not be called")
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(self
+            .statistics
+            .as_ref()
+            .expect("statistics not set")
+            .clone())
+    }
+
+    fn file_type(&self) -> &str {
+        "test"
+    }
+
+    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let predicate_string = self
+                    .predicate
+                    .as_ref()
+                    .map(|p| format!(", predicate={p}"))
+                    .unwrap_or_default();
+
+                write!(f, "{}", predicate_string)
+            }
+            DisplayFormatType::TreeRender => {
+                if let Some(predicate) = &self.predicate {
+                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
+                }
+                Ok(())
+            }
+        }
+    }
+
+    fn try_pushdown_filters(
+        &self,
+        filters: &[PhysicalExprRef],
+        config: &ConfigOptions,
+    ) -> Result<FileSourceFilterPushdownResult> {
+        let support = match self.support {
+            Some(support) => support,
+            None => {
+                if config.execution.parquet.pushdown_filters {
+                    FilterPushdownSupport::Exact
+                } else {
+                    FilterPushdownSupport::Unsupported
+                }
+            }
+        };
+        let new = Arc::new(TestSource {
+            support: self.support,
+            predicate: Some(conjunction(filters.iter().map(Arc::clone))),
+            statistics: self.statistics.clone(),
+        });
+        Ok(FileSourceFilterPushdownResult::new(
+            new,
+            vec![support; filters.len()],
+        ))
+    }
+}
+
+fn test_scan(support: Option<FilterPushdownSupport>) -> Arc<dyn ExecutionPlan> 
{
+    let schema = schema();
+    let source = Arc::new(TestSource::new(support));
+    let base_config = FileScanConfigBuilder::new(
+        ObjectStoreUrl::parse("test://").unwrap(),
+        Arc::clone(schema),
+        source,
+    )
+    .build();
+    DataSourceExec::from_data_source(base_config)
+}
+
+#[test]
+fn test_pushdown_into_scan() {
+    let scan = test_scan(Some(FilterPushdownSupport::Exact));
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap());
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, predicate=a@0 = foo
+    "
+    );
+}
+
+/// Show that we can use config options to determine how to do pushdown.
+#[test]
+fn test_pushdown_into_scan_with_config_options() {
+    let scan = test_scan(None);
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()) as _;
+
+    let mut cfg = ConfigOptions::default();
+    cfg.execution.parquet.pushdown_filters = false;
+    insta::assert_snapshot!(
+        OptimizationTest::new_with_config(
+            Arc::clone(&plan),
+            PushdownFilter {},
+            &cfg
+        ),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test
+      output:
+        Ok:
+          - FilterExec: a@0 = foo
+          -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, 
c], file_type=test, predicate=a@0 = foo
+    "
+    );
+
+    cfg.execution.parquet.pushdown_filters = true;
+    insta::assert_snapshot!(
+        OptimizationTest::new_with_config(
+            plan,
+            PushdownFilter {},
+            &cfg
+        ),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, predicate=a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_filter_collapse() {
+    // filter should be pushed down into the parquet scan with two filters
+    let scan = test_scan(Some(FilterPushdownSupport::Exact));
+    let predicate1 = col_lit_predicate("a", "foo", schema());
+    let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap());
+    let predicate2 = col_lit_predicate("b", "bar", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: b@1 = bar
+        -   FilterExec: a@0 = foo
+        -     DataSourceExec: file_groups={0 groups: []}, projection=[a, b, 
c], file_type=test
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, predicate=b@1 = bar AND a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_filter_with_projection() {
+    let scan = test_scan(Some(FilterPushdownSupport::Exact));
+    let projection = vec![1, 0];
+    let projected_schema = Arc::new(schema().project(&projection).unwrap());
+    let predicate = col_lit_predicate("a", "foo", &projected_schema);
+    let plan = Arc::new(
+        FilterExec::try_new(predicate, scan)
+            .unwrap()
+            .with_projection(Some(projection))
+            .unwrap(),
+    );
+    // expect the predicate to be pushed down into the DataSource but the 
FilterExec to be kept for its projection
+    // the pushed down filters should have their indices adjusted
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@1 = foo, projection=[b@1, a@0]
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test
+      output:
+        Ok:
+          - FilterExec: true, projection=[b@1, a@0]
+          -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, 
c], file_type=test, predicate=a@0 = foo
+    ",
+    );
+}

Review Comment:
   I think we should also add a test where the filter is on a column that isn't 
included in the output -- which is one of the main use cases for the projection, 
as I understand it.
   
   So that would be projection `b` and filter `a = 1`.



##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Result of trying to push down fitlers to a child plan.
+/// This is used by [`FilterPushdownResult`] to indicate whether the filter was
+/// "absorbed" by the child ([`FilterPushdownSupport::Exact`]) or not
+/// ([`FilterPushdownSupport::Unsupported`]).
+/// If the filter was not absorbed, the parent plan must apply the filter
+/// itself, or return to the caller that it was not pushed down.
+/// If the filter was absorbed, the parent plan can drop the filter or
+/// tell the caller that it was pushed down by forwarding on the 
[`FilterPushdownSupport::Exact`]
+/// information.
+#[derive(Debug, Clone, Copy)]
+pub enum FilterPushdownSupport {
+    /// Filter may not have been pushed down to the child plan, or the child 
plan
+    /// can only partially apply the filter but may have false positives (but 
not false negatives).
+    /// In this case the parent **must** behave as if the filter was not 
pushed down
+    /// and must apply the filter itself.
+    Unsupported,
+    /// Filter was pushed down to the child plan and the child plan promises 
that
+    /// it will apply the filter correctly with no false positives or false 
negatives.
+    /// The parent can safely drop the filter.
+    Exact,
+}
+
+/// The combined result of a filter pushdown operation.
+/// This includes:
+/// * The inner plan that was produced by the pushdown operation.
+/// * The support for each filter that was pushed down.
+pub enum FilterPushdownResult<T> {
+    /// No pushdown was possible, keep this node as is in the tree.
+    NotPushed,
+    /// Pushed some or all filters into this node.
+    /// The caller should replace the node in the tree with the new one 
provided
+    /// and should transmit to parents the support for each filter.
+    Pushed {
+        /// The inner node that was produced by the pushdown operation.
+        inner: T,
+        /// The support for each filter that was pushed down.
+        support: Vec<FilterPushdownSupport>,
+    },
+}
+
+impl<T> FilterPushdownResult<T> {
+    /// Create a new [`FilterPushdownResult`].
+    pub fn new(inner: T, support: Vec<FilterPushdownSupport>) -> Self {
+        Self::Pushed { inner, support }
+    }

Review Comment:
   It would probably make more sense to have `new_pushed` and `new_not_pushed` 
here, rather than a `new()` that just makes one variant 🤔 



##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Result of trying to push down fitlers to a child plan.
+/// This is used by [`FilterPushdownResult`] to indicate whether the filter was
+/// "absorbed" by the child ([`FilterPushdownSupport::Exact`]) or not
+/// ([`FilterPushdownSupport::Unsupported`]).
+/// If the filter was not absorbed, the parent plan must apply the filter
+/// itself, or return to the caller that it was not pushed down.
+/// If the filter was absorbed, the parent plan can drop the filter or
+/// tell the caller that it was pushed down by forwarding on the 
[`FilterPushdownSupport::Exact`]
+/// information.
+#[derive(Debug, Clone, Copy)]
+pub enum FilterPushdownSupport {
+    /// Filter may not have been pushed down to the child plan, or the child 
plan
+    /// can only partially apply the filter but may have false positives (but 
not false negatives).
+    /// In this case the parent **must** behave as if the filter was not 
pushed down
+    /// and must apply the filter itself.
+    Unsupported,

Review Comment:
   I found the name of the `Unsupported` enum variant confusing at first, as it 
can mean that some filters were actually pushed down, just not precisely.



##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// The answer to the question: "Can this operator handle this filter itself?"
+/// Note that this is different from [`FilterPushdownAllowed`] which is the 
answer to "Can *this* plan handle this filter?"
+#[derive(Debug, Clone, Copy)]
+pub enum FilterPushdownSupport {
+    /// Filter may not have been pushed down to the child plan, or the child 
plan
+    /// can only partially apply the filter but may have false positives (but 
not false negatives).
+    /// In this case the parent **must** behave as if the filter was not 
pushed down
+    /// and must apply the filter itself.
+    Unsupported,
+    /// Filter was pushed down to the child plan and the child plan promises 
that
+    /// it will apply the filter correctly with no false positives or false 
negatives.
+    /// The parent can safely drop the filter.
+    Exact,
+}

Review Comment:
   While there is no difference in behavior right now between `Inexact` and 
`Unsupported`, I still think we should use the same three enum variants, as I 
can see it being useful in the future to know the difference.



##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+    datasource::object_store::ObjectStoreUrl,
+    logical_expr::Operator,
+    physical_plan::{
+        expressions::{BinaryExpr, Column, Literal},
+        PhysicalExpr,
+    },
+    scalar::ScalarValue,
+};
+use datafusion_common::internal_err;
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+    file::{FileSource, FileSourceFilterPushdownResult},
+    file_scan_config::FileScanConfig,
+    file_stream::FileOpener,
+};
+use datafusion_physical_expr::{conjunction, PhysicalExprRef};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::{
+    displayable, execution_plan::FilterSupport, 
metrics::ExecutionPlanMetricsSet,
+    DisplayFormatType, ExecutionPlan,
+};
+use object_store::ObjectStore;
+use std::sync::{Arc, OnceLock};
+use std::{
+    any::Any,
+    fmt::{Display, Formatter},
+};
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone)]
+struct TestSource {
+    support: FilterSupport,
+    predicate: Option<PhysicalExprRef>,
+    statistics: Option<Statistics>,
+}
+
+impl TestSource {
+    fn new(support: FilterSupport) -> Self {
+        Self {
+            support,
+            predicate: None,
+            statistics: None,
+        }
+    }
+}
+
+impl FileSource for TestSource {
+    fn create_file_opener(
+        &self,
+        _object_store: Arc<dyn ObjectStore>,
+        _base_config: &FileScanConfig,
+        _partition: usize,
+    ) -> Arc<dyn FileOpener> {
+        todo!("should not be called")
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        todo!("should not be called")
+    }
+
+    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> 
{
+        todo!("should not be called")
+    }
+
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        Arc::new(TestSource {
+            statistics: Some(statistics),
+            ..self.clone()
+        })
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        todo!("should not be called")
+    }
+
+    fn statistics(&self) -> datafusion_common::Result<Statistics> {
+        Ok(self
+            .statistics
+            .as_ref()
+            .expect("statistics not set")
+            .clone())
+    }
+
+    fn file_type(&self) -> &str {
+        "test"
+    }
+
+    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let predicate_string = self
+                    .predicate
+                    .as_ref()
+                    .map(|p| format!(", predicate={p}"))
+                    .unwrap_or_default();
+
+                write!(f, "{}", predicate_string)
+            }
+            DisplayFormatType::TreeRender => {
+                if let Some(predicate) = &self.predicate {
+                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
+                }
+                Ok(())
+            }
+        }
+    }
+
+    fn push_down_filters(
+        &self,
+        filters: &[PhysicalExprRef],
+    ) -> datafusion_common::Result<Option<FileSourceFilterPushdownResult>> {
+        let new = Arc::new(TestSource {
+            support: self.support,
+            predicate: Some(conjunction(filters.iter().map(Arc::clone))),
+            statistics: self.statistics.clone(),
+        });
+        Ok(Some(FileSourceFilterPushdownResult::new(
+            new,
+            vec![self.support; filters.len()],
+        )))
+    }
+}
+
+fn test_scan(support: FilterSupport) -> Arc<dyn ExecutionPlan> {
+    let schema = schema();
+    let source = Arc::new(TestSource::new(support));
+    let base_config = FileScanConfigBuilder::new(
+        ObjectStoreUrl::parse("test://").unwrap(),
+        Arc::clone(schema),
+        source,
+    )
+    .build();
+    DataSourceExec::from_data_source(base_config)
+}
+
+#[test]
+fn test_pushdown_into_scan() {
+    let scan = test_scan(FilterSupport::HandledExact);
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap());
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, predicate=a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_parquet_pushdown() {
+    // filter should be pushed down into the parquet scan with two filters
+    let scan = test_scan(FilterSupport::HandledExact);
+    let predicate1 = col_lit_predicate("a", "foo", schema());
+    let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap());
+    let predicate2 = col_lit_predicate("b", "bar", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: b@1 = bar
+        -   FilterExec: a@0 = foo
+        -     DataSourceExec: file_groups={0 groups: []}, projection=[a, b, 
c], file_type=test
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, predicate=a@0 = foo AND b@1 = bar
+    "
+    );
+}
+

Review Comment:
   I think this makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to