This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new dd8a5d32d9 Fix tpcds planning stack overflows - Join planning
refactoring (#9962)
dd8a5d32d9 is described below
commit dd8a5d32d98da198c4601364ec45d3fe94a7a2f2
Author: Jeffrey Vo <[email protected]>
AuthorDate: Sat Apr 6 07:57:14 2024 +1100
Fix tpcds planning stack overflows - Join planning refactoring (#9962)
* Extract local variables in physical join planning to function
* Comments
---
.github/actions/setup-rust-runtime/action.yaml | 3 -
datafusion/core/src/physical_planner.rs | 110 ++++++++++++-------------
2 files changed, 54 insertions(+), 59 deletions(-)
diff --git a/.github/actions/setup-rust-runtime/action.yaml b/.github/actions/setup-rust-runtime/action.yaml
index 27cdf9b974..1d814055ae 100644
--- a/.github/actions/setup-rust-runtime/action.yaml
+++ b/.github/actions/setup-rust-runtime/action.yaml
@@ -30,12 +30,9 @@ runs:
#
# Set debuginfo=line-tables-only as debuginfo=0 causes immensely slow build
# See for more details: https://github.com/rust-lang/rust/issues/119560
- #
- # set RUST_MIN_STACK to avoid rust stack overflows on tpc-ds tests
run: |
echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV
echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV
echo "RUST_BACKTRACE=1" >> $GITHUB_ENV
- echo "RUST_MIN_STACK=3000000" >> $GITHUB_ENV
echo "RUSTFLAGS=-C debuginfo=line-tables-only -C incremental=false" >> $GITHUB_ENV
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index c1067d75ed..c25523c5ae 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -972,67 +972,17 @@ impl DefaultPhysicalPlanner {
}) => {
let null_equals_null = *null_equals_null;
- // If join has expression equijoin keys, add physical projecton.
+ // If join has expression equijoin keys, add physical projection.
let has_expr_join_key = keys.iter().any(|(l, r)| {
!(matches!(l, Expr::Column(_))
&& matches!(r, Expr::Column(_)))
});
if has_expr_join_key {
- let left_keys = keys
- .iter()
- .map(|(l, _r)| l)
- .cloned()
- .collect::<Vec<_>>();
- let right_keys = keys
- .iter()
- .map(|(_l, r)| r)
- .cloned()
- .collect::<Vec<_>>();
- let (left, right, column_on, added_project) = {
- let (left, left_col_keys, left_projected) =
- wrap_projection_for_join_if_necessary(
- left_keys.as_slice(),
- left.as_ref().clone(),
- )?;
- let (right, right_col_keys, right_projected) =
- wrap_projection_for_join_if_necessary(
- &right_keys,
- right.as_ref().clone(),
- )?;
- (
- left,
- right,
- (left_col_keys, right_col_keys),
- left_projected || right_projected,
- )
- };
-
- let join_plan =
- LogicalPlan::Join(Join::try_new_with_project_input(
- logical_plan,
- Arc::new(left),
- Arc::new(right),
- column_on,
- )?);
-
- // Remove temporary projected columns
- let join_plan = if added_project {
- let final_join_result = join_schema
- .iter()
- .map(|(qualifier, field)| {
- Expr::Column(datafusion_common::Column::from((qualifier, field.as_ref())))
- })
- .collect::<Vec<_>>();
- let projection =
- Projection::try_new(
- final_join_result,
- Arc::new(join_plan),
- )?;
- LogicalPlan::Projection(projection)
- } else {
- join_plan
- };
-
+ // Logic extracted into a function here as subsequent recursive create_initial_plan()
+ // call can cause a stack overflow for a large number of joins.
+ //
+ // See #9962 and #1047 for detailed explanation.
+ let join_plan = project_expr_join_keys(keys,left,right,logical_plan,join_schema)?;
return self
.create_initial_plan(&join_plan, session_state)
.await;
@@ -2002,6 +1952,54 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
}
}
+/// Adding physical projection to join if has expression equijoin keys.
+fn project_expr_join_keys(
+ keys: &[(Expr, Expr)],
+ left: &Arc<LogicalPlan>,
+ right: &Arc<LogicalPlan>,
+ logical_plan: &LogicalPlan,
+ join_schema: &Arc<DFSchema>,
+) -> Result<LogicalPlan> {
+ let left_keys = keys.iter().map(|(l, _r)| l).cloned().collect::<Vec<_>>();
+ let right_keys = keys.iter().map(|(_l, r)| r).cloned().collect::<Vec<_>>();
+ let (left, right, column_on, added_project) = {
+ let (left, left_col_keys, left_projected) =
+ wrap_projection_for_join_if_necessary(
+ left_keys.as_slice(),
+ left.as_ref().clone(),
+ )?;
+ let (right, right_col_keys, right_projected) =
+ wrap_projection_for_join_if_necessary(&right_keys, right.as_ref().clone())?;
+ (
+ left,
+ right,
+ (left_col_keys, right_col_keys),
+ left_projected || right_projected,
+ )
+ };
+
+ let join_plan = LogicalPlan::Join(Join::try_new_with_project_input(
+ logical_plan,
+ Arc::new(left),
+ Arc::new(right),
+ column_on,
+ )?);
+
+ // Remove temporary projected columns
+ if added_project {
+ let final_join_result = join_schema
+ .iter()
+ .map(|(qualifier, field)| {
+ Expr::Column(datafusion_common::Column::from((qualifier, field.as_ref())))
+ })
+ .collect::<Vec<_>>();
+ let projection = Projection::try_new(final_join_result, Arc::new(join_plan))?;
+ Ok(LogicalPlan::Projection(projection))
+ } else {
+ Ok(join_plan)
+ }
+}
+
#[cfg(test)]
mod tests {
use std::any::Any;