ozankabak commented on code in PR #8073: URL: https://github.com/apache/arrow-datafusion/pull/8073#discussion_r1386531948
########## datafusion/core/src/physical_optimizer/projection_pushdown.rs: ########## @@ -0,0 +1,2181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This file implements the `ProjectionPushdown` physical optimization rule. +//! The function [`remove_unnecessary_projections`] tries to push down all +//! projections one by one if the operator below is amenable to this. If a +//! projection reaches a source, it can even dissappear from the plan entirely. 
+ +use super::output_requirements::OutputRequirementExec; +use super::PhysicalOptimizerRule; +use crate::datasource::physical_plan::CsvExec; +use crate::error::Result; +use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use crate::physical_plan::filter::FilterExec; +use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter}; +use crate::physical_plan::joins::{ + CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, + SymmetricHashJoinExec, +}; +use crate::physical_plan::memory::MemoryExec; +use crate::physical_plan::projection::ProjectionExec; +use crate::physical_plan::repartition::RepartitionExec; +use crate::physical_plan::sorts::sort::SortExec; +use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use crate::physical_plan::{Distribution, ExecutionPlan}; + +use arrow_schema::SchemaRef; + +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::JoinSide; +use datafusion_physical_expr::expressions::{ + BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr, +}; +use datafusion_physical_expr::{ + Partitioning, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, + ScalarFunctionExpr, +}; +use datafusion_physical_plan::union::UnionExec; + +use itertools::Itertools; +use std::sync::Arc; + +/// This rule inspects [`ProjectionExec`]'s in the given physical plan and tries to +/// remove or swap with its child. 
+#[derive(Default)] +pub struct ProjectionPushdown {} + +impl ProjectionPushdown { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for ProjectionPushdown { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + _config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + plan.transform_down(&remove_unnecessary_projections) + } + + fn name(&self) -> &str { + "ProjectionPushdown" + } + + fn schema_check(&self) -> bool { + true + } +} + +/// This function checks if `plan` is a [`ProjectionExec`], and inspects its +/// input(s) to test whether it can push `plan` under its input(s). This function +/// will operate on the entire tree and may ultimately remove `plan` entirely +/// by leveraging source providers with built-in projection capabilities. +pub fn remove_unnecessary_projections( + plan: Arc<dyn ExecutionPlan>, +) -> Result<Transformed<Arc<dyn ExecutionPlan>>> { + let maybe_modified = if let Some(projection) = + plan.as_any().downcast_ref::<ProjectionExec>() + { + // If the projection does not cause any change on the input, we can + // safely remove it: + if is_projection_removable(projection) { + return Ok(Transformed::Yes(projection.input().clone())); + } + // If it does, check if we can push it under its child(ren): + let input = projection.input().as_any(); + if let Some(csv) = input.downcast_ref::<CsvExec>() { + try_swapping_with_csv(projection, csv) + } else if let Some(memory) = input.downcast_ref::<MemoryExec>() { + try_swapping_with_memory(projection, memory)? 
+ } else if let Some(child_projection) = input.downcast_ref::<ProjectionExec>() { + let maybe_unified = try_unifying_projections(projection, child_projection)?; + return if let Some(new_plan) = maybe_unified { + // To unify 3 or more sequential projections: + remove_unnecessary_projections(new_plan) + } else { + Ok(Transformed::No(plan)) + }; + } else if let Some(output_req) = input.downcast_ref::<OutputRequirementExec>() { + try_swapping_with_output_req(projection, output_req)? + } else if input.is::<CoalescePartitionsExec>() { + try_swapping_with_coalesce_partitions(projection)? + } else if let Some(filter) = input.downcast_ref::<FilterExec>() { + try_swapping_with_filter(projection, filter)? + } else if let Some(repartition) = input.downcast_ref::<RepartitionExec>() { + try_swapping_with_repartition(projection, repartition)? + } else if let Some(sort) = input.downcast_ref::<SortExec>() { + try_swapping_with_sort(projection, sort)? + } else if let Some(spm) = input.downcast_ref::<SortPreservingMergeExec>() { + try_swapping_with_sort_preserving_merge(projection, spm)? + } else if let Some(union) = input.downcast_ref::<UnionExec>() { + try_pushdown_through_union(projection, union)? + } else if let Some(hash_join) = input.downcast_ref::<HashJoinExec>() { + try_pushdown_through_hash_join(projection, hash_join)? + } else if let Some(cross_join) = input.downcast_ref::<CrossJoinExec>() { + try_swapping_with_cross_join(projection, cross_join)? + } else if let Some(nl_join) = input.downcast_ref::<NestedLoopJoinExec>() { + try_swapping_with_nested_loop_join(projection, nl_join)? + } else if let Some(sm_join) = input.downcast_ref::<SortMergeJoinExec>() { + try_swapping_with_sort_merge_join(projection, sm_join)? + } else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() { Review Comment: IIUC what @alamb proposes is almost at the boundary of categories B and D. 
If we can define what the proposed function does purely in terms of projection behavior (which would still have meaning independent of the pushdown rule), we can consider it to be in category D. We think that category A is not in line with DataFusion's philosophy, and I think we all agree on this. However, in many cases, category A-type implementations serve as a good stepping stone as we try to gain a good understanding of what a good category C/D design looks like. So, on our end, we typically employ the strategy of getting a good test suite and a baseline category A implementation done first, and then progressively migrate towards a long-term category C/D solution. This PR is one such first step 🙂 ########## datafusion/core/src/physical_optimizer/projection_pushdown.rs: ########## @@ -0,0 +1,2181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This file implements the `ProjectionPushdown` physical optimization rule. +//! The function [`remove_unnecessary_projections`] tries to push down all +//! projections one by one if the operator below is amenable to this. If a +//! projection reaches a source, it can even dissappear from the plan entirely. 
+ +use super::output_requirements::OutputRequirementExec; +use super::PhysicalOptimizerRule; +use crate::datasource::physical_plan::CsvExec; +use crate::error::Result; +use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use crate::physical_plan::filter::FilterExec; +use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter}; +use crate::physical_plan::joins::{ + CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, + SymmetricHashJoinExec, +}; +use crate::physical_plan::memory::MemoryExec; +use crate::physical_plan::projection::ProjectionExec; +use crate::physical_plan::repartition::RepartitionExec; +use crate::physical_plan::sorts::sort::SortExec; +use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use crate::physical_plan::{Distribution, ExecutionPlan}; + +use arrow_schema::SchemaRef; + +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::JoinSide; +use datafusion_physical_expr::expressions::{ + BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr, +}; +use datafusion_physical_expr::{ + Partitioning, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, + ScalarFunctionExpr, +}; +use datafusion_physical_plan::union::UnionExec; + +use itertools::Itertools; +use std::sync::Arc; + +/// This rule inspects [`ProjectionExec`]'s in the given physical plan and tries to +/// remove or swap with its child. 
+#[derive(Default)] +pub struct ProjectionPushdown {} + +impl ProjectionPushdown { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for ProjectionPushdown { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + _config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + plan.transform_down(&remove_unnecessary_projections) + } + + fn name(&self) -> &str { + "ProjectionPushdown" + } + + fn schema_check(&self) -> bool { + true + } +} + +/// This function checks if `plan` is a [`ProjectionExec`], and inspects its +/// input(s) to test whether it can push `plan` under its input(s). This function +/// will operate on the entire tree and may ultimately remove `plan` entirely +/// by leveraging source providers with built-in projection capabilities. +pub fn remove_unnecessary_projections( + plan: Arc<dyn ExecutionPlan>, +) -> Result<Transformed<Arc<dyn ExecutionPlan>>> { + let maybe_modified = if let Some(projection) = + plan.as_any().downcast_ref::<ProjectionExec>() + { + // If the projection does not cause any change on the input, we can + // safely remove it: + if is_projection_removable(projection) { + return Ok(Transformed::Yes(projection.input().clone())); + } + // If it does, check if we can push it under its child(ren): + let input = projection.input().as_any(); + if let Some(csv) = input.downcast_ref::<CsvExec>() { + try_swapping_with_csv(projection, csv) + } else if let Some(memory) = input.downcast_ref::<MemoryExec>() { + try_swapping_with_memory(projection, memory)? 
+ } else if let Some(child_projection) = input.downcast_ref::<ProjectionExec>() { + let maybe_unified = try_unifying_projections(projection, child_projection)?; + return if let Some(new_plan) = maybe_unified { + // To unify 3 or more sequential projections: + remove_unnecessary_projections(new_plan) + } else { + Ok(Transformed::No(plan)) + }; + } else if let Some(output_req) = input.downcast_ref::<OutputRequirementExec>() { + try_swapping_with_output_req(projection, output_req)? + } else if input.is::<CoalescePartitionsExec>() { + try_swapping_with_coalesce_partitions(projection)? + } else if let Some(filter) = input.downcast_ref::<FilterExec>() { + try_swapping_with_filter(projection, filter)? + } else if let Some(repartition) = input.downcast_ref::<RepartitionExec>() { + try_swapping_with_repartition(projection, repartition)? + } else if let Some(sort) = input.downcast_ref::<SortExec>() { + try_swapping_with_sort(projection, sort)? + } else if let Some(spm) = input.downcast_ref::<SortPreservingMergeExec>() { + try_swapping_with_sort_preserving_merge(projection, spm)? + } else if let Some(union) = input.downcast_ref::<UnionExec>() { + try_pushdown_through_union(projection, union)? + } else if let Some(hash_join) = input.downcast_ref::<HashJoinExec>() { + try_pushdown_through_hash_join(projection, hash_join)? + } else if let Some(cross_join) = input.downcast_ref::<CrossJoinExec>() { + try_swapping_with_cross_join(projection, cross_join)? + } else if let Some(nl_join) = input.downcast_ref::<NestedLoopJoinExec>() { + try_swapping_with_nested_loop_join(projection, nl_join)? + } else if let Some(sm_join) = input.downcast_ref::<SortMergeJoinExec>() { + try_swapping_with_sort_merge_join(projection, sm_join)? + } else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() { Review Comment: IIUC what @alamb proposes is almost at the boundary of categories B and D. 
If we can define what the proposed function does purely in terms of projection behavior (which would still have meaning independent of the pushdown rule), we can consider it to be in category D. We think that category A is not in line with DataFusion's philosophy, and I think we all agree on this. However, in many cases, category A-type implementations serve as a good stepping stone as we try to gain a good understanding of what a good category C/D design looks like. So, on our end, we typically employ the strategy of getting a good test suite and a baseline category A implementation done first, and then progressively migrate towards a long-term category C/D solution. This PR is one such first step 🙂 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
