This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fb8eaeb3f Dataframe with_column and with_column_renamed performance 
improvements (#14653)
9fb8eaeb3f is described below

commit 9fb8eaeb3f027c963a603cd31db206e879f217f6
Author: Bruce Ritchie <bruce.ritc...@veeva.com>
AuthorDate: Thu Feb 27 06:25:10 2025 -0500

    Dataframe with_column and with_column_renamed performance improvements 
(#14653)
    
    * POC for with_column improvements.
    
    * Updates. Assumptions are still not valid here.
    
    * Added flag to indicate whether it is safe to project without validation 
or not.
    
    * Updated documentation for DataFrame.projection_requires_validation field.
    
    * project_with_validation is no longer public.
---
 datafusion/core/benches/dataframe.rs        |  6 +--
 datafusion/core/src/dataframe/mod.rs        | 67 ++++++++++++++++++++++++-----
 datafusion/core/src/dataframe/parquet.rs    |  1 +
 datafusion/expr/src/logical_plan/builder.rs | 33 +++++++++++++-
 4 files changed, 91 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/benches/dataframe.rs 
b/datafusion/core/benches/dataframe.rs
index 087764883a..03078e05e1 100644
--- a/datafusion/core/benches/dataframe.rs
+++ b/datafusion/core/benches/dataframe.rs
@@ -56,8 +56,7 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) {
 
             data_frame = data_frame
                 .with_column_renamed(field_name, new_field_name)
-                .unwrap();
-            data_frame = data_frame
+                .unwrap()
                 .with_column(new_field_name, btrim(vec![col(new_field_name)]))
                 .unwrap();
         }
@@ -68,8 +67,7 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) {
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
-    // 500 takes far too long right now
-    for column_count in [10, 100, 200 /* 500 */] {
+    for column_count in [10, 100, 200, 500] {
         let ctx = create_context(column_count).unwrap();
 
         c.bench_function(&format!("with_column_{column_count}"), |b| {
diff --git a/datafusion/core/src/dataframe/mod.rs 
b/datafusion/core/src/dataframe/mod.rs
index d2aee0a161..e998e489a9 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -188,6 +188,22 @@ pub struct DataFrame {
     // Box the (large) SessionState to reduce the size of DataFrame on the 
stack
     session_state: Box<SessionState>,
     plan: LogicalPlan,
+    // Whether projection operations must validate (normalize and columnize)
+    // existing expressions. Setting this to `false` enables an optimization in
+    // `with_column` and `with_column_renamed`, where the recursive work required
+    // to columnize and normalize the existing expressions is skipped and only
+    // new expressions are validated. Since these functions are often chained or
+    // called many times in dataframe operations, this can result in a
+    // significant performance gain.
+    //
+    // This may be set to `false` only when the last operation applied to the
+    // plan was a `LogicalPlanBuilder::from(plan).project(fields)?.build()` or
+    // `LogicalPlanBuilder::from(plan).project_with_validation(fields)?.build()`
+    // call. This guarantees that all existing expressions have already been
+    // columnized and normalized, so only new expressions require that work.
+    // Any other operation that updates the plan should set this to `true`,
+    // unless it propagates the value from the `DataFrame` it was derived from.
+    projection_requires_validation: bool,
 }
 
 impl DataFrame {
@@ -200,6 +216,7 @@ impl DataFrame {
         Self {
             session_state: Box::new(session_state),
             plan,
+            projection_requires_validation: true,
         }
     }
 
@@ -337,6 +354,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan: project_plan,
+            projection_requires_validation: false,
         })
     }
 
@@ -442,6 +460,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: true,
         })
     }
 
@@ -482,6 +501,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: true,
         })
     }
 
@@ -555,6 +575,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: !is_grouping_set,
         })
     }
 
@@ -567,6 +588,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: true,
         })
     }
 
@@ -605,6 +627,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: 
self.projection_requires_validation,
         })
     }
 
@@ -642,6 +665,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: true,
         })
     }
 
@@ -680,6 +704,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: true,
         })
     }
 
@@ -711,6 +736,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: true,
         })
     }
 
@@ -752,6 +778,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: true,
         })
     }
 
@@ -952,6 +979,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: 
self.projection_requires_validation,
         })
     }
 
@@ -1001,6 +1029,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: 
self.projection_requires_validation,
         })
     }
 
@@ -1068,6 +1097,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: true,
         })
     }
 
@@ -1127,6 +1157,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: true,
         })
     }
 
@@ -1162,6 +1193,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: true,
         })
     }
 
@@ -1433,6 +1465,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: 
self.projection_requires_validation,
         })
     }
 
@@ -1485,6 +1518,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: true,
         })
     }
 
@@ -1520,6 +1554,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: true,
         })
     }
 
@@ -1565,6 +1600,7 @@ impl DataFrame {
         DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: 
self.projection_requires_validation,
         }
         .collect()
         .await
@@ -1634,6 +1670,7 @@ impl DataFrame {
         DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: 
self.projection_requires_validation,
         }
         .collect()
         .await
