crepererum commented on code in PR #8073:
URL: https://github.com/apache/arrow-datafusion/pull/8073#discussion_r1385200589


##########
datafusion/core/src/physical_optimizer/projection_pushdown.rs:
##########
@@ -0,0 +1,2181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file implements the `ProjectionPushdown` physical optimization rule.
+//! The function [`remove_unnecessary_projections`] tries to push down all
+//! projections one by one if the operator below is amenable to this. If a
+//! projection reaches a source, it can even disappear from the plan entirely.
+
+use super::output_requirements::OutputRequirementExec;
+use super::PhysicalOptimizerRule;
+use crate::datasource::physical_plan::CsvExec;
+use crate::error::Result;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
+use crate::physical_plan::joins::{
+    CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec,
+    SymmetricHashJoinExec,
+};
+use crate::physical_plan::memory::MemoryExec;
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::{Distribution, ExecutionPlan};
+
+use arrow_schema::SchemaRef;
+
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::JoinSide;
+use datafusion_physical_expr::expressions::{
+    BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
+};
+use datafusion_physical_expr::{
+    Partitioning, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
+    ScalarFunctionExpr,
+};
+use datafusion_physical_plan::union::UnionExec;
+
+use itertools::Itertools;
+use std::sync::Arc;
+
+/// This rule inspects [`ProjectionExec`]'s in the given physical plan and tries to
+/// remove or swap with its child.
+#[derive(Default)]
+pub struct ProjectionPushdown {}
+
+impl ProjectionPushdown {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for ProjectionPushdown {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        plan.transform_down(&remove_unnecessary_projections)
+    }
+
+    fn name(&self) -> &str {
+        "ProjectionPushdown"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// This function checks if `plan` is a [`ProjectionExec`], and inspects its
+/// input(s) to test whether it can push `plan` under its input(s). This function
+/// will operate on the entire tree and may ultimately remove `plan` entirely
+/// by leveraging source providers with built-in projection capabilities.
+pub fn remove_unnecessary_projections(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    let maybe_modified = if let Some(projection) =
+        plan.as_any().downcast_ref::<ProjectionExec>()
+    {
+        // If the projection does not cause any change on the input, we can
+        // safely remove it:
+        if is_projection_removable(projection) {
+            return Ok(Transformed::Yes(projection.input().clone()));
+        }
+        // If it does, check if we can push it under its child(ren):
+        let input = projection.input().as_any();
+        if let Some(csv) = input.downcast_ref::<CsvExec>() {
+            try_swapping_with_csv(projection, csv)
+        } else if let Some(memory) = input.downcast_ref::<MemoryExec>() {
+            try_swapping_with_memory(projection, memory)?
+        } else if let Some(child_projection) = input.downcast_ref::<ProjectionExec>() {
+            let maybe_unified = try_unifying_projections(projection, child_projection)?;
+            return if let Some(new_plan) = maybe_unified {
+                // To unify 3 or more sequential projections:
+                remove_unnecessary_projections(new_plan)
+            } else {
+                Ok(Transformed::No(plan))
+            };
+        } else if let Some(output_req) = input.downcast_ref::<OutputRequirementExec>() {
+            try_swapping_with_output_req(projection, output_req)?
+        } else if input.is::<CoalescePartitionsExec>() {
+            try_swapping_with_coalesce_partitions(projection)?
+        } else if let Some(filter) = input.downcast_ref::<FilterExec>() {
+            try_swapping_with_filter(projection, filter)?
+        } else if let Some(repartition) = input.downcast_ref::<RepartitionExec>() {
+            try_swapping_with_repartition(projection, repartition)?
+        } else if let Some(sort) = input.downcast_ref::<SortExec>() {
+            try_swapping_with_sort(projection, sort)?
+        } else if let Some(spm) = input.downcast_ref::<SortPreservingMergeExec>() {
+            try_swapping_with_sort_preserving_merge(projection, spm)?
+        } else if let Some(union) = input.downcast_ref::<UnionExec>() {
+            try_pushdown_through_union(projection, union)?
+        } else if let Some(hash_join) = input.downcast_ref::<HashJoinExec>() {
+            try_pushdown_through_hash_join(projection, hash_join)?
+        } else if let Some(cross_join) = input.downcast_ref::<CrossJoinExec>() {
+            try_swapping_with_cross_join(projection, cross_join)?
+        } else if let Some(nl_join) = input.downcast_ref::<NestedLoopJoinExec>() {
+            try_swapping_with_nested_loop_join(projection, nl_join)?
+        } else if let Some(sm_join) = input.downcast_ref::<SortMergeJoinExec>() {
+            try_swapping_with_sort_merge_join(projection, sm_join)?
+        } else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() {

Review Comment:
   Could we make this registry-based instead of a hard-coded `downcast_ref`
   chain, so that custom execs could also profit from this pass?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to