This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 9fb8eaeb3f Dataframe with_column and with_column_renamed performance improvements (#14653) 9fb8eaeb3f is described below commit 9fb8eaeb3f027c963a603cd31db206e879f217f6 Author: Bruce Ritchie <bruce.ritc...@veeva.com> AuthorDate: Thu Feb 27 06:25:10 2025 -0500 Dataframe with_column and with_column_renamed performance improvements (#14653) * POC for with_column improvements. * Updates. Assumptions are still not valid here. * Added flag to indicate whether it is safe to project without validation or not. * Updated documentation for DataFrame.projection_requires_validation field. * project_with_validation is no longer public. --- datafusion/core/benches/dataframe.rs | 6 +-- datafusion/core/src/dataframe/mod.rs | 67 ++++++++++++++++++++++++----- datafusion/core/src/dataframe/parquet.rs | 1 + datafusion/expr/src/logical_plan/builder.rs | 33 +++++++++++++- 4 files changed, 91 insertions(+), 16 deletions(-) diff --git a/datafusion/core/benches/dataframe.rs b/datafusion/core/benches/dataframe.rs index 087764883a..03078e05e1 100644 --- a/datafusion/core/benches/dataframe.rs +++ b/datafusion/core/benches/dataframe.rs @@ -56,8 +56,7 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) { data_frame = data_frame .with_column_renamed(field_name, new_field_name) - .unwrap(); - data_frame = data_frame + .unwrap() .with_column(new_field_name, btrim(vec![col(new_field_name)])) .unwrap(); } @@ -68,8 +67,7 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) { } fn criterion_benchmark(c: &mut Criterion) { - // 500 takes far too long right now - for column_count in [10, 100, 200 /* 500 */] { + for column_count in [10, 100, 200, 500] { let ctx = create_context(column_count).unwrap(); c.bench_function(&format!("with_column_{column_count}"), |b| { diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index d2aee0a161..e998e489a9 100644 --- a/datafusion/core/src/dataframe/mod.rs 
+++ b/datafusion/core/src/dataframe/mod.rs @@ -188,6 +188,22 @@ pub struct DataFrame { // Box the (large) SessionState to reduce the size of DataFrame on the stack session_state: Box<SessionState>, plan: LogicalPlan, + // Whether projection ops require validation. When this flag is false, an + // optimization is enabled in the `with_column` and `with_column_renamed` functions + // whereby the recursive work required to columnize and normalize existing + // expressions is skipped. Since these functions are often chained or + // called many times in dataframe operations, this can result in a significant + // performance gain. + // + // This flag may only be set to false when the dataframe function + // call results in the last operation being a + // `LogicalPlanBuilder::from(plan).project(fields)?.build()` or + // `LogicalPlanBuilder::from(plan).project_with_validation(fields)?.build()` + // call. This requirement guarantees that the plan has had all columnization + // and normalization applied to existing expressions and only new expressions + // will require that work. Any operation that updates the plan in any way + // via anything other than a `project` call should set this to true. 
+ projection_requires_validation: bool, } impl DataFrame { @@ -200,6 +216,7 @@ impl DataFrame { Self { session_state: Box::new(session_state), plan, + projection_requires_validation: true, } } @@ -337,6 +354,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan: project_plan, + projection_requires_validation: false, }) } @@ -442,6 +460,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -482,6 +501,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -555,6 +575,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: !is_grouping_set, }) } @@ -567,6 +588,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -605,6 +627,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, }) } @@ -642,6 +665,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -680,6 +704,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -711,6 +736,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -752,6 +778,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -952,6 +979,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, }) } @@ -1001,6 +1029,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, }) } @@ -1068,6 +1097,7 @@ impl 
DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -1127,6 +1157,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -1162,6 +1193,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -1433,6 +1465,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, }) } @@ -1485,6 +1518,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -1520,6 +1554,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -1565,6 +1600,7 @@ impl DataFrame { DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, } .collect() .await @@ -1634,6 +1670,7 @@ impl DataFrame { DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, } .collect() .await @@ -1703,12 +1740,13 @@ impl DataFrame { DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, } .collect() .await } - /// Add an additional column to the DataFrame. + /// Add or replace a column in the DataFrame. 
/// /// # Example /// ``` @@ -1736,33 +1774,36 @@ impl DataFrame { let mut col_exists = false; let new_column = expr.alias(name); - let mut fields: Vec<Expr> = plan + let mut fields: Vec<(Expr, bool)> = plan .schema() .iter() .filter_map(|(qualifier, field)| { if field.name() == name { col_exists = true; - Some(new_column.clone()) + Some((new_column.clone(), true)) } else { let e = col(Column::from((qualifier, field))); window_fn_str .as_ref() .filter(|s| *s == &e.to_string()) .is_none() - .then_some(e) + .then_some((e, self.projection_requires_validation)) } }) .collect(); if !col_exists { - fields.push(new_column); + fields.push((new_column, true)); } - let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?; + let project_plan = LogicalPlanBuilder::from(plan) + .project_with_validation(fields)? + .build()?; Ok(DataFrame { session_state: self.session_state, plan: project_plan, + projection_requires_validation: false, }) } @@ -1819,19 +1860,23 @@ impl DataFrame { .iter() .map(|(qualifier, field)| { if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename { - col(Column::from((qualifier, field))) - .alias_qualified(qualifier.cloned(), new_name) + ( + col(Column::from((qualifier, field))) + .alias_qualified(qualifier.cloned(), new_name), + false, + ) } else { - col(Column::from((qualifier, field))) + (col(Column::from((qualifier, field))), false) } }) .collect::<Vec<_>>(); let project_plan = LogicalPlanBuilder::from(self.plan) - .project(projection)? + .project_with_validation(projection)? 
.build()?; Ok(DataFrame { session_state: self.session_state, plan: project_plan, + projection_requires_validation: false, }) } @@ -1897,6 +1942,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, }) } @@ -1932,6 +1978,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, }) } diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 1dd4d68fca..1bb5444ca0 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -93,6 +93,7 @@ impl DataFrame { DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, } .collect() .await diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 4d825c6bfe..2bb15da218 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -528,6 +528,15 @@ impl LogicalPlanBuilder { project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new) } + /// Apply a projection without alias with optional validation + /// (true to validate, false to not validate) + pub fn project_with_validation( + self, + expr: Vec<(impl Into<Expr>, bool)>, + ) -> Result<Self> { + project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new) + } + /// Select the given column indices pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> { let exprs: Vec<_> = indices @@ -1647,13 +1656,33 @@ pub fn union_by_name( pub fn project( plan: LogicalPlan, expr: impl IntoIterator<Item = impl Into<Expr>>, +) -> Result<LogicalPlan> { + project_with_validation(plan, expr.into_iter().map(|e| (e, true))) +} + +/// Create Projection. 
Similar to [`project`] except that each expression +/// passed in is paired with a flag indicating whether that expression +/// requires validation (normalize & columnize): true to validate, false to skip. +/// # Errors +/// This function errors under any of the following conditions: +/// * Two or more expressions have the same name +/// * An invalid expression is used (e.g. a `sort` expression) +fn project_with_validation( + plan: LogicalPlan, + expr: impl IntoIterator<Item = (impl Into<Expr>, bool)>, ) -> Result<LogicalPlan> { let mut projected_expr = vec![]; - for e in expr { + for (e, validate) in expr { let e = e.into(); match e { Expr::Wildcard { .. } => projected_expr.push(e), - _ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?), + _ => { + if validate { + projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?) + } else { + projected_expr.push(e) + } + } } } validate_unique_names("Projections", projected_expr.iter())?; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org