@@ -1703,12 +1740,13 @@ impl DataFrame {
         DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: 
self.projection_requires_validation,
         }
         .collect()
         .await
     }
 
-    /// Add an additional column to the DataFrame.
+    /// Add or replace a column in the DataFrame.
     ///
     /// # Example
     /// ```
@@ -1736,33 +1774,36 @@ impl DataFrame {
 
         let mut col_exists = false;
         let new_column = expr.alias(name);
-        let mut fields: Vec<Expr> = plan
+        let mut fields: Vec<(Expr, bool)> = plan
             .schema()
             .iter()
             .filter_map(|(qualifier, field)| {
                 if field.name() == name {
                     col_exists = true;
-                    Some(new_column.clone())
+                    Some((new_column.clone(), true))
                 } else {
                     let e = col(Column::from((qualifier, field)));
                     window_fn_str
                         .as_ref()
                         .filter(|s| *s == &e.to_string())
                         .is_none()
-                        .then_some(e)
+                        .then_some((e, self.projection_requires_validation))
                 }
             })
             .collect();
 
         if !col_exists {
-            fields.push(new_column);
+            fields.push((new_column, true));
         }
 
-        let project_plan = 
LogicalPlanBuilder::from(plan).project(fields)?.build()?;
+        let project_plan = LogicalPlanBuilder::from(plan)
+            .project_with_validation(fields)?
+            .build()?;
 
         Ok(DataFrame {
             session_state: self.session_state,
             plan: project_plan,
+            projection_requires_validation: false,
         })
     }
 
@@ -1819,19 +1860,23 @@ impl DataFrame {
             .iter()
             .map(|(qualifier, field)| {
                 if qualifier.eq(&qualifier_rename) && field.as_ref() == 
field_rename {
-                    col(Column::from((qualifier, field)))
-                        .alias_qualified(qualifier.cloned(), new_name)
+                    (
+                        col(Column::from((qualifier, field)))
+                            .alias_qualified(qualifier.cloned(), new_name),
+                        false,
+                    )
                 } else {
-                    col(Column::from((qualifier, field)))
+                    (col(Column::from((qualifier, field))), false)
                 }
             })
             .collect::<Vec<_>>();
         let project_plan = LogicalPlanBuilder::from(self.plan)
-            .project(projection)?
+            .project_with_validation(projection)?
             .build()?;
         Ok(DataFrame {
             session_state: self.session_state,
             plan: project_plan,
+            projection_requires_validation: false,
         })
     }
 
@@ -1897,6 +1942,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: 
self.projection_requires_validation,
         })
     }
 
@@ -1932,6 +1978,7 @@ impl DataFrame {
         Ok(DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: 
self.projection_requires_validation,
         })
     }
 
diff --git a/datafusion/core/src/dataframe/parquet.rs 
b/datafusion/core/src/dataframe/parquet.rs
index 1dd4d68fca..1bb5444ca0 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -93,6 +93,7 @@ impl DataFrame {
         DataFrame {
             session_state: self.session_state,
             plan,
+            projection_requires_validation: 
self.projection_requires_validation,
         }
         .collect()
         .await
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index 4d825c6bfe..2bb15da218 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -528,6 +528,15 @@ impl LogicalPlanBuilder {
         project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
     }
 
+    /// Apply a projection without alias. Each expression is paired with a flag:
+    /// `true` to validate (normalize and columnize) it, `false` to use it as-is.
+    pub fn project_with_validation(
+        self,
+        expr: Vec<(impl Into<Expr>, bool)>,
+    ) -> Result<Self> {
+        project_with_validation(Arc::unwrap_or_clone(self.plan), 
expr).map(Self::new)
+    }
+
     /// Select the given column indices
     pub fn select(self, indices: impl IntoIterator<Item = usize>) -> 
Result<Self> {
         let exprs: Vec<_> = indices
@@ -1647,13 +1656,33 @@ pub fn union_by_name(
 pub fn project(
     plan: LogicalPlan,
     expr: impl IntoIterator<Item = impl Into<Expr>>,
+) -> Result<LogicalPlan> {
+    project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
+}
+
+/// Create a Projection. Similar to `project` except that each expression
+/// passed in is paired with a flag indicating whether it requires validation
+/// (normalization and columnization): `true` to validate, `false` to skip it.
+/// # Errors
+/// This function errors under any of the following conditions:
+/// * Two or more expressions have the same name
+/// * An invalid expression is used (e.g. a `sort` expression)
+fn project_with_validation(
+    plan: LogicalPlan,
+    expr: impl IntoIterator<Item = (impl Into<Expr>, bool)>,
 ) -> Result<LogicalPlan> {
     let mut projected_expr = vec![];
-    for e in expr {
+    for (e, validate) in expr {
         let e = e.into();
         match e {
             Expr::Wildcard { .. } => projected_expr.push(e),
-            _ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, 
&plan)?),
+            _ => {
+                if validate {
+                    projected_expr.push(columnize_expr(normalize_col(e, 
&plan)?, &plan)?)
+                } else {
+                    projected_expr.push(e)
+                }
+            }
         }
     }
     validate_unique_names("Projections", projected_expr.iter())?;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to