crepererum commented on code in PR #8073:
URL: https://github.com/apache/arrow-datafusion/pull/8073#discussion_r1386515290
##########
datafusion/core/src/physical_optimizer/projection_pushdown.rs:
##########
@@ -0,0 +1,2181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file implements the `ProjectionPushdown` physical optimization rule.
+//! The function [`remove_unnecessary_projections`] tries to push down all
+//! projections one by one if the operator below is amenable to this. If a
+//! projection reaches a source, it can even dissappear from the plan entirely.
+
+use super::output_requirements::OutputRequirementExec;
+use super::PhysicalOptimizerRule;
+use crate::datasource::physical_plan::CsvExec;
+use crate::error::Result;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
+use crate::physical_plan::joins::{
+    CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec,
+    SymmetricHashJoinExec,
+};
+use crate::physical_plan::memory::MemoryExec;
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::{Distribution, ExecutionPlan};
+
+use arrow_schema::SchemaRef;
+
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::JoinSide;
+use datafusion_physical_expr::expressions::{
+    BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
+};
+use datafusion_physical_expr::{
+    Partitioning, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
+    ScalarFunctionExpr,
+};
+use datafusion_physical_plan::union::UnionExec;
+
+use itertools::Itertools;
+use std::sync::Arc;
+
+/// This rule inspects [`ProjectionExec`]'s in the given physical plan and tries to
+/// remove or swap with its child.
+#[derive(Default)]
+pub struct ProjectionPushdown {}
+
+impl ProjectionPushdown {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for ProjectionPushdown {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        plan.transform_down(&remove_unnecessary_projections)
+    }
+
+    fn name(&self) -> &str {
+        "ProjectionPushdown"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// This function checks if `plan` is a [`ProjectionExec`], and inspects its
+/// input(s) to test whether it can push `plan` under its input(s). This function
+/// will operate on the entire tree and may ultimately remove `plan` entirely
+/// by leveraging source providers with built-in projection capabilities.
+pub fn remove_unnecessary_projections(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    let maybe_modified = if let Some(projection) =
+        plan.as_any().downcast_ref::<ProjectionExec>()
+    {
+        // If the projection does not cause any change on the input, we can
+        // safely remove it:
+        if is_projection_removable(projection) {
+            return Ok(Transformed::Yes(projection.input().clone()));
+        }
+        // If it does, check if we can push it under its child(ren):
+        let input = projection.input().as_any();
+        if let Some(csv) = input.downcast_ref::<CsvExec>() {
+            try_swapping_with_csv(projection, csv)
+        } else if let Some(memory) = input.downcast_ref::<MemoryExec>() {
+            try_swapping_with_memory(projection, memory)?
+        } else if let Some(child_projection) = input.downcast_ref::<ProjectionExec>() {
+            let maybe_unified = try_unifying_projections(projection, child_projection)?;
+            return if let Some(new_plan) = maybe_unified {
+                // To unify 3 or more sequential projections:
+                remove_unnecessary_projections(new_plan)
+            } else {
+                Ok(Transformed::No(plan))
+            };
+        } else if let Some(output_req) = input.downcast_ref::<OutputRequirementExec>() {
+            try_swapping_with_output_req(projection, output_req)?
+        } else if input.is::<CoalescePartitionsExec>() {
+            try_swapping_with_coalesce_partitions(projection)?
+        } else if let Some(filter) = input.downcast_ref::<FilterExec>() {
+            try_swapping_with_filter(projection, filter)?
+        } else if let Some(repartition) = input.downcast_ref::<RepartitionExec>() {
+            try_swapping_with_repartition(projection, repartition)?
+        } else if let Some(sort) = input.downcast_ref::<SortExec>() {
+            try_swapping_with_sort(projection, sort)?
+        } else if let Some(spm) = input.downcast_ref::<SortPreservingMergeExec>() {
+            try_swapping_with_sort_preserving_merge(projection, spm)?
+        } else if let Some(union) = input.downcast_ref::<UnionExec>() {
+            try_pushdown_through_union(projection, union)?
+        } else if let Some(hash_join) = input.downcast_ref::<HashJoinExec>() {
+            try_pushdown_through_hash_join(projection, hash_join)?
+        } else if let Some(cross_join) = input.downcast_ref::<CrossJoinExec>() {
+            try_swapping_with_cross_join(projection, cross_join)?
+        } else if let Some(nl_join) = input.downcast_ref::<NestedLoopJoinExec>() {
+            try_swapping_with_nested_loop_join(projection, nl_join)?
+        } else if let Some(sm_join) = input.downcast_ref::<SortMergeJoinExec>() {
+            try_swapping_with_sort_merge_join(projection, sm_join)?
+        } else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() {

Review Comment:
So I guess this whole problem is:

> How is a set of optimizer passes linked to a set of nodes, while both sets are extensible?

I see the following rough solutions:

# A: omniscient optimizer
The optimizer knows all node types. This is what this PR does (and what many other passes do). This doesn't scale well.

# B: omniscient nodes
The nodes know all optimizer passes and implement them themselves. This kinda sounds like what @alamb proposes. This doesn't scale well either.

# C: registry-based linking
Developers are aware of which nodes can be optimized in which way and can fill in the gaps of the optimizer-node matrix. The issue is mostly how this registry should be implemented. Rust has a bunch of crates for that, but none of them is great (due to initialization-order issues). Luckily, we always know which node types occur in a plan (because we can traverse it), so we could hook registry initialization in there.
Something like:

```rust
trait ExecutionPlan {
    fn register_hook_for_optimizer_pass(....);
}
```

# D: abstraction
This is what most other optimizer passes do: they read some abstract property of the node (like "schema", "num children", "output ordering", ...) and infer the correct behavior based on that. I think we could use something like this here as well. Namely, if you knew which columns of an input schema are used by the node itself and which are just "pass-through", you could apply projection pushdown automatically.
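To make option D a bit more concrete, here is a self-contained toy sketch of such an abstract property. Everything in it (`ColumnUsage`, `ColumnUsageProvider`, `columns_to_push_down`, `ToyFilter`) is made up for illustration and is not part of DataFusion's actual `ExecutionPlan` API; it only shows the shape a generic, downcast-free pushdown rule could take.

```rust
// Hypothetical sketch of option D: nodes report which input columns they
// actually *use* vs. merely pass through, so one generic rule can push
// projections down without knowing concrete node types.
// None of these items exist in DataFusion; all names are made up.

use std::collections::HashSet;

/// How a node treats the columns of one of its inputs.
enum ColumnUsage {
    /// Column indices the node inspects itself (filter predicates, join
    /// keys, ...); they must survive any projection pushed below it.
    Required(HashSet<usize>),
    /// The node is opaque to the optimizer; do not push a projection
    /// through it.
    Unknown,
}

/// The abstract property a generic projection-pushdown rule would read,
/// one entry per input of the node.
trait ColumnUsageProvider {
    fn column_usage(&self) -> Vec<ColumnUsage>;
}

/// Example: a filter-like node that only inspects column 2 of its single
/// input and passes every other column through untouched.
struct ToyFilter;

impl ColumnUsageProvider for ToyFilter {
    fn column_usage(&self) -> Vec<ColumnUsage> {
        vec![ColumnUsage::Required(HashSet::from([2]))]
    }
}

/// A generic rule only has to union the columns the node itself needs with
/// the columns requested by the projection sitting above it; `None` means
/// "give up, keep the projection where it is".
fn columns_to_push_down(
    node: &dyn ColumnUsageProvider,
    requested_by_parent: &HashSet<usize>,
) -> Option<Vec<HashSet<usize>>> {
    node.column_usage()
        .into_iter()
        .map(|usage| match usage {
            ColumnUsage::Required(required) => {
                Some(required.union(requested_by_parent).copied().collect())
            }
            ColumnUsage::Unknown => None,
        })
        .collect()
}

fn main() {
    let parent_needs = HashSet::from([0, 5]);
    let pushed = columns_to_push_down(&ToyFilter, &parent_needs);
    // The projection pushed below the filter must keep columns {0, 2, 5}.
    println!("{pushed:?}");
}
```

With a property like this, the long `downcast_ref` chain above could collapse into a single generic transformation, and third-party nodes would opt into projection pushdown simply by reporting their column usage.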
