This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a0542acbc refactor: with_inputs() can use original schema to avoid 
recompute schema. (#7069)
1a0542acbc is described below

commit 1a0542acbc01e5243471ae0fc3586c2f1f40013b
Author: jakevin <[email protected]>
AuthorDate: Tue Jul 25 10:24:00 2023 +0800

    refactor: with_inputs() can use original schema to avoid recompute schema. 
(#7069)
    
    * minor: with_inputs() can use original schema.
    
    * remove useless code
---
 datafusion/core/src/physical_planner.rs     |  5 ++-
 datafusion/expr/src/logical_plan/builder.rs |  9 ++---
 datafusion/expr/src/logical_plan/plan.rs    | 54 +++++++++++++++++------------
 3 files changed, 36 insertions(+), 32 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index c0e60008bc..913deebddf 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -69,7 +69,7 @@ use datafusion_expr::expr::{
 };
 use datafusion_expr::expr_rewriter::{unalias, unnormalize_cols};
 use 
datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
-use datafusion_expr::{logical_plan, DmlStatement, StringifiedPlan, WriteOp};
+use datafusion_expr::{DmlStatement, StringifiedPlan, WriteOp};
 use datafusion_expr::{WindowFrame, WindowFrameBound};
 use datafusion_physical_expr::expressions::Literal;
 use datafusion_sql::utils::window_expr_common_partition_keys;
@@ -942,10 +942,9 @@ impl DefaultPhysicalPlanner {
                                 })
                                 .collect::<Vec<_>>();
                             let projection =
-                                logical_plan::Projection::try_new_with_schema(
+                                Projection::try_new(
                                     final_join_result,
                                     Arc::new(join_plan),
-                                    join_schema.clone(),
                                 )?;
                             LogicalPlan::Projection(projection)
                         } else {
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index 9ddf6231c5..cd8940e134 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -24,7 +24,7 @@ use crate::expr_rewriter::{
     rewrite_sort_cols_by_aggs,
 };
 use crate::type_coercion::binary::comparison_coercion;
-use crate::utils::{columnize_expr, compare_sort_expr, exprlist_to_fields};
+use crate::utils::{columnize_expr, compare_sort_expr};
 use crate::{and, binary_expr, DmlStatement, Operator, WriteOp};
 use crate::{
     logical_plan::{
@@ -1239,15 +1239,10 @@ pub fn project(
         }
     }
     validate_unique_names("Projections", projected_expr.iter())?;
-    let input_schema = DFSchema::new_with_metadata(
-        exprlist_to_fields(&projected_expr, &plan)?,
-        plan.schema().metadata().clone(),
-    )?;
 
-    Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+    Ok(LogicalPlan::Projection(Projection::try_new(
         projected_expr,
         Arc::new(plan.clone()),
-        DFSchemaRef::new(input_schema),
     )?))
 }
 
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 0485973364..be8270ddc3 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -482,7 +482,38 @@ impl LogicalPlan {
     }
 
     pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> 
Result<LogicalPlan> {
-        from_plan(self, &self.expressions(), inputs)
+        // with_new_inputs use original expression,
+        // so we don't need to recompute Schema.
+        match &self {
+            LogicalPlan::Projection(projection) => {
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    projection.expr.to_vec(),
+                    Arc::new(inputs[0].clone()),
+                    projection.schema.clone(),
+                )?))
+            }
+            LogicalPlan::Window(Window {
+                window_expr,
+                schema,
+                ..
+            }) => Ok(LogicalPlan::Window(Window {
+                input: Arc::new(inputs[0].clone()),
+                window_expr: window_expr.to_vec(),
+                schema: schema.clone(),
+            })),
+            LogicalPlan::Aggregate(Aggregate {
+                group_expr,
+                aggr_expr,
+                schema,
+                ..
+            }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+                Arc::new(inputs[0].clone()),
+                group_expr.to_vec(),
+                aggr_expr.to_vec(),
+                schema.clone(),
+            )?)),
+            _ => from_plan(self, &self.expressions(), inputs),
+        }
     }
 
     /// Convert a prepared [`LogicalPlan`] into its inner logical plan
@@ -1290,13 +1321,6 @@ impl Projection {
             schema,
         }
     }
-
-    pub fn try_from_plan(plan: &LogicalPlan) -> Result<&Projection> {
-        match plan {
-            LogicalPlan::Projection(it) => Ok(it),
-            _ => plan_err!("Could not coerce into Projection!"),
-        }
-    }
 }
 
 /// Aliased subquery
@@ -1375,13 +1399,6 @@ impl Filter {
 
         Ok(Self { predicate, input })
     }
-
-    pub fn try_from_plan(plan: &LogicalPlan) -> Result<&Filter> {
-        match plan {
-            LogicalPlan::Filter(it) => Ok(it),
-            _ => plan_err!("Could not coerce into Filter!"),
-        }
-    }
 }
 
 /// Window its input based on a set of window spec and window function (e.g. 
SUM or RANK)
@@ -1632,13 +1649,6 @@ impl Aggregate {
             schema,
         })
     }
-
-    pub fn try_from_plan(plan: &LogicalPlan) -> Result<&Aggregate> {
-        match plan {
-            LogicalPlan::Aggregate(it) => Ok(it),
-            _ => plan_err!("Could not coerce into Aggregate!"),
-        }
-    }
 }
 
 /// Sorts its input according to a list of sort expressions.

Reply via email to