This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 32b63ff95a Stop copying LogicalPlan and Exprs in `ScalarSubqueryToJoin` (#10489)
32b63ff95a is described below

commit 32b63ff95a8c2cc2d56be0b265d8a13e0ca42f96
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu May 16 20:09:00 2024 -0400

    Stop copying LogicalPlan and Exprs in `ScalarSubqueryToJoin` (#10489)
---
 .../optimizer/src/scalar_subquery_to_join.rs       | 83 ++++++++++++++--------
 1 file changed, 54 insertions(+), 29 deletions(-)

diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index b7fce68fb3..71692b9345 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -29,7 +29,7 @@ use datafusion_common::alias::AliasGenerator;
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
 };
-use datafusion_common::{plan_err, Column, Result, ScalarValue};
+use datafusion_common::{internal_err, plan_err, Column, Result, ScalarValue};
 use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
 use datafusion_expr::logical_plan::{JoinType, Subquery};
 use datafusion_expr::utils::conjunction;
@@ -50,7 +50,7 @@ impl ScalarSubqueryToJoin {
     /// # Arguments
     /// * `predicate` - A conjunction to split and search
     ///
-    /// Returns a tuple (subqueries, rewrite expression)
+    /// Returns a tuple (subqueries, alias)
     fn extract_subquery_exprs(
         &self,
         predicate: &Expr,
@@ -71,19 +71,36 @@ impl ScalarSubqueryToJoin {
 impl OptimizerRule for ScalarSubqueryToJoin {
     fn try_optimize(
         &self,
-        plan: &LogicalPlan,
-        config: &dyn OptimizerConfig,
+        _plan: &LogicalPlan,
+        _config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
+        internal_err!("Should have called ScalarSubqueryToJoin::rewrite")
+    }
+
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => {
+                // Optimization: skip the rest of the rule and its copies if
+                // there are no scalar subqueries
+                if !contains_scalar_subquery(&filter.predicate) {
+                    return Ok(Transformed::no(LogicalPlan::Filter(filter)));
+                }
+
                 let (subqueries, mut rewrite_expr) = self.extract_subquery_exprs(
                     &filter.predicate,
                     config.alias_generator(),
                 )?;
 
                 if subqueries.is_empty() {
-                    // regular filter, no subquery exists clause here
-                    return Ok(None);
+                    return internal_err!("Expected subqueries not found in filter");
                 }
 
                 // iterate through all subqueries in predicate, turning each into a left join
@@ -94,16 +111,13 @@ impl OptimizerRule for ScalarSubqueryToJoin {
                     {
                         if !expr_check_map.is_empty() {
                             rewrite_expr = rewrite_expr
-                                .clone()
                                 .transform_up(|expr| {
-                                    if let Expr::Column(col) = &expr {
-                                        if let Some(map_expr) =
-                                            expr_check_map.get(&col.name)
-                                        {
-                                            Ok(Transformed::yes(map_expr.clone()))
-                                        } else {
-                                            Ok(Transformed::no(expr))
-                                        }
+                                    // replace column references with entry in map, if it exists
+                                    if let Some(map_expr) = expr
+                                        .try_as_col()
+                                        .and_then(|col| expr_check_map.get(&col.name))
+                                    {
+                                        Ok(Transformed::yes(map_expr.clone()))
                                     } else {
                                         Ok(Transformed::no(expr))
                                     }
@@ -113,15 +127,21 @@ impl OptimizerRule for ScalarSubqueryToJoin {
                         cur_input = optimized_subquery;
                     } else {
                         // if we can't handle all of the subqueries then bail for now
-                        return Ok(None);
+                        return Ok(Transformed::no(LogicalPlan::Filter(filter)));
                     }
                 }
                 let new_plan = LogicalPlanBuilder::from(cur_input)
                     .filter(rewrite_expr)?
                     .build()?;
-                Ok(Some(new_plan))
+                Ok(Transformed::yes(new_plan))
             }
             LogicalPlan::Projection(projection) => {
+                // Optimization: skip the rest of the rule and its copies if
+                // there are no scalar subqueries
+                if !projection.expr.iter().any(contains_scalar_subquery) {
+                    return Ok(Transformed::no(LogicalPlan::Projection(projection)));
+                }
+
                 let mut all_subqueryies = vec![];
                 let mut expr_to_rewrite_expr_map = HashMap::new();
                 let mut subquery_to_expr_map = HashMap::new();
@@ -135,8 +155,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
                     expr_to_rewrite_expr_map.insert(expr, rewrite_exprs);
                 }
                 if all_subqueryies.is_empty() {
-                    // regular projection, no subquery exists clause here
-                    return Ok(None);
+                    return internal_err!("Expected subqueries not found in projection");
                 }
                 // iterate through all subqueries in predicate, turning each into a left join
                 let mut cur_input = projection.input.as_ref().clone();
@@ -153,14 +172,13 @@ impl OptimizerRule for ScalarSubqueryToJoin {
                                     let new_expr = rewrite_expr
                                         .clone()
                                         .transform_up(|expr| {
-                                            if let Expr::Column(col) = &expr {
-                                                if let Some(map_expr) =
+                                            // replace column references with entry in map, if it exists
+                                            if let Some(map_expr) =
+                                                expr.try_as_col().and_then(|col| {
                                                     expr_check_map.get(&col.name)
-                                                {
-                                                    Ok(Transformed::yes(map_expr.clone()))
-                                                } else {
-                                                    Ok(Transformed::no(expr))
-                                                }
+                                                })
+                                            {
+                                                Ok(Transformed::yes(map_expr.clone()))
                                             } else {
                                                 Ok(Transformed::no(expr))
                                             }
@@ -172,7 +190,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
                         }
                     } else {
                         // if we can't handle all of the subqueries then bail for now
-                        return Ok(None);
+                        return Ok(Transformed::no(LogicalPlan::Projection(projection)));
                     }
                 }
 
@@ -190,10 +208,10 @@ impl OptimizerRule for ScalarSubqueryToJoin {
                 let new_plan = LogicalPlanBuilder::from(cur_input)
                     .project(proj_exprs)?
                     .build()?;
-                Ok(Some(new_plan))
+                Ok(Transformed::yes(new_plan))
             }
 
-            _ => Ok(None),
+            plan => Ok(Transformed::no(plan)),
         }
     }
 
@@ -206,6 +224,13 @@ impl OptimizerRule for ScalarSubqueryToJoin {
     }
 }
 
+/// Returns true if the expression has a scalar subquery somewhere in it
+/// false otherwise
+fn contains_scalar_subquery(expr: &Expr) -> bool {
+    expr.exists(|expr| Ok(matches!(expr, Expr::ScalarSubquery(_))))
+        .expect("Inner is always Ok")
+}
+
 struct ExtractScalarSubQuery {
     sub_query_info: Vec<(Subquery, String)>,
     alias_gen: Arc<AliasGenerator>,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to