berkaysynnada commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2035176129
########## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ########## @@ -0,0 +1,529 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ + datasource::object_store::ObjectStoreUrl, + logical_expr::Operator, + physical_plan::{ + expressions::{BinaryExpr, Column, Literal}, + PhysicalExpr, + }, + scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ + file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ + aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef, +}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter_pushdown::{FilterPushdown, FilterPushdownResult}; +use datafusion_physical_plan::{ + aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, + coalesce_batches::CoalesceBatchesExec, + filter::FilterExec, + repartition::RepartitionExec, +}; +use datafusion_physical_plan::{ + displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ + any::Any, + fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { + support: Option<FilterPushdown>, Review Comment: If I don't misunderstand, DataSource's or ExecutionPlan's don't store or keep this FilterPushdown in their internal states. That info just passes through the operators during the pushdown attempt. So, maybe we can remove this "support" field, and instead define 2 different TestSource one with support and one without support (or simply define a generic boolean). I suggest this to not misguide people considering this test as an example usage. ########## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ########## @@ -0,0 +1,529 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ + datasource::object_store::ObjectStoreUrl, + logical_expr::Operator, + physical_plan::{ + expressions::{BinaryExpr, Column, Literal}, + PhysicalExpr, + }, + scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ + file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ + aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef, +}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter_pushdown::{FilterPushdown, FilterPushdownResult}; +use datafusion_physical_plan::{ + aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, + coalesce_batches::CoalesceBatchesExec, + filter::FilterExec, + repartition::RepartitionExec, +}; +use datafusion_physical_plan::{ + displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ + any::Any, + fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { + support: Option<FilterPushdown>, + predicate: Option<PhysicalExprRef>, + statistics: Option<Statistics>, +} + +impl TestSource { + fn new(support: Option<FilterPushdown>) -> Self { + Self { + support, + predicate: None, + statistics: None, + } + } +} + +impl FileSource for TestSource { + fn create_file_opener( + &self, + _object_store: Arc<dyn ObjectStore>, + _base_config: &FileScanConfig, + _partition: usize, + ) -> Arc<dyn FileOpener> { + todo!("should not be called") + } + + fn as_any(&self) -> &dyn Any { + todo!("should not be called") + } + + fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> { + todo!("should not be called") + } + + fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> { + todo!("should not be called") + } + + fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> { + todo!("should not be called") + } + + fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { + Arc::new(TestSource { + statistics: Some(statistics), + ..self.clone() + }) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + todo!("should not be called") + } + + fn statistics(&self) -> Result<Statistics> { + Ok(self + .statistics + .as_ref() + .expect("statistics not set") + .clone()) + } + + fn file_type(&self) -> &str { + "test" + } + + fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => 
{ + let predicate_string = self + .predicate + .as_ref() + .map(|p| format!(", predicate={p}")) + .unwrap_or_default(); + + write!(f, "{}", predicate_string) + } + DisplayFormatType::TreeRender => { + if let Some(predicate) = &self.predicate { + writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; + } + Ok(()) + } + } + } + + fn try_pushdown_filters( + &self, + filters: &[PhysicalExprRef], Review Comment: for convenience and consistency, can we rename this as parent_filters as well? ########## datafusion/physical-plan/src/execution_plan.rs: ########## @@ -467,6 +468,439 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Result<Option<Arc<dyn ExecutionPlan>>> { Ok(None) } + + /// Attempts to recursively push given filters into this `ExecutionPlan` or + /// its children and push any filters from this node into its children. + /// + /// This is used to implement filter pushdown in the physical plan. Note + /// that DataFusion also implements filter pushdown in the logical plan, + /// which is a different code path. This method is here to support + /// additional optimizations that may be only be possible in the physical + /// plan such as dynamic filtering (see below). + /// + /// See [`try_pushdown_filters_to_input`] for a simple implementation + /// + /// # Arguments + /// * `plan`: `Arc`d instance of self + /// * `parent_filters`: A vector of [`PhysicalExpr`]s from the parent of this node + /// to try and push down + /// * `config`: The configuration options for execution + /// + /// # Default Implementation + /// + /// The default implementation assumes: + /// * Parent filters can't be passed onto children. + /// * This node has no filters to contribute. + /// + /// Note the default implementation still recurses into children to + /// recursively call `try_pushdown_filters` on subtrees that may have + /// filters to pushdown. + /// + /// # Example: Push filter into a `DataSourceExec` + /// + /// For example, consider the following plan: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = [ id=1] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// ``` + /// + /// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to the `DataSourceExec` node. + /// + /// If this filter is selective pushing it into the scan can avoid massive + /// amounts of data being read from the source (the projection is `*` so all + /// matching columns are read). + /// + /// In this simple case we: + /// 1. Enter the recursion with no filters. + /// 2. We find the [`FilterExec`] node and call [`ExecutionPlan::try_pushdown_filters`] on it. + /// 3. The [`FilterExec`] node tries to push it's filters + the filters from the parent nodes (in this case empty) + /// down into it's input, which is the `DataSourceExec` node. + /// 4. The `DataSourceExec` node accepts the filter and returns a [`FilterPushdownResult`] with a new copy of itself + /// and [`FilterPushdown::Exact`] to indicate that the filter was pushed down and the caller no longer + /// needs to handle it. + /// 5. The [`FilterExec`] seeing that all filters were pushed down returns a [`FilterPushdownResult`] that directly + /// returns the new `DataSourceExec` node, effectively removing the [`FilterExec`] node from the plan. 
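To make steps 4 and 5 concrete, here is a minimal sketch of what an accepting source can return, modeled on the `TestSource` in this PR's tests (illustrative only, and using the `parent_filters` name suggested above for consistency with `ExecutionPlan::try_pushdown_filters`):

```rust
// Hedged sketch, not the final API: a source that absorbs every parent
// filter and reports `Exact` support for each, which lets the parent
// `FilterExec` remove itself (step 5).
fn try_pushdown_filters(
    &self,
    parent_filters: &[PhysicalExprRef],
    _config: &ConfigOptions,
) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
    // Fold all incoming filters into this source's predicate.
    let new = Arc::new(TestSource {
        predicate: Some(conjunction(parent_filters.iter().map(Arc::clone))),
        ..self.clone()
    });
    // `Exact` means the caller no longer needs to re-apply the filters.
    Ok(FilterPushdownResult::Pushed {
        inner: new,
        support: vec![FilterPushdown::Exact; parent_filters.len()],
    })
}
```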
+ /// + /// The new plan looks like: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = [ id=1] │ + /// └──────────────────────┘ + /// ``` + /// + /// # Example: Push filters with `ProjectionExec` + /// + /// Let's consider a more complex example involving a [`ProjectionExec`] + /// node in between the [`FilterExec`] and `DataSourceExec` nodes that + /// creates a new column that the filter depends on. + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = │ + /// │ [cost>50,id=1] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ cost = price * 1.2 │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// ``` + /// + /// We want to push down the filters `[id=1]` to the `DataSourceExec` node, + /// but can't push down `cost>50` because it requires the [`ProjectionExec`] + /// node to be executed first. A simple thing to do would be to split up the + /// filter into two separate filters and push down the first one: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = │ + /// │ [cost>50] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ cost = price * 1.2 │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = [ id=1] │ + /// └──────────────────────┘ + /// ``` + /// + /// We can actually however do better by pushing down `price * 1.2 > 50` + /// instead of `cost > 50`: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ cost = price * 1.2 │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = [id=1, │ + /// │ price * 1.2 > 50] │ + /// └──────────────────────┘ + /// ``` + /// + /// # Example: Push filters within a subtree + /// + /// There are also cases where we may be able to push down filters within a + /// subtree but not the entire tree. 
A good example of this is aggregation + /// nodes: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = [sum > 10] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌───────────────────────┐ + /// │ AggregateExec │ + /// │ group by = [id] │ + /// │ aggregate = │ + /// │ [sum(price)] │ + /// └───────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = [id=1] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// ``` + /// + /// The transformation here is to push down the `id=1` filter to the + /// `DataSourceExec` node: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = [sum > 10] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌───────────────────────┐ + /// │ AggregateExec │ + /// │ group by = [id] │ + /// │ aggregate = │ + /// │ [sum(price)] │ + /// └───────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = [id=1] │ + /// └──────────────────────┘ + /// ``` + /// + /// The point here is that: + /// 1. We cannot push down `sum > 10` through the `AggregateExec` node into the `DataSourceExec` node. + /// Any filters above the `AggregateExec` node are not pushed down. + /// This is determined by calling [`ExecutionPlan::try_pushdown_filters`] on the [`AggregateExec`] node. + /// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push + /// down the `id=1` filter. + /// + /// # Example: Push filters through Joins + /// + /// It is also possible to push down filters through joins and filters that + /// originate from joins. For example, a hash join where we build a hash + /// table of the left side and probe the right side (ignoring why we would + /// choose this order, typically it depends on the size of each table, + /// etc.). + /// + /// ```text + /// ┌─────────────────────┐ + /// │ FilterExec │ + /// │ filters = │ + /// │ [d.size > 100] │ + /// └─────────────────────┘ + /// │ + /// │ + /// ┌──────────▼──────────┐ + /// │ │ + /// │ HashJoinExec │ + /// │ [u.dept@hash(d.id)] │ + /// │ │ + /// └─────────────────────┘ + /// │ + /// ┌────────────┴────────────┐ + /// ┌──────────▼──────────┐ ┌──────────▼──────────┐ + /// │ DataSourceExec │ │ DataSourceExec │ + /// │ alias [users as u] │ │ alias [dept as d] │ + /// │ │ │ │ + /// └─────────────────────┘ └─────────────────────┘ + /// ``` + /// + /// There are two pushdowns we can do here: + /// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec` + /// node for the `departments` table. + /// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading + /// rows from the `users` table that will be eliminated by the join. + /// This can be done via a bloom filter or similar and is not (yet) supported + /// in DataFusion. See <https://github.com/apache/datafusion/issues/7955>. 
+ /// + /// ```text + /// ┌─────────────────────┐ + /// │ │ + /// │ HashJoinExec │ + /// │ [u.dept@hash(d.id)] │ + /// │ │ + /// └─────────────────────┘ + /// │ + /// ┌────────────┴────────────┐ + /// ┌──────────▼──────────┐ ┌──────────▼──────────┐ + /// │ DataSourceExec │ │ DataSourceExec │ + /// │ alias [users as u] │ │ alias [dept as d] │ + /// │ filters = │ │ filters = │ + /// │ [depg@hash(d.id)] │ │ [ d.size > 100] │ + /// └─────────────────────┘ └─────────────────────┘ + /// ``` + /// + /// You may notice in this case that the filter is *dynamic*: the hash table + /// is built _after_ the `departments` table is read and at runtime. We + /// don't have a concrete `InList` filter or similar to push down at + /// optimization time. These sorts of dynamic filters are handled by + /// building a specialized [`PhysicalExpr`] that can be evaluated at runtime + /// and internally maintains a reference to the hash table or other state. + /// + /// To make working with these sorts of dynamic filters more tractable we have the method `PhysicalExpr::snapshot` + /// (TODO: add reference after <https://github.com/apache/datafusion/pull/15568> is merged) + /// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter. + /// For a join this could mean converting it to an `InList` filter or a min/max filter for example. + /// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details. + /// + /// # Example: Push TopK filters into Scans + /// + /// Another form of dynamic filter is pushing down the state of a `TopK` + /// operator for queries like `SELECT * FROM t ORDER BY id LIMIT 10`: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ TopK │ + /// │ limit = 10 │ + /// │ order by = [id] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// ``` + /// + /// We can avoid large amounts of data processing by transforming this into: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ TopK │ + /// │ limit = 10 │ + /// │ order by = [id] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = │ + /// │ [id < @ TopKHeap] │ + /// └──────────────────────┘ + /// ``` + /// + /// Now as we fill our `TopK` heap we can push down the state of the heap to + /// the `DataSourceExec` node to avoid reading files / row groups / pages / + /// rows that could not possibly be in the top 10. + /// + /// This is not yet implemented in DataFusion. See + /// <https://github.com/apache/datafusion/issues/15037> + /// + /// [`FilterExec`]: crate::filter::FilterExec + /// [`ProjectionExec`]: crate::projection::ProjectionExec + /// [`AggregateExec`]: crate::aggregates::AggregateExec Review Comment: Perfect docs, really helps people a lot ########## datafusion/physical-plan/src/execution_plan.rs: ########## @@ -467,6 +468,439 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Result<Option<Arc<dyn ExecutionPlan>>> { Ok(None) } + + /// Attempts to recursively push given filters into this `ExecutionPlan` or + /// its children and push any filters from this node into its children. + /// + /// This is used to implement filter pushdown in the physical plan. Note + /// that DataFusion also implements filter pushdown in the logical plan, + /// which is a different code path. 
This method is here to support + /// additional optimizations that may be only be possible in the physical + /// plan such as dynamic filtering (see below). + /// + /// See [`try_pushdown_filters_to_input`] for a simple implementation + /// + /// # Arguments + /// * `plan`: `Arc`d instance of self + /// * `parent_filters`: A vector of [`PhysicalExpr`]s from the parent of this node + /// to try and push down + /// * `config`: The configuration options for execution + /// + /// # Default Implementation + /// + /// The default implementation assumes: + /// * Parent filters can't be passed onto children. + /// * This node has no filters to contribute. + /// + /// Note the default implementation still recurses into children to + /// recursively call `try_pushdown_filters` on subtrees that may have + /// filters to pushdown. + /// + /// # Example: Push filter into a `DataSourceExec` + /// + /// For example, consider the following plan: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = [ id=1] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// ``` + /// + /// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to the `DataSourceExec` node. + /// + /// If this filter is selective pushing it into the scan can avoid massive + /// amounts of data being read from the source (the projection is `*` so all + /// matching columns are read). + /// + /// In this simple case we: + /// 1. Enter the recursion with no filters. + /// 2. We find the [`FilterExec`] node and call [`ExecutionPlan::try_pushdown_filters`] on it. + /// 3. The [`FilterExec`] node tries to push it's filters + the filters from the parent nodes (in this case empty) + /// down into it's input, which is the `DataSourceExec` node. + /// 4. The `DataSourceExec` node accepts the filter and returns a [`FilterPushdownResult`] with a new copy of itself + /// and [`FilterPushdown::Exact`] to indicate that the filter was pushed down and the caller no longer + /// needs to handle it. + /// 5. The [`FilterExec`] seeing that all filters were pushed down returns a [`FilterPushdownResult`] that directly + /// returns the new `DataSourceExec` node, effectively removing the [`FilterExec`] node from the plan. + /// + /// The new plan looks like: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = [ id=1] │ + /// └──────────────────────┘ + /// ``` + /// + /// # Example: Push filters with `ProjectionExec` + /// + /// Let's consider a more complex example involving a [`ProjectionExec`] + /// node in between the [`FilterExec`] and `DataSourceExec` nodes that + /// creates a new column that the filter depends on. 
+ /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = │ + /// │ [cost>50,id=1] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ cost = price * 1.2 │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// ``` + /// + /// We want to push down the filters `[id=1]` to the `DataSourceExec` node, + /// but can't push down `cost>50` because it requires the [`ProjectionExec`] + /// node to be executed first. A simple thing to do would be to split up the + /// filter into two separate filters and push down the first one: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = │ + /// │ [cost>50] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ cost = price * 1.2 │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = [ id=1] │ + /// └──────────────────────┘ + /// ``` + /// + /// We can actually however do better by pushing down `price * 1.2 > 50` + /// instead of `cost > 50`: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ cost = price * 1.2 │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = [id=1, │ + /// │ price * 1.2 > 50] │ + /// └──────────────────────┘ + /// ``` + /// + /// # Example: Push filters within a subtree + /// + /// There are also cases where we may be able to push down filters within a + /// subtree but not the entire tree. 
A good example of this is aggregation + /// nodes: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = [sum > 10] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌───────────────────────┐ + /// │ AggregateExec │ + /// │ group by = [id] │ + /// │ aggregate = │ + /// │ [sum(price)] │ + /// └───────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = [id=1] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// ``` + /// + /// The transformation here is to push down the `id=1` filter to the + /// `DataSourceExec` node: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = [sum > 10] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌───────────────────────┐ + /// │ AggregateExec │ + /// │ group by = [id] │ + /// │ aggregate = │ + /// │ [sum(price)] │ + /// └───────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = [id=1] │ + /// └──────────────────────┘ + /// ``` + /// + /// The point here is that: + /// 1. We cannot push down `sum > 10` through the `AggregateExec` node into the `DataSourceExec` node. + /// Any filters above the `AggregateExec` node are not pushed down. + /// This is determined by calling [`ExecutionPlan::try_pushdown_filters`] on the [`AggregateExec`] node. + /// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push + /// down the `id=1` filter. + /// + /// # Example: Push filters through Joins + /// + /// It is also possible to push down filters through joins and filters that + /// originate from joins. For example, a hash join where we build a hash + /// table of the left side and probe the right side (ignoring why we would + /// choose this order, typically it depends on the size of each table, + /// etc.). + /// + /// ```text + /// ┌─────────────────────┐ + /// │ FilterExec │ + /// │ filters = │ + /// │ [d.size > 100] │ + /// └─────────────────────┘ + /// │ + /// │ + /// ┌──────────▼──────────┐ + /// │ │ + /// │ HashJoinExec │ + /// │ [u.dept@hash(d.id)] │ + /// │ │ + /// └─────────────────────┘ + /// │ + /// ┌────────────┴────────────┐ + /// ┌──────────▼──────────┐ ┌──────────▼──────────┐ + /// │ DataSourceExec │ │ DataSourceExec │ + /// │ alias [users as u] │ │ alias [dept as d] │ + /// │ │ │ │ + /// └─────────────────────┘ └─────────────────────┘ + /// ``` + /// + /// There are two pushdowns we can do here: + /// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec` + /// node for the `departments` table. + /// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading + /// rows from the `users` table that will be eliminated by the join. + /// This can be done via a bloom filter or similar and is not (yet) supported + /// in DataFusion. See <https://github.com/apache/datafusion/issues/7955>. 
+ /// + /// ```text + /// ┌─────────────────────┐ + /// │ │ + /// │ HashJoinExec │ + /// │ [u.dept@hash(d.id)] │ + /// │ │ + /// └─────────────────────┘ + /// │ + /// ┌────────────┴────────────┐ + /// ┌──────────▼──────────┐ ┌──────────▼──────────┐ + /// │ DataSourceExec │ │ DataSourceExec │ + /// │ alias [users as u] │ │ alias [dept as d] │ + /// │ filters = │ │ filters = │ + /// │ [depg@hash(d.id)] │ │ [ d.size > 100] │ + /// └─────────────────────┘ └─────────────────────┘ + /// ``` + /// + /// You may notice in this case that the filter is *dynamic*: the hash table + /// is built _after_ the `departments` table is read and at runtime. We + /// don't have a concrete `InList` filter or similar to push down at + /// optimization time. These sorts of dynamic filters are handled by + /// building a specialized [`PhysicalExpr`] that can be evaluated at runtime + /// and internally maintains a reference to the hash table or other state. + /// + /// To make working with these sorts of dynamic filters more tractable we have the method `PhysicalExpr::snapshot` + /// (TODO: add reference after <https://github.com/apache/datafusion/pull/15568> is merged) + /// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter. + /// For a join this could mean converting it to an `InList` filter or a min/max filter for example. + /// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details. + /// + /// # Example: Push TopK filters into Scans + /// + /// Another form of dynamic filter is pushing down the state of a `TopK` + /// operator for queries like `SELECT * FROM t ORDER BY id LIMIT 10`: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ TopK │ + /// │ limit = 10 │ + /// │ order by = [id] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// ``` + /// + /// We can avoid large amounts of data processing by transforming this into: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ TopK │ + /// │ limit = 10 │ + /// │ order by = [id] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = │ + /// │ [id < @ TopKHeap] │ + /// └──────────────────────┘ + /// ``` + /// + /// Now as we fill our `TopK` heap we can push down the state of the heap to + /// the `DataSourceExec` node to avoid reading files / row groups / pages / + /// rows that could not possibly be in the top 10. + /// + /// This is not yet implemented in DataFusion. See + /// <https://github.com/apache/datafusion/issues/15037> + /// + /// [`FilterExec`]: crate::filter::FilterExec + /// [`ProjectionExec`]: crate::projection::ProjectionExec + /// [`AggregateExec`]: crate::aggregates::AggregateExec + fn try_pushdown_filters( + &self, + plan: &Arc<dyn ExecutionPlan>, + parent_filters: &[PhysicalExprRef], + config: &ConfigOptions, + ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> { Review Comment: After applying my suggestion about moving the recursion into rule's itself, this API should only define how the operator treats the predicates, and update them. 
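A rough sketch of that slimmed-down, non-recursive hook (all names are illustrative, not this PR's API; the optimizer rule would own the traversal):

```rust
use std::sync::Arc;

use datafusion_common::{config::ConfigOptions, Result};
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_plan::filter_pushdown::FilterPushdownResult;
use datafusion_physical_plan::ExecutionPlan;

/// Hypothetical node-side hook: given the predicates arriving from the
/// parent, describe what this node does with them and return an updated
/// copy of itself. No recursion into children happens here; the rule
/// drives the walk.
trait FilterPushdownSupport {
    fn pushdown_filters(
        &self,
        parent_filters: &[PhysicalExprRef],
        config: &ConfigOptions,
    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>>;
}
```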
If you see a challenge, maybe we can separate these two into 2 API's ########## datafusion/datasource/src/file_scan_config.rs: ########## @@ -587,6 +589,27 @@ impl DataSource for FileScanConfig { ) as _ })) } + + fn try_pushdown_filters( + &self, + filters: &[PhysicalExprRef], + config: &ConfigOptions, + ) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> { + match self.file_source.try_pushdown_filters(filters, config)? { + FilterPushdownResult::NotPushed => Ok(FilterPushdownResult::NotPushed), + FilterPushdownResult::Pushed { inner, support } => { + let new_self = Arc::new( + FileScanConfigBuilder::from(self.clone()) + .with_source(inner) + .build(), + ); + Ok(FilterPushdownResult::Pushed { + inner: new_self, Review Comment: Rather than saying "inner", can we emphasize this is the new self having the filter pushed down into it? Maybe renaming it as "updated" ? WDYT? ########## datafusion/datasource/src/source.rs: ########## @@ -254,3 +284,13 @@ impl DataSourceExec { }) } } + +/// Create a new `DataSourceExec` from a `DataSource` Review Comment: There is ```rust pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> { Arc::new(Self::new(Arc::new(data_source))) } ``` in impl DataSourceExec. Doesn't that work for you? ########## datafusion/physical-expr/src/utils/mod.rs: ########## @@ -47,6 +47,31 @@ pub fn split_conjunction( split_impl(Operator::And, predicate, vec![]) } +/// Create a conjunction of the given predicates. +/// If the input is empty, return a literal true. +/// If the input contains a single predicate, return the predicate. +/// Otherwise, return a conjunction of the predicates (e.g. `a AND b AND c`). +pub fn conjunction( + predicates: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>, +) -> Arc<dyn PhysicalExpr> { + conjunction_opt(predicates).unwrap_or_else(|| crate::expressions::lit(true)) +} + +/// Create a conjunction of the given predicates. Review Comment: Why not just merging these two? I don't think people prefer something other than "true" if they provide an empty iterator ########## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ########## @@ -0,0 +1,529 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
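On the `conjunction`/`conjunction_opt` question above, a minimal sketch of the merged helper (assuming the existing `expressions::lit`; the empty-input case still yields a literal `true`):

```rust
use std::sync::Arc;

use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExpr;

/// Single merged helper: AND the predicates together, returning a
/// literal `true` for an empty input. Callers who ever need the
/// `Option` form can wrap this themselves.
pub fn conjunction(
    predicates: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
) -> Arc<dyn PhysicalExpr> {
    predicates
        .into_iter()
        .reduce(|acc, p| Arc::new(BinaryExpr::new(acc, Operator::And, p)) as _)
        .unwrap_or_else(|| lit(true))
}
```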
+ +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ + datasource::object_store::ObjectStoreUrl, + logical_expr::Operator, + physical_plan::{ + expressions::{BinaryExpr, Column, Literal}, + PhysicalExpr, + }, + scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ + file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ + aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef, +}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter_pushdown::{FilterPushdown, FilterPushdownResult}; +use datafusion_physical_plan::{ + aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, + coalesce_batches::CoalesceBatchesExec, + filter::FilterExec, + repartition::RepartitionExec, +}; +use datafusion_physical_plan::{ + displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ + any::Any, + fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { + support: Option<FilterPushdown>, + predicate: Option<PhysicalExprRef>, + statistics: Option<Statistics>, +} + +impl TestSource { + fn new(support: Option<FilterPushdown>) -> Self { + Self { + support, + predicate: None, + statistics: None, + } + } +} + +impl FileSource for TestSource { + fn create_file_opener( + &self, + _object_store: Arc<dyn ObjectStore>, + _base_config: &FileScanConfig, + _partition: usize, + ) -> Arc<dyn FileOpener> { + todo!("should not be called") + } + + fn as_any(&self) -> &dyn Any { + todo!("should not be called") + } + + fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> { + todo!("should not be called") + } + + fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> { + todo!("should not be called") + } + + fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> { + todo!("should not be called") + } + + fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { + Arc::new(TestSource { + statistics: Some(statistics), + ..self.clone() + }) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + todo!("should not be called") + } + + fn statistics(&self) -> Result<Statistics> { + Ok(self + .statistics + .as_ref() + .expect("statistics not set") + .clone()) + } + + fn file_type(&self) -> &str { + "test" + } + + fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let predicate_string = self + .predicate + .as_ref() + .map(|p| format!(", predicate={p}")) + .unwrap_or_default(); + + write!(f, "{}", predicate_string) + } + DisplayFormatType::TreeRender => { + if let Some(predicate) = &self.predicate { + writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; + } + Ok(()) + } + } + } + + fn try_pushdown_filters( + &self, + filters: &[PhysicalExprRef], + config: &ConfigOptions, 
+ ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> { + let support = match self.support { + Some(support) => support, + None => { + if config.execution.parquet.pushdown_filters { + FilterPushdown::Exact + } else { + FilterPushdown::Inexact + } + } + }; + let new = Arc::new(TestSource { + support: self.support, + predicate: Some(conjunction(filters.iter().map(Arc::clone))), + statistics: self.statistics.clone(), + }); + Ok(FilterPushdownResult::Pushed { + inner: new, + support: vec![support; filters.len()], + }) + } +} + +fn test_scan(support: Option<FilterPushdown>) -> Arc<dyn ExecutionPlan> { + let schema = schema(); + let source = Arc::new(TestSource::new(support)); + let base_config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test://").unwrap(), + Arc::clone(schema), + source, + ) + .build(); + DataSourceExec::from_data_source(base_config) +} + +#[test] +fn test_pushdown_into_scan() { + let scan = test_scan(Some(FilterPushdown::Exact)); + let predicate = col_lit_predicate("a", "foo", schema()); + let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()); + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test + output: + Ok: + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo + " + ); +} + +/// Show that we can use config options to determine how to do pushdown. +#[test] +fn test_pushdown_into_scan_with_config_options() { + let scan = test_scan(None); + let predicate = col_lit_predicate("a", "foo", schema()); + let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()) as _; + + let mut cfg = ConfigOptions::default(); + cfg.execution.parquet.pushdown_filters = false; + insta::assert_snapshot!( + OptimizationTest::new_with_config( + Arc::clone(&plan), + PushdownFilter {}, + &cfg + ), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test + output: + Ok: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo + " + ); + + cfg.execution.parquet.pushdown_filters = true; + insta::assert_snapshot!( + OptimizationTest::new_with_config( + plan, + PushdownFilter {}, + &cfg + ), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test + output: + Ok: + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo + " + ); +} + +#[test] +fn test_filter_collapse() { + // filter should be pushed down into the parquet scan with two filters + let scan = test_scan(Some(FilterPushdown::Exact)); + let predicate1 = col_lit_predicate("a", "foo", schema()); + let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap()); + let predicate2 = col_lit_predicate("b", "bar", schema()); + let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: + input: + - FilterExec: b@1 = bar + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test + output: + Ok: + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, predicate=b@1 = bar AND a@0 = foo + " + ); +} + +#[test] +fn test_filter_with_projection() { + let scan = test_scan(Some(FilterPushdown::Exact)); + let projection = vec![1, 0]; + let projected_schema = Arc::new(schema().project(&projection).unwrap()); + let predicate = col_lit_predicate("a", "foo", &projected_schema); + let plan = Arc::new( + FilterExec::try_new(predicate, Arc::clone(&scan)) + .unwrap() + .with_projection(Some(projection)) + .unwrap(), + ); + // expect the predicate to be pushed down into the DataSource but the FilterExec to be kept for its projection + // the pushed down filters should have their indices adjusted + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: + input: + - FilterExec: a@1 = foo, projection=[b@1, a@0] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test + output: + Ok: + - FilterExec: true, projection=[b@1, a@0] Review Comment: future optimization tasks are clearly visible here :D ########## datafusion/physical-optimizer/src/filter_pushdown.rs: ########## @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion_common::{config::ConfigOptions, DataFusionError, Result}; +use datafusion_physical_plan::{ExecutionPlan, FilterPushdownResult}; + +use crate::PhysicalOptimizerRule; + +/// A physical optimizer rule that pushes down filters in the execution plan. +/// See [`ExecutionPlan::try_pushdown_filters`] for a detailed description of the algorithm. +#[derive(Debug)] +pub struct PushdownFilter {} + +impl Default for PushdownFilter { + fn default() -> Self { + Self::new() + } +} + +impl PushdownFilter { + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for PushdownFilter { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + match plan.try_pushdown_filters(&plan, &Vec::new(), config)? { Review Comment: My major suggestion is we should implement the recursion logic in this rule, not in ExecutionPlan trait. Firstly, all other rules are in this pattern, and easy to debug. Secondly, there already are many recursion tools for traversing and updating mechanisms, e.g transform_down, PlanContext<T>... As we implement the other operators like joins etc. we will be easily writing only the filter pushdown related logic in ExecutionPlan implementation, not the traversal or update logic. ########## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ########## @@ -0,0 +1,529 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ + datasource::object_store::ObjectStoreUrl, + logical_expr::Operator, + physical_plan::{ + expressions::{BinaryExpr, Column, Literal}, + PhysicalExpr, + }, + scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ + file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ + aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef, +}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter_pushdown::{FilterPushdown, FilterPushdownResult}; +use datafusion_physical_plan::{ + aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, + coalesce_batches::CoalesceBatchesExec, + filter::FilterExec, + repartition::RepartitionExec, +}; +use datafusion_physical_plan::{ + displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ + any::Any, + fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { + support: Option<FilterPushdown>, + predicate: Option<PhysicalExprRef>, + statistics: Option<Statistics>, +} + +impl TestSource { + fn new(support: Option<FilterPushdown>) -> Self { + Self { + support, + predicate: None, + statistics: None, + } + } +} + +impl FileSource for TestSource { + fn create_file_opener( + &self, + _object_store: Arc<dyn ObjectStore>, + _base_config: &FileScanConfig, + _partition: usize, + ) -> Arc<dyn FileOpener> { + todo!("should not be called") + } + + fn as_any(&self) -> &dyn Any { + todo!("should not be called") + } + + fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> { + todo!("should not be called") + } + + fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> { + todo!("should not be called") + } + + fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> { + todo!("should not be called") + } + + fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { + Arc::new(TestSource { + statistics: Some(statistics), + ..self.clone() + }) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + todo!("should not be called") + } + + fn statistics(&self) -> Result<Statistics> { + Ok(self + 
.statistics + .as_ref() + .expect("statistics not set") + .clone()) + } + + fn file_type(&self) -> &str { + "test" + } + + fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let predicate_string = self + .predicate + .as_ref() + .map(|p| format!(", predicate={p}")) + .unwrap_or_default(); + + write!(f, "{}", predicate_string) + } + DisplayFormatType::TreeRender => { + if let Some(predicate) = &self.predicate { + writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; + } + Ok(()) + } + } + } + + fn try_pushdown_filters( + &self, + filters: &[PhysicalExprRef], + config: &ConfigOptions, + ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> { + let support = match self.support { + Some(support) => support, + None => { + if config.execution.parquet.pushdown_filters { + FilterPushdown::Exact + } else { + FilterPushdown::Inexact + } + } + }; + let new = Arc::new(TestSource { + support: self.support, + predicate: Some(conjunction(filters.iter().map(Arc::clone))), + statistics: self.statistics.clone(), + }); + Ok(FilterPushdownResult::Pushed { + inner: new, + support: vec![support; filters.len()], + }) + } +} + +fn test_scan(support: Option<FilterPushdown>) -> Arc<dyn ExecutionPlan> { + let schema = schema(); + let source = Arc::new(TestSource::new(support)); + let base_config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test://").unwrap(), + Arc::clone(schema), + source, + ) + .build(); + DataSourceExec::from_data_source(base_config) +} + +#[test] +fn test_pushdown_into_scan() { + let scan = test_scan(Some(FilterPushdown::Exact)); + let predicate = col_lit_predicate("a", "foo", schema()); + let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()); + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test + output: + Ok: + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo + " + ); +} + +/// Show that we can use config options to determine how to do pushdown. 
+#[test] +fn test_pushdown_into_scan_with_config_options() { + let scan = test_scan(None); + let predicate = col_lit_predicate("a", "foo", schema()); + let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()) as _; + + let mut cfg = ConfigOptions::default(); + cfg.execution.parquet.pushdown_filters = false; + insta::assert_snapshot!( + OptimizationTest::new_with_config( + Arc::clone(&plan), + PushdownFilter {}, + &cfg + ), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test + output: + Ok: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo + " + ); + + cfg.execution.parquet.pushdown_filters = true; + insta::assert_snapshot!( + OptimizationTest::new_with_config( + plan, + PushdownFilter {}, + &cfg + ), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test + output: + Ok: + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo + " + ); +} + +#[test] +fn test_filter_collapse() { + // filter should be pushed down into the parquet scan with two filters + let scan = test_scan(Some(FilterPushdown::Exact)); + let predicate1 = col_lit_predicate("a", "foo", schema()); + let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap()); + let predicate2 = col_lit_predicate("b", "bar", schema()); + let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: + input: + - FilterExec: b@1 = bar + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test + output: + Ok: + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=b@1 = bar AND a@0 = foo + " + ); +} + +#[test] +fn test_filter_with_projection() { + let scan = test_scan(Some(FilterPushdown::Exact)); + let projection = vec![1, 0]; + let projected_schema = Arc::new(schema().project(&projection).unwrap()); + let predicate = col_lit_predicate("a", "foo", &projected_schema); + let plan = Arc::new( + FilterExec::try_new(predicate, Arc::clone(&scan)) + .unwrap() + .with_projection(Some(projection)) + .unwrap(), + ); + // expect the predicate to be pushed down into the DataSource but the FilterExec to be kept for its projection + // the pushed down filters should have their indices adjusted + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: + input: + - FilterExec: a@1 = foo, projection=[b@1, a@0] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test + output: + Ok: + - FilterExec: true, projection=[b@1, a@0] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo + ", + ); + + // add a test where the filter is on a column that isn't included in the output + let projection = vec![1]; + let predicate = col_lit_predicate("a", "foo", schema()); + let plan = Arc::new( + FilterExec::try_new(predicate, scan) + .unwrap() + .with_projection(Some(projection)) + .unwrap(), + ); + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo, projection=[b@1] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test + output: + Ok: + - FilterExec: true, projection=[b@1] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo + " + ); +} + +#[test] +fn test_push_down_through_transparent_nodes() { + // expect the predicate to be pushed down into the DataSource + let scan = test_scan(Some(FilterPushdown::Exact)); + let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 1)); + let predicate = col_lit_predicate("a", "foo", schema()); + let filter = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap()); + let repartition = Arc::new( + RepartitionExec::try_new(filter, Partitioning::RoundRobinBatch(1)).unwrap(), + ); + let predicate = col_lit_predicate("a", "bar", schema()); + let plan = Arc::new(FilterExec::try_new(predicate, repartition).unwrap()); + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: Review Comment: I don't have any intuition, but can people prefer applying filter over repartitioned (to do it in parallel) or coalesced (to do it over possibly more rows) batches? ########## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ########## @@ -0,0 +1,529 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
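Returning to the earlier suggestion that the rule rather than `ExecutionPlan` should own the recursion, here is a hedged sketch of that shape built on the existing `TreeNode::transform_down`; the per-node body is a deliberate no-op placeholder, and threading the in-flight filter set down the walk could use `PlanContext` as noted above:

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;
use datafusion_physical_plan::ExecutionPlan;

/// Sketch only: the optimizer rule drives a top-down walk and swaps in
/// rewritten nodes, so operators never recurse themselves.
fn optimize_with_rule_driven_walk(
    plan: Arc<dyn ExecutionPlan>,
    _config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_down(|node| {
        // The real rule would ask `node` how it treats the filters
        // gathered so far and return `Transformed::yes(updated_node)`
        // when something was pushed down. `Transformed::no` keeps the
        // node unchanged while the walk continues into its children.
        Ok(Transformed::no(node))
    })
    .map(|t| t.data)
}
```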
+/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { + support: Option<FilterPushdown>, + predicate: Option<PhysicalExprRef>, + statistics: Option<Statistics>, +} + +impl TestSource { + fn new(support: Option<FilterPushdown>) -> Self { + Self { + support, + predicate: None, + statistics: None, + } + } +} + +impl FileSource for TestSource { + fn create_file_opener( + &self, + _object_store: Arc<dyn ObjectStore>, + _base_config: &FileScanConfig, + _partition: usize, + ) -> Arc<dyn FileOpener> { + todo!("should not be called") + } + + fn as_any(&self) -> &dyn Any { + todo!("should not be called") + } + + fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> { + todo!("should not be called") + } + + fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> { + todo!("should not be called") + } + + fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> { + todo!("should not be called") + } + + fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { + Arc::new(TestSource { + statistics: Some(statistics), + ..self.clone() + }) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + todo!("should not be called") + } + + fn statistics(&self) -> Result<Statistics> { + Ok(self + .statistics + .as_ref() + .expect("statistics not set") + .clone()) + } + + fn file_type(&self) -> &str { + "test" + } + + fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let predicate_string = self + .predicate + .as_ref() + .map(|p| format!(", predicate={p}")) + .unwrap_or_default(); + + write!(f, "{}", predicate_string) + } + DisplayFormatType::TreeRender => { + if let Some(predicate) = &self.predicate { + writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; + } + Ok(()) + } + } + } + + fn try_pushdown_filters( + &self, + filters: &[PhysicalExprRef], + config: &ConfigOptions, + ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> { + let support = match self.support { + Some(support) => support, + None => { + if config.execution.parquet.pushdown_filters { + FilterPushdown::Exact + } else { + FilterPushdown::Inexact + } + } + }; + let new = Arc::new(TestSource { + support: self.support, + predicate: Some(conjunction(filters.iter().map(Arc::clone))), + statistics: self.statistics.clone(), + }); + Ok(FilterPushdownResult::Pushed { + inner: new, + support: vec![support; filters.len()], + }) + } +} + +fn test_scan(support: Option<FilterPushdown>) -> Arc<dyn ExecutionPlan> { + let schema = schema(); + let source = Arc::new(TestSource::new(support)); + let base_config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test://").unwrap(), + Arc::clone(schema), + source, + ) + .build(); + DataSourceExec::from_data_source(base_config) +} + +#[test] +fn test_pushdown_into_scan() { + let scan = test_scan(Some(FilterPushdown::Exact)); + let predicate = col_lit_predicate("a", "foo", schema()); + let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()); + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test + output: + Ok: + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo Review Comment: Maybe we could also display the "exactness" here, to make the snapshot output prettier.
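For illustration, the `fmt_extra` above could label the predicate with the declared support. A sketch only; the `(exact)`/`(inexact)` suffix and its placement in the snapshot are hypothetical, not part of this PR:

```rust
// Sketch: TestSource::fmt_extra extended so snapshots read e.g.
// `predicate=a@0 = foo (exact)`.
fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
    match t {
        DisplayFormatType::Default | DisplayFormatType::Verbose => {
            let predicate_string = self
                .predicate
                .as_ref()
                .map(|p| format!(", predicate={p}"))
                .unwrap_or_default();
            // Hypothetical suffix derived from the existing `support` field.
            let support_string = match self.support {
                Some(FilterPushdown::Exact) => " (exact)",
                Some(FilterPushdown::Inexact) => " (inexact)",
                _ => "",
            };
            write!(f, "{predicate_string}{support_string}")
        }
        DisplayFormatType::TreeRender => {
            if let Some(predicate) = &self.predicate {
                writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
            }
            Ok(())
        }
    }
}
```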
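########## datafusion/physical-plan/src/filter_pushdown.rs: ########## @@ -0,0 +1,62 @@ +pub use datafusion_expr::FilterPushdown; +use datafusion_physical_expr::PhysicalExprRef; + +/// The combined result of a filter pushdown operation. +/// This includes: +/// * The inner plan that was produced by the pushdown operation. +/// * The support for each filter that was pushed down. +pub enum FilterPushdownResult<T> { Review Comment: There is no restriction on this "T", but the comments say it's a plan. Not a big issue, but maybe we can better reflect what "T" is.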
########## datafusion/physical-plan/src/filter_pushdown.rs: ########## @@ -0,0 +1,62 @@ +pub enum FilterPushdownResult<T> { + /// No pushdown was possible, keep this node as is in the tree. + NotPushed, + /// Pushed some or all filters into this node. + /// The caller should replace the node in the tree with the new one provided + /// and should transmit to parents the support for each filter. + Pushed { + /// The inner node that was produced by the pushdown operation. + inner: T, Review Comment: If we renamed this field with a prefix like "new_" or "updated_", it would be more self-explanatory, IMO.
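Both naming comments could be addressed together; a sketch under the assumption that the generic is always "the node the filters were pushed into" (`Node` and `updated_node` are illustrative names, not the PR's actual code):

```rust
use datafusion_expr::FilterPushdown;

/// The combined result of a filter pushdown operation.
/// `Node` is the node type the filters were pushed into, e.g.
/// `Arc<dyn ExecutionPlan>` or `Arc<dyn FileSource>`.
pub enum FilterPushdownResult<Node> {
    /// No pushdown was possible; keep this node as is in the tree.
    NotPushed,
    /// Pushed some or all filters into this node.
    Pushed {
        /// The rewritten node produced by the pushdown operation; the caller
        /// should replace the old node with this one in the tree.
        updated_node: Node,
        /// The support for each filter that was pushed down.
        support: Vec<FilterPushdown>,
    },
}
```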
########## datafusion/physical-plan/src/execution_plan.rs: ########## @@ -467,6 +468,439 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Result<Option<Arc<dyn ExecutionPlan>>> { Ok(None) } + + /// Attempts to recursively push given filters into this `ExecutionPlan` or + /// its children and push any filters from this node into its children. + /// + /// This is used to implement filter pushdown in the physical plan. Note + /// that DataFusion also implements filter pushdown in the logical plan, + /// which is a different code path. This method is here to support + /// additional optimizations that may only be possible in the physical + /// plan such as dynamic filtering (see below). + /// + /// See [`try_pushdown_filters_to_input`] for a simple implementation. + /// + /// # Arguments + /// * `plan`: `Arc`d instance of self + /// * `parent_filters`: A vector of [`PhysicalExpr`]s from the parent of this node + /// to try and push down + /// * `config`: The configuration options for execution + /// + /// # Default Implementation + /// + /// The default implementation assumes: + /// * Parent filters can't be passed onto children. + /// * This node has no filters to contribute. + /// + /// Note the default implementation still recurses into children to + /// recursively call `try_pushdown_filters` on subtrees that may have + /// filters to push down. + /// + /// # Example: Push filter into a `DataSourceExec` + /// + /// For example, consider the following plan: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = [ id=1] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// ``` + /// + /// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to the `DataSourceExec` node. + /// + /// If this filter is selective, pushing it into the scan can avoid massive + /// amounts of data being read from the source (the projection is `*` so all + /// matching columns are read). + /// + /// In this simple case we: + /// 1. Enter the recursion with no filters. + /// 2. We find the [`FilterExec`] node and call [`ExecutionPlan::try_pushdown_filters`] on it. + /// 3. The [`FilterExec`] node tries to push its filters + the filters from the parent nodes (in this case empty) + /// down into its input, which is the `DataSourceExec` node. + /// 4. The `DataSourceExec` node accepts the filter and returns a [`FilterPushdownResult`] with a new copy of itself + /// and [`FilterPushdown::Exact`] to indicate that the filter was pushed down and the caller no longer + /// needs to handle it. + /// 5. The [`FilterExec`], seeing that all filters were pushed down, returns a [`FilterPushdownResult`] that directly + /// returns the new `DataSourceExec` node, effectively removing the [`FilterExec`] node from the plan. + /// + /// The new plan looks like: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = [ id=1] │ + /// └──────────────────────┘ + /// ``` + /// + /// # Example: Push filters with `ProjectionExec` + /// + /// Let's consider a more complex example involving a [`ProjectionExec`] + /// node in between the [`FilterExec`] and `DataSourceExec` nodes that + /// creates a new column that the filter depends on. + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = │ + /// │ [cost>50,id=1] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ cost = price * 1.2 │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// ``` + /// + /// We want to push down the filters `[id=1]` to the `DataSourceExec` node, + /// but can't push down `cost>50` because it requires the [`ProjectionExec`] + /// node to be executed first. A simple thing to do would be to split up the + /// filter into two separate filters and push down the first one: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = │ + /// │ [cost>50] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ cost = price * 1.2 │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = [ id=1] │ + /// └──────────────────────┘ + /// ``` + /// + /// However, we can actually do better by pushing down `price * 1.2 > 50` + /// instead of `cost > 50`: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ CoalesceBatchesExec │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ cost = price * 1.2 │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = [id=1, │ + /// │ price * 1.2 > 50] │ + /// └──────────────────────┘ + /// ``` + /// + /// # Example: Push filters within a subtree + /// + /// There are also cases where we may be able to push down filters within a + /// subtree but not the entire tree.
A good example of this is aggregation + /// nodes: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = [sum > 10] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌───────────────────────┐ + /// │ AggregateExec │ + /// │ group by = [id] │ + /// │ aggregate = │ + /// │ [sum(price)] │ + /// └───────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = [id=1] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// ``` + /// + /// The transformation here is to push down the `id=1` filter to the + /// `DataSourceExec` node: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ ProjectionExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ FilterExec │ + /// │ filters = [sum > 10] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌───────────────────────┐ + /// │ AggregateExec │ + /// │ group by = [id] │ + /// │ aggregate = │ + /// │ [sum(price)] │ + /// └───────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = [id=1] │ + /// └──────────────────────┘ + /// ``` + /// + /// The point here is that: + /// 1. We cannot push down `sum > 10` through the `AggregateExec` node into the `DataSourceExec` node. + /// Any filters above the `AggregateExec` node are not pushed down. + /// This is determined by calling [`ExecutionPlan::try_pushdown_filters`] on the [`AggregateExec`] node. + /// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push + /// down the `id=1` filter. + /// + /// # Example: Push filters through Joins + /// + /// It is also possible to push down filters through joins and filters that + /// originate from joins. For example, a hash join where we build a hash + /// table of the left side and probe the right side (ignoring why we would + /// choose this order, typically it depends on the size of each table, + /// etc.). + /// + /// ```text + /// ┌─────────────────────┐ + /// │ FilterExec │ + /// │ filters = │ + /// │ [d.size > 100] │ + /// └─────────────────────┘ + /// │ + /// │ + /// ┌──────────▼──────────┐ + /// │ │ + /// │ HashJoinExec │ + /// │ [u.dept@hash(d.id)] │ + /// │ │ + /// └─────────────────────┘ + /// │ + /// ┌────────────┴────────────┐ + /// ┌──────────▼──────────┐ ┌──────────▼──────────┐ + /// │ DataSourceExec │ │ DataSourceExec │ + /// │ alias [users as u] │ │ alias [dept as d] │ + /// │ │ │ │ + /// └─────────────────────┘ └─────────────────────┘ + /// ``` + /// + /// There are two pushdowns we can do here: + /// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec` + /// node for the `departments` table. + /// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading + /// rows from the `users` table that will be eliminated by the join. + /// This can be done via a bloom filter or similar and is not (yet) supported + /// in DataFusion. See <https://github.com/apache/datafusion/issues/7955>. 
+ /// + /// ```text + /// ┌─────────────────────┐ + /// │ │ + /// │ HashJoinExec │ + /// │ [u.dept@hash(d.id)] │ + /// │ │ + /// └─────────────────────┘ + /// │ + /// ┌────────────┴────────────┐ + /// ┌──────────▼──────────┐ ┌──────────▼──────────┐ + /// │ DataSourceExec │ │ DataSourceExec │ + /// │ alias [users as u] │ │ alias [dept as d] │ + /// │ filters = │ │ filters = │ + /// │ [dept@hash(d.id)] │ │ [ d.size > 100] │ + /// └─────────────────────┘ └─────────────────────┘ + /// ``` + /// + /// You may notice in this case that the filter is *dynamic*: the hash table + /// is built _after_ the `departments` table is read and at runtime. We + /// don't have a concrete `InList` filter or similar to push down at + /// optimization time. These sorts of dynamic filters are handled by + /// building a specialized [`PhysicalExpr`] that can be evaluated at runtime + /// and internally maintains a reference to the hash table or other state. + /// + /// To make working with these sorts of dynamic filters more tractable, we have the method `PhysicalExpr::snapshot` + /// (TODO: add reference after <https://github.com/apache/datafusion/pull/15568> is merged) + /// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter. + /// For a join this could mean converting it to an `InList` filter or a min/max filter, for example. + /// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details. + /// + /// # Example: Push TopK filters into Scans + /// + /// Another form of dynamic filter is pushing down the state of a `TopK` + /// operator for queries like `SELECT * FROM t ORDER BY id LIMIT 10`: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ TopK │ + /// │ limit = 10 │ + /// │ order by = [id] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// └──────────────────────┘ + /// ``` + /// + /// We can avoid large amounts of data processing by transforming this into: + /// + /// ```text + /// ┌──────────────────────┐ + /// │ TopK │ + /// │ limit = 10 │ + /// │ order by = [id] │ + /// └──────────────────────┘ + /// │ + /// ▼ + /// ┌──────────────────────┐ + /// │ DataSourceExec │ + /// │ projection = * │ + /// │ filters = │ + /// │ [id < @ TopKHeap] │ + /// └──────────────────────┘ + /// ``` + /// + /// Now as we fill our `TopK` heap we can push down the state of the heap to + /// the `DataSourceExec` node to avoid reading files / row groups / pages / + /// rows that could not possibly be in the top 10. + /// + /// This is not yet implemented in DataFusion. See + /// <https://github.com/apache/datafusion/issues/15037> + /// + /// [`FilterExec`]: crate::filter::FilterExec + /// [`ProjectionExec`]: crate::projection::ProjectionExec + /// [`AggregateExec`]: crate::aggregates::AggregateExec + fn try_pushdown_filters( + &self, + plan: &Arc<dyn ExecutionPlan>, + parent_filters: &[PhysicalExprRef], + config: &ConfigOptions, + ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> { + // By default assume that: + // * Parent filters can't be passed onto children. + // * We have no filters to contribute. + // But we still want to recurse into our children in case a subtree has pushdowns within + // it and thus we need to replace our children with the new plans. + let mut new_children = Vec::with_capacity(self.children().len()); + let mut pushed = false; + for child in self.children() { + match child.try_pushdown_filters(child, &Vec::new(), config)?
{ + FilterPushdownResult::NotPushed => { + // No pushdown possible, keep this child as is + new_children.push(Arc::clone(child)); + } + FilterPushdownResult::Pushed { inner, support } => { + // We have a child that has pushed down some filters + new_children.push(inner); + pushed = true; + // Support should be empty, we didn't pass any filters + if !support.is_empty() { + return internal_err!( + "No filters passed, but child plan reported pushed filters" + ); + } + } + } + } + if pushed { + let new_inner = + with_new_children_if_necessary(Arc::clone(plan), new_children)?; + Ok(FilterPushdownResult::Pushed { + inner: new_inner, + support: vec![FilterPushdown::Unsupported; parent_filters.len()], + }) + } else { + Ok(FilterPushdownResult::NotPushed) + } + } +} + +/// A default implementation of [`ExecutionPlan::try_pushdown_filters`] that +/// pushes down filters transparently to an input. +/// +/// Requires that the node: +/// * Has a single input / child node. +/// * Supports transparent filter pushdown (does not modify the cardinality or schema of the data). +/// * Does not have any filters of its own. +pub fn try_pushdown_filters_to_input( Review Comment: This util could live in the optimizer code as well.
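The util's parameter list is truncated in this quote; assuming it mirrors `ExecutionPlan::try_pushdown_filters`, a single-child, cardinality-preserving operator might delegate to it roughly like this (a sketch only; the argument list is an assumption):

```rust
// Hypothetical ExecutionPlan::try_pushdown_filters override for a
// "transparent" node: forward all parent filters unchanged to the only input.
fn try_pushdown_filters(
    &self,
    plan: &Arc<dyn ExecutionPlan>,
    parent_filters: &[PhysicalExprRef],
    config: &ConfigOptions,
) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
    try_pushdown_filters_to_input(self, plan, parent_filters, config)
}
```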
########## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ########## @@ -0,0 +1,529 @@
+#[test] +fn test_push_down_through_transparent_nodes() { + // expect the predicate to be pushed down into the DataSource + let scan = test_scan(Some(FilterPushdown::Exact)); + let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 1)); + let predicate = col_lit_predicate("a", "foo", schema()); + let filter = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap()); + let repartition = Arc::new( + RepartitionExec::try_new(filter, Partitioning::RoundRobinBatch(1)).unwrap(), + ); + let predicate = col_lit_predicate("a", "bar", schema()); + let plan = Arc::new(FilterExec::try_new(predicate, repartition).unwrap()); + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: Review Comment: Further thoughts: 1) Pushing over CoalesceBatches is OK, I think; there shouldn't be a significant disadvantage. 2) For pushing over Repartitions, I think we should only push filters down over a repartition when _1. the repartition does not increase parallelism_ **OR** _2. the filter reaches the source and is embedded into it_. To track the fate of a filter, you can examine the SortPushdown rule; there is an exact example of this there. Applying this suggestion will be easier once you deploy the recursion mechanism in the optimizer rule.
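To make condition 1 concrete, a check along these lines could gate the pushdown (a sketch; `repartition_preserves_parallelism` is a hypothetical helper, and the accessor names should be double-checked against the actual `RepartitionExec` API):

```rust
// Only push a filter below a RepartitionExec if doing so does not move it
// from a more-parallel context into a less-parallel one (condition 1).
fn repartition_preserves_parallelism(repartition: &RepartitionExec) -> bool {
    let input_partitions = repartition
        .input()
        .properties()
        .output_partitioning()
        .partition_count();
    let output_partitions = repartition
        .properties()
        .output_partitioning()
        .partition_count();
    output_partitions <= input_partitions
}
```

Otherwise (condition 2), the filter should only move below the repartition when the recursion can prove it ends up embedded in the source.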
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.