This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new dd8a5d32d9 Fix tpcds planning stack overflows - Join planning refactoring (#9962)
dd8a5d32d9 is described below

commit dd8a5d32d98da198c4601364ec45d3fe94a7a2f2
Author: Jeffrey Vo <[email protected]>
AuthorDate: Sat Apr 6 07:57:14 2024 +1100

    Fix tpcds planning stack overflows - Join planning refactoring (#9962)
    
    * Extract local variables in physical join planning to function
    
    * Comments
---
 .github/actions/setup-rust-runtime/action.yaml |   3 -
 datafusion/core/src/physical_planner.rs        | 110 ++++++++++++-------------
 2 files changed, 54 insertions(+), 59 deletions(-)

diff --git a/.github/actions/setup-rust-runtime/action.yaml b/.github/actions/setup-rust-runtime/action.yaml
index 27cdf9b974..1d814055ae 100644
--- a/.github/actions/setup-rust-runtime/action.yaml
+++ b/.github/actions/setup-rust-runtime/action.yaml
@@ -30,12 +30,9 @@ runs:
       # 
       # Set debuginfo=line-tables-only as debuginfo=0 causes immensely slow build
       # See for more details: https://github.com/rust-lang/rust/issues/119560
-      #
-      # set RUST_MIN_STACK to avoid rust stack overflows on tpc-ds tests
       run: |
         echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV  
         echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV 
         echo "RUST_BACKTRACE=1" >> $GITHUB_ENV
-        echo "RUST_MIN_STACK=3000000" >> $GITHUB_ENV
         echo "RUSTFLAGS=-C debuginfo=line-tables-only -C incremental=false" >> $GITHUB_ENV
      
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index c1067d75ed..c25523c5ae 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -972,67 +972,17 @@ impl DefaultPhysicalPlanner {
                 }) => {
                     let null_equals_null = *null_equals_null;
 
-                    // If join has expression equijoin keys, add physical projecton.
+                    // If join has expression equijoin keys, add physical projection.
                     let has_expr_join_key = keys.iter().any(|(l, r)| {
                         !(matches!(l, Expr::Column(_))
                             && matches!(r, Expr::Column(_)))
                     });
                     if has_expr_join_key {
-                        let left_keys = keys
-                            .iter()
-                            .map(|(l, _r)| l)
-                            .cloned()
-                            .collect::<Vec<_>>();
-                        let right_keys = keys
-                            .iter()
-                            .map(|(_l, r)| r)
-                            .cloned()
-                            .collect::<Vec<_>>();
-                        let (left, right, column_on, added_project) = {
-                            let (left, left_col_keys, left_projected) =
-                                wrap_projection_for_join_if_necessary(
-                                    left_keys.as_slice(),
-                                    left.as_ref().clone(),
-                                )?;
-                            let (right, right_col_keys, right_projected) =
-                                wrap_projection_for_join_if_necessary(
-                                    &right_keys,
-                                    right.as_ref().clone(),
-                                )?;
-                            (
-                                left,
-                                right,
-                                (left_col_keys, right_col_keys),
-                                left_projected || right_projected,
-                            )
-                        };
-
-                        let join_plan =
-                            LogicalPlan::Join(Join::try_new_with_project_input(
-                                logical_plan,
-                                Arc::new(left),
-                                Arc::new(right),
-                                column_on,
-                            )?);
-
-                        // Remove temporary projected columns
-                        let join_plan = if added_project {
-                            let final_join_result = join_schema
-                                .iter()
-                                .map(|(qualifier, field)| {
-                                    Expr::Column(datafusion_common::Column::from((qualifier, field.as_ref())))
-                                })
-                                .collect::<Vec<_>>();
-                            let projection =
-                                Projection::try_new(
-                                    final_join_result,
-                                    Arc::new(join_plan),
-                                )?;
-                            LogicalPlan::Projection(projection)
-                        } else {
-                            join_plan
-                        };
-
+                        // Logic extracted into a function here as subsequent recursive create_initial_plan()
+                        // call can cause a stack overflow for a large number of joins.
+                        //
+                        // See #9962 and #1047 for detailed explanation.
+                        let join_plan = project_expr_join_keys(keys,left,right,logical_plan,join_schema)?;
                         return self
                             .create_initial_plan(&join_plan, session_state)
                             .await;
@@ -2002,6 +1952,54 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
     }
 }
 
+/// Adding physical projection to join if has expression equijoin keys.
+fn project_expr_join_keys(
+    keys: &[(Expr, Expr)],
+    left: &Arc<LogicalPlan>,
+    right: &Arc<LogicalPlan>,
+    logical_plan: &LogicalPlan,
+    join_schema: &Arc<DFSchema>,
+) -> Result<LogicalPlan> {
+    let left_keys = keys.iter().map(|(l, _r)| l).cloned().collect::<Vec<_>>();
+    let right_keys = keys.iter().map(|(_l, r)| r).cloned().collect::<Vec<_>>();
+    let (left, right, column_on, added_project) = {
+        let (left, left_col_keys, left_projected) =
+            wrap_projection_for_join_if_necessary(
+                left_keys.as_slice(),
+                left.as_ref().clone(),
+            )?;
+        let (right, right_col_keys, right_projected) =
+            wrap_projection_for_join_if_necessary(&right_keys, right.as_ref().clone())?;
+        (
+            left,
+            right,
+            (left_col_keys, right_col_keys),
+            left_projected || right_projected,
+        )
+    };
+
+    let join_plan = LogicalPlan::Join(Join::try_new_with_project_input(
+        logical_plan,
+        Arc::new(left),
+        Arc::new(right),
+        column_on,
+    )?);
+
+    // Remove temporary projected columns
+    if added_project {
+        let final_join_result = join_schema
+            .iter()
+            .map(|(qualifier, field)| {
+                Expr::Column(datafusion_common::Column::from((qualifier, field.as_ref())))
+            })
+            .collect::<Vec<_>>();
+        let projection = Projection::try_new(final_join_result, Arc::new(join_plan))?;
+        Ok(LogicalPlan::Projection(projection))
+    } else {
+        Ok(join_plan)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::any::Any;

Reply via email to