This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c312ffe7d9 Stop copying LogicalPlan and Exprs in  `TypeCoercion` (10% faster planning) (#10356)
c312ffe7d9 is described below

commit c312ffe7d954563888a303beb8796848d20ff7c6
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed May 15 15:58:15 2024 -0400

    Stop copying LogicalPlan and Exprs in  `TypeCoercion` (10% faster planning) (#10356)
    
    * Add `LogicalPlan::recompute_schema` for handling rewrite passes
    
    * Stop copying LogicalPlan and Exprs in  `TypeCoercion`
    
    * Apply suggestions from code review
    
    Co-authored-by: Oleks V <[email protected]>
    
    ---------
    
    Co-authored-by: Oleks V <[email protected]>
---
 datafusion/optimizer/src/analyzer/type_coercion.rs | 125 +++++++++++++++------
 1 file changed, 88 insertions(+), 37 deletions(-)

diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 60b81aff9a..0f1f3ba7e7 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 use arrow::datatypes::{DataType, IntervalUnit};
 
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
 use datafusion_common::{
     exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, 
DFSchema,
     DataFusionError, Result, ScalarValue,
@@ -31,8 +31,8 @@ use datafusion_expr::expr::{
     self, AggregateFunctionDefinition, Between, BinaryExpr, Case, Exists, 
InList,
     InSubquery, Like, ScalarFunction, WindowFunction,
 };
-use datafusion_expr::expr_rewriter::rewrite_preserving_name;
 use datafusion_expr::expr_schema::cast_subquery;
+use datafusion_expr::logical_plan::tree_node::unwrap_arc;
 use datafusion_expr::logical_plan::Subquery;
 use datafusion_expr::type_coercion::binary::{
     comparison_coercion, get_input_types, like_coercion,
@@ -52,6 +52,7 @@ use datafusion_expr::{
 };
 
 use crate::analyzer::AnalyzerRule;
+use crate::utils::NamePreserver;
 
 #[derive(Default)]
 pub struct TypeCoercion {}
@@ -68,26 +69,28 @@ impl AnalyzerRule for TypeCoercion {
     }
 
     fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> 
Result<LogicalPlan> {
-        analyze_internal(&DFSchema::empty(), &plan)
+        let empty_schema = DFSchema::empty();
+
+        let transformed_plan = plan
+            .transform_up_with_subqueries(|plan| 
analyze_internal(&empty_schema, plan))?
+            .data;
+
+        Ok(transformed_plan)
     }
 }
 
+/// use the external schema to handle the correlated subqueries case
+///
+/// Assumes that children have already been optimized
 fn analyze_internal(
-    // use the external schema to handle the correlated subqueries case
     external_schema: &DFSchema,
-    plan: &LogicalPlan,
-) -> Result<LogicalPlan> {
-    // optimize child plans first
-    let new_inputs = plan
-        .inputs()
-        .iter()
-        .map(|p| analyze_internal(external_schema, p))
-        .collect::<Result<Vec<_>>>()?;
+    plan: LogicalPlan,
+) -> Result<Transformed<LogicalPlan>> {
     // get schema representing all available input fields. This is used for 
data type
     // resolution only, so order does not matter here
-    let mut schema = merge_schema(new_inputs.iter().collect());
+    let mut schema = merge_schema(plan.inputs());
 
-    if let LogicalPlan::TableScan(ts) = plan {
+    if let LogicalPlan::TableScan(ts) = &plan {
         let source_schema = DFSchema::try_from_qualified_schema(
             ts.table_name.clone(),
             &ts.source.schema(),
@@ -100,25 +103,75 @@ fn analyze_internal(
     // select t2.c2 from t1 where t1.c1 in (select t2.c1 from t2 where 
t2.c2=t1.c3)
     schema.merge(external_schema);
 
-    let mut expr_rewrite = TypeCoercionRewriter { schema: &schema };
-
-    let new_expr = plan
-        .expressions()
-        .into_iter()
-        .map(|expr| {
-            // ensure aggregate names don't change:
-            // https://github.com/apache/datafusion/issues/3555
-            rewrite_preserving_name(expr, &mut expr_rewrite)
-        })
-        .collect::<Result<Vec<_>>>()?;
-
-    plan.with_new_exprs(new_expr, new_inputs)
+    let mut expr_rewrite = TypeCoercionRewriter::new(&schema);
+
+    let name_preserver = NamePreserver::new(&plan);
+    // apply coercion rewrite all expressions in the plan individually
+    plan.map_expressions(|expr| {
+        let original_name = name_preserver.save(&expr)?;
+        expr.rewrite(&mut expr_rewrite)?
+            .map_data(|expr| original_name.restore(expr))
+    })?
+    // coerce join expressions specially
+    .map_data(|plan| expr_rewrite.coerce_joins(plan))?
+    // recompute the schema after the expressions have been rewritten as the 
types may have changed
+    .map_data(|plan| plan.recompute_schema())
 }
 
 pub(crate) struct TypeCoercionRewriter<'a> {
     pub(crate) schema: &'a DFSchema,
 }
 
+impl<'a> TypeCoercionRewriter<'a> {
+    fn new(schema: &'a DFSchema) -> Self {
+        Self { schema }
+    }
+
+    /// Coerce join equality expressions
+    ///
+    /// Joins must be treated specially as their equality expressions are 
stored
+    /// as a parallel list of left and right expressions, rather than a single
+    /// equality expression
+    ///
+    /// For example, on_exprs like `t1.a = t2.b AND t1.x = t2.y` will be stored
+    /// as a list of `(t1.a, t2.b), (t1.x, t2.y)`
+    fn coerce_joins(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
+        let LogicalPlan::Join(mut join) = plan else {
+            return Ok(plan);
+        };
+
+        join.on = join
+            .on
+            .into_iter()
+            .map(|(lhs, rhs)| {
+                // coerce the arguments as though they were a single binary 
equality
+                // expression
+                let (lhs, rhs) = self.coerce_binary_op(lhs, Operator::Eq, 
rhs)?;
+                Ok((lhs, rhs))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(LogicalPlan::Join(join))
+    }
+
+    fn coerce_binary_op(
+        &self,
+        left: Expr,
+        op: Operator,
+        right: Expr,
+    ) -> Result<(Expr, Expr)> {
+        let (left_type, right_type) = get_input_types(
+            &left.get_type(self.schema)?,
+            &op,
+            &right.get_type(self.schema)?,
+        )?;
+        Ok((
+            left.cast_to(&left_type, self.schema)?,
+            right.cast_to(&right_type, self.schema)?,
+        ))
+    }
+}
+
 impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
     type Node = Expr;
 
@@ -131,14 +184,15 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
                 subquery,
                 outer_ref_columns,
             }) => {
-                let new_plan = analyze_internal(self.schema, &subquery)?;
+                let new_plan = analyze_internal(self.schema, 
unwrap_arc(subquery))?.data;
                 Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
                     subquery: Arc::new(new_plan),
                     outer_ref_columns,
                 })))
             }
             Expr::Exists(Exists { subquery, negated }) => {
-                let new_plan = analyze_internal(self.schema, 
&subquery.subquery)?;
+                let new_plan =
+                    analyze_internal(self.schema, 
unwrap_arc(subquery.subquery))?.data;
                 Ok(Transformed::yes(Expr::Exists(Exists {
                     subquery: Subquery {
                         subquery: Arc::new(new_plan),
@@ -152,7 +206,8 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
                 subquery,
                 negated,
             }) => {
-                let new_plan = analyze_internal(self.schema, 
&subquery.subquery)?;
+                let new_plan =
+                    analyze_internal(self.schema, 
unwrap_arc(subquery.subquery))?.data;
                 let expr_type = expr.get_type(self.schema)?;
                 let subquery_type = new_plan.schema().field(0).data_type();
                 let common_type = comparison_coercion(&expr_type, 
subquery_type).ok_or(plan_datafusion_err!(
@@ -221,15 +276,11 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
                 ))))
             }
             Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
-                let (left_type, right_type) = get_input_types(
-                    &left.get_type(self.schema)?,
-                    &op,
-                    &right.get_type(self.schema)?,
-                )?;
+                let (left, right) = self.coerce_binary_op(*left, op, *right)?;
                 Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new(
-                    Box::new(left.cast_to(&left_type, self.schema)?),
+                    Box::new(left),
                     op,
-                    Box::new(right.cast_to(&right_type, self.schema)?),
+                    Box::new(right),
                 ))))
             }
             Expr::Between(Between {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to