Copilot commented on code in PR #17946:
URL: https://github.com/apache/datafusion/pull/17946#discussion_r2508155325


##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -2368,6 +2368,139 @@ impl DataFrame {
         let df = ctx.read_batch(batch)?;
         Ok(df)
     }
+
+    /// Pivot the DataFrame, transforming rows into columns based on the 
specified value columns and aggregation functions.
+    ///
+    /// # Arguments
+    /// * `aggregate_functions` - Aggregation expressions to apply (e.g., sum, 
count).
+    /// * `value_column` - Columns whose unique values will become new columns 
in the output.
+    /// * `value_source` - Columns to use as values for the pivoted columns.
+    /// * `default_on_null` - Optional expressions to use as default values 
when a pivoted value is null.
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use arrow::array::{ArrayRef, Int32Array, StringArray};
+    /// # use datafusion::functions_aggregate::expr_fn::sum;
+    /// # use std::sync::Arc;
+    /// # let ctx = SessionContext::new();
+    /// let value: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+    /// let category: ArrayRef = Arc::new(StringArray::from(vec!["A", "B", 
"A"]));
+    /// let df = DataFrame::from_columns(vec![("value", value), ("category", 
category)]).unwrap();
+    /// let pivoted = df.pivot(
+    ///     vec![sum(col("value"))],
+    ///     vec![Column::from("category")],
+    ///     vec![col("value")],
+    ///     None
+    /// ).unwrap();
+    /// ```
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {
+        let plan = LogicalPlanBuilder::from(self.plan)
+            .pivot(
+                aggregate_functions,
+                value_column,
+                value_source,
+                default_on_null,
+            )?
+            .build()?;
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+            projection_requires_validation: 
self.projection_requires_validation,
+        })
+    }
+
+    /// Unpivot the DataFrame, transforming columns into rows.
+    ///
+    /// # Arguments
+    /// * `value_column_names` - Names for the value columns in the output
+    /// * `name_column` - Name for the column that will contain the original 
column names
+    /// * `unpivot_columns` - List of (column_names, optional_alias) tuples to 
unpivot
+    /// * `id_columns` - Optional list of columns to preserve (if None, all 
non-unpivoted columns are preserved)
+    /// * `include_nulls` - Whether to include rows with NULL values (default: 
false excludes NULLs)
+    ///
+    /// # Example
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow::array::{ArrayRef, Int32Array};
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let ctx = SessionContext::new();
+    /// let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
+    /// let jan: ArrayRef = Arc::new(Int32Array::from(vec![100, 110]));
+    /// let feb: ArrayRef = Arc::new(Int32Array::from(vec![200, 210]));
+    /// let mar: ArrayRef = Arc::new(Int32Array::from(vec![300, 310]));
+    /// let df = DataFrame::from_columns(vec![("id", id), ("jan", jan), 
("feb", feb), ("mar", mar)]).unwrap();
+    /// let unpivoted = df.unpivot(
+    ///     vec!["jan".to_string(), "feb".to_string(), "mar".to_string()],
+    ///     "month".to_string(),
+    ///     vec![(vec!["jan".to_string(), "feb".to_string(), 
"mar".to_string()], None)],
+    ///     None,
+    ///     false
+    /// ).unwrap();
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn unpivot(
+        self,
+        value_column_names: Vec<String>,
+        name_column: String,
+        unpivot_columns: Vec<(Vec<String>, Option<String>)>,
+        id_columns: Option<Vec<String>>,
+        include_nulls: bool,
+    ) -> Result<Self> {
+        // Get required UDF functions from the session state
+        let named_struct_fn = self
+            .session_state
+            .scalar_functions()
+            .get("named_struct")
+            .ok_or_else(|| {
+            DataFusionError::Plan("named_struct function not 
found".to_string())
+        })?;

Review Comment:
   Inconsistent indentation in the error handling closure. The opening brace 
and closure body on lines 2465-2467 should be indented consistently with the 
similar error handlers below on lines 2473-2475 and 2481-2483.
   ```suggestion
                   DataFusionError::Plan("named_struct function not 
found".to_string())
               })?;
   ```



##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -2368,6 +2368,139 @@ impl DataFrame {
         let df = ctx.read_batch(batch)?;
         Ok(df)
     }
+
+    /// Pivot the DataFrame, transforming rows into columns based on the 
specified value columns and aggregation functions.
+    ///
+    /// # Arguments
+    /// * `aggregate_functions` - Aggregation expressions to apply (e.g., sum, 
count).
+    /// * `value_column` - Columns whose unique values will become new columns 
in the output.
+    /// * `value_source` - Columns to use as values for the pivoted columns.
+    /// * `default_on_null` - Optional expressions to use as default values 
when a pivoted value is null.
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use arrow::array::{ArrayRef, Int32Array, StringArray};
+    /// # use datafusion::functions_aggregate::expr_fn::sum;
+    /// # use std::sync::Arc;
+    /// # let ctx = SessionContext::new();
+    /// let value: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+    /// let category: ArrayRef = Arc::new(StringArray::from(vec!["A", "B", 
"A"]));
+    /// let df = DataFrame::from_columns(vec![("value", value), ("category", 
category)]).unwrap();
+    /// let pivoted = df.pivot(
+    ///     vec![sum(col("value"))],
+    ///     vec![Column::from("category")],
+    ///     vec![col("value")],
+    ///     None
+    /// ).unwrap();
+    /// ```
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {
+        let plan = LogicalPlanBuilder::from(self.plan)
+            .pivot(
+                aggregate_functions,
+                value_column,
+                value_source,
+                default_on_null,
+            )?
+            .build()?;
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+            projection_requires_validation: 
self.projection_requires_validation,
+        })
+    }
+
+    /// Unpivot the DataFrame, transforming columns into rows.
+    ///
+    /// # Arguments
+    /// * `value_column_names` - Names for the value columns in the output
+    /// * `name_column` - Name for the column that will contain the original 
column names
+    /// * `unpivot_columns` - List of (column_names, optional_alias) tuples to 
unpivot
+    /// * `id_columns` - Optional list of columns to preserve (if None, all 
non-unpivoted columns are preserved)
+    /// * `include_nulls` - Whether to include rows with NULL values (default: 
false excludes NULLs)
+    ///
+    /// # Example
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow::array::{ArrayRef, Int32Array};
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let ctx = SessionContext::new();
+    /// let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
+    /// let jan: ArrayRef = Arc::new(Int32Array::from(vec![100, 110]));
+    /// let feb: ArrayRef = Arc::new(Int32Array::from(vec![200, 210]));
+    /// let mar: ArrayRef = Arc::new(Int32Array::from(vec![300, 310]));
+    /// let df = DataFrame::from_columns(vec![("id", id), ("jan", jan), 
("feb", feb), ("mar", mar)]).unwrap();
+    /// let unpivoted = df.unpivot(
+    ///     vec!["jan".to_string(), "feb".to_string(), "mar".to_string()],
+    ///     "month".to_string(),
+    ///     vec![(vec!["jan".to_string(), "feb".to_string(), 
"mar".to_string()], None)],

Review Comment:
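   The documentation example is misleading. `value_column_names` names the
   value column(s) of the *output*, so here it should be a single name (e.g.,
   `vec!["value".to_string()]`), not the list of columns being unpivoted. As
   written, the example passes the length check (one group of three columns
   against three value names) but builds only one struct, labelled `jan`, with
   three value fields, instead of one entry per month. According to the
   implementation and test cases, the columns being unpivoted should only be in
   the `unpivot_columns` parameter, one entry per column.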
   
   This should be:
   ```rust
   vec!["value".to_string()],  // Name for the value column in output
   "month".to_string(),
   vec![
       (vec!["jan".to_string()], Some("jan".to_string())),
       (vec!["feb".to_string()], Some("feb".to_string())),
       (vec!["mar".to_string()], Some("mar".to_string()))
   ],
   ```
   ```suggestion
       ///     vec!["value".to_string()], // Name for the value column in output
       ///     "month".to_string(),
       ///     vec![
       ///         (vec!["jan".to_string()], Some("jan".to_string())),
       ///         (vec!["feb".to_string()], Some("feb".to_string())),
       ///         (vec!["mar".to_string()], Some("mar".to_string()))
       ///     ],
   ```



##########
datafusion/sql/src/relation/mod.rs:
##########
@@ -183,6 +187,208 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                         .build()?;
                 (plan, alias)
             }
+            TableFactor::Pivot {
+                table,
+                aggregate_functions,
+                value_column,
+                value_source,
+                default_on_null,
+                alias,
+            } => {
+                let plan =
+                    self.create_relation(table.as_ref().clone(), 
planner_context)?;
+                let schema = plan.schema();
+                let aggregate_functions = aggregate_functions
+                    .into_iter()
+                    .map(|func| {
+                        self.sql_expr_to_logical_expr(func.expr, schema, 
planner_context)
+                            .map(|expr| match func.alias {
+                                Some(name) => expr.alias(name.value),
+                                None => expr,
+                            })
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let value_column = value_column.into_iter().map(|column| {
+                    let expr = match column {
+                        SqlExpr::Identifier(id) => 
self.sql_identifier_to_expr(id, schema, planner_context)?,
+                        SqlExpr::CompoundIdentifier(idents) => 
self.sql_compound_identifier_to_expr(idents, schema, planner_context)?,
+                        expr => return plan_err!(
+                            "Expected column identifier, found: {expr:?} in 
pivot value column"
+                        ),
+                    };
+                    match expr {
+                        Expr::Column(col) => Ok(col),
+                        expr => plan_err!(
+                            "Expected column identifier, found: {expr:?} in 
pivot value column"
+                        ),
+                    }
+                }).collect::<Result<Vec<_>>>()?;
+
+                let PivotValueSource::List(source) = value_source else {
+                    // Dynamic pivot: the output schema is determined by the 
data in the source table at runtime.
+                    return plan_err!("Dynamic pivot is not supported yet");
+                };
+                let value_source = source
+                    .into_iter()
+                    .map(|expr_with_alias| {
+                        self.sql_expr_to_logical_expr(
+                            expr_with_alias.expr,
+                            schema,
+                            planner_context,
+                        )
+                        .map(|expr| {
+                            match expr_with_alias.alias {
+                                Some(name) => expr.alias(name.value),
+                                None => expr,
+                            }
+                        })
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                let default_on_null = default_on_null
+                    .map(|expr| {
+                        let expr =
+                            self.sql_expr_to_logical_expr(expr, schema, 
planner_context)?;
+                        match expr {
+                            Expr::Literal(ScalarValue::List(list), _) => 
(0..list.len())
+                                .map(|idx| {
+                                    Ok(Expr::Literal(
+                                        
ScalarValue::try_from_array(list.values(), idx)?,
+                                        None,
+                                    ))
+                                })
+                                .collect::<Result<Vec<_>>>(),
+                            _ => plan_err!("Pivot default value cannot be 
NULL"),

Review Comment:
   The error message is misleading. The code actually checks if the expression 
is NOT a List literal, but the error says "Pivot default value cannot be NULL". 
This should be more accurate, such as "Pivot default value must be a list 
literal" or "Expected list literal for pivot default values".
   ```suggestion
                               _ => plan_err!("Pivot default value must be a 
list literal"),
   ```



##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1493,6 +1497,303 @@ impl LogicalPlanBuilder {
         unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
             .map(Self::new)
     }
+
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {
+        match default_on_null {
+            Some(default_values) if default_values.len() != 
aggregate_functions.len() => {
+                return plan_err!("Number of default values must match the 
number of aggregate functions");
+            }
+            _ => {}
+        }
+        let mut used_columns = HashSet::new();
+        used_columns.extend(value_column.iter().cloned());
+        for agg in aggregate_functions.iter() {
+            expr_to_columns(agg, &mut used_columns)?;
+        }
+
+        let used_columns = used_columns
+            .iter()
+            .map(|c| c.name.clone())
+            .collect::<HashSet<_>>();
+
+        // Extract group by columns (all columns not involved in aggregation 
or pivot)
+        let schema = self.schema();
+
+        let group_by_columns = schema
+            .fields()
+            .iter()
+            .filter_map(|f| {
+                if used_columns.contains(f.name()) {
+                    None // Skip columns that are used in aggregation or pivot
+                } else {
+                    Some(Expr::Column(Column::from_name(f.name())))
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // Create filtered aggregate expressions for each value in value_source
+        let mut aggr_exprs = Vec::new();
+
+        for value in &value_source {
+            let (value, value_alias) =
+                if let Expr::Alias(Alias { expr, name, .. }) = value {
+                    (expr.as_ref(), name)
+                } else {
+                    (value, &value.to_string())
+                };
+            let condition = match value_column.len() {
+                0 => return plan_err!("Pivot requires at least one value 
column"),
+                1 => binary_expr(
+                    Expr::Column(value_column[0].clone()),
+                    Operator::IsNotDistinctFrom,
+                    value.clone(),
+                ),
+                _ => {
+                    let Expr::ScalarFunction(ScalarFunction { func, args }) = 
value
+                    else {
+                        return plan_err!("Pivot value must be struct(literals) 
if multiple value columns are provided");
+                    };
+                    if func.name() != "struct" {
+                        return plan_err!("Pivot value must be struct(literals) 
if multiple value columns are provided");
+                    }
+                    if args.len() != value_column.len() {
+                        return plan_err!(
+                            "Pivot value list length must match value column 
count"
+                        );
+                    }
+                    let mut condition: Option<Expr> = None;
+                    for (idx, col) in value_column.iter().enumerate() {
+                        let single_condition = binary_expr(
+                            Expr::Column(col.clone()),
+                            Operator::IsNotDistinctFrom,
+                            args[idx].clone(),
+                        );
+                        condition = match condition {
+                            None => Some(single_condition),
+                            Some(prev) => Some(and(prev, single_condition)),
+                        };
+                    }
+                    match condition {
+                        None => {
+                            return plan_err!("Pivot value condition cannot be 
empty")
+                        }
+                        Some(cond) => cond,
+                    }

Review Comment:
   This check is unreachable and unnecessary. Since we're in the 
`value_column.len() > 1` branch (the `_` case matches 2 or more), and we 
iterate through `value_column.iter().enumerate()`, the `condition` variable 
will always be `Some` after the loop completes. The first iteration sets it to 
`Some`, and subsequent iterations keep it as `Some`. This check can never fail 
and should be removed.
   ```suggestion
                       condition.unwrap()
   ```



##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1493,6 +1497,303 @@ impl LogicalPlanBuilder {
         unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
             .map(Self::new)
     }
+
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {
+        match default_on_null {
+            Some(default_values) if default_values.len() != 
aggregate_functions.len() => {
+                return plan_err!("Number of default values must match the 
number of aggregate functions");
+            }
+            _ => {}
+        }
+        let mut used_columns = HashSet::new();
+        used_columns.extend(value_column.iter().cloned());
+        for agg in aggregate_functions.iter() {
+            expr_to_columns(agg, &mut used_columns)?;
+        }
+
+        let used_columns = used_columns
+            .iter()
+            .map(|c| c.name.clone())
+            .collect::<HashSet<_>>();
+
+        // Extract group by columns (all columns not involved in aggregation 
or pivot)
+        let schema = self.schema();
+
+        let group_by_columns = schema
+            .fields()
+            .iter()
+            .filter_map(|f| {
+                if used_columns.contains(f.name()) {
+                    None // Skip columns that are used in aggregation or pivot
+                } else {
+                    Some(Expr::Column(Column::from_name(f.name())))
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // Create filtered aggregate expressions for each value in value_source
+        let mut aggr_exprs = Vec::new();
+
+        for value in &value_source {
+            let (value, value_alias) =
+                if let Expr::Alias(Alias { expr, name, .. }) = value {
+                    (expr.as_ref(), name)
+                } else {
+                    (value, &value.to_string())
+                };
+            let condition = match value_column.len() {
+                0 => return plan_err!("Pivot requires at least one value 
column"),
+                1 => binary_expr(
+                    Expr::Column(value_column[0].clone()),
+                    Operator::IsNotDistinctFrom,
+                    value.clone(),
+                ),
+                _ => {
+                    let Expr::ScalarFunction(ScalarFunction { func, args }) = 
value
+                    else {
+                        return plan_err!("Pivot value must be struct(literals) 
if multiple value columns are provided");
+                    };
+                    if func.name() != "struct" {
+                        return plan_err!("Pivot value must be struct(literals) 
if multiple value columns are provided");
+                    }
+                    if args.len() != value_column.len() {
+                        return plan_err!(
+                            "Pivot value list length must match value column 
count"
+                        );
+                    }
+                    let mut condition: Option<Expr> = None;
+                    for (idx, col) in value_column.iter().enumerate() {
+                        let single_condition = binary_expr(
+                            Expr::Column(col.clone()),
+                            Operator::IsNotDistinctFrom,
+                            args[idx].clone(),
+                        );
+                        condition = match condition {
+                            None => Some(single_condition),
+                            Some(prev) => Some(and(prev, single_condition)),
+                        };
+                    }
+                    match condition {
+                        None => {
+                            return plan_err!("Pivot value condition cannot be 
empty")
+                        }
+                        Some(cond) => cond,
+                    }
+                }
+            };
+
+            for (i, agg_func) in aggregate_functions.iter().enumerate() {
+                let (expr, name, metadata) = match agg_func {
+                    Expr::Alias(Alias {
+                        expr,
+                        name,
+                        metadata,
+                        ..
+                    }) if matches!(expr.as_ref(), Expr::AggregateFunction(_)) 
=> {
+                        (expr.as_ref(), name, metadata)
+                    }
+                    Expr::AggregateFunction(_) => {
+                        (agg_func, &agg_func.to_string(), &None)
+                    }
+                    _ => {
+                        return plan_err!(
+                            "Pivot aggregate function must be either an alias 
or an aggregate function expression, but got: {agg_func:?}"
+                        );
+                    }
+                };
+                let expr = expr
+                    .clone()
+                    .transform(|nested_expr| match &nested_expr {
+                        Expr::AggregateFunction(func) => {
+                            let filter = match &func.params.filter {
+                                Some(filter) => {
+                                    and(filter.as_ref().clone(), 
condition.clone())
+                                }
+                                None => condition.clone(),
+                            };
+                            let mut func = func.clone();
+                            func.params.filter = Some(Box::new(filter));
+                            Ok(Transformed::yes(Expr::AggregateFunction(func)))
+                        }
+                        _ => Ok(Transformed::no(nested_expr)),
+                    })?
+                    .data;
+
+                let expr = match default_on_null.as_ref() {
+                    Some(default_values) => {
+                        when(expr.clone().is_null(), default_values[i].clone())
+                            .otherwise(expr)?
+                    }
+                    None => expr,
+                };
+                let pivot_col_name = format!(
+                    "{}_{}",
+                    value_alias.replace("\"", "").replace("'", ""),
+                    name
+                );
+                aggr_exprs
+                    .push(expr.alias_with_metadata(pivot_col_name, 
metadata.clone()));
+            }
+        }
+
+        let aggregate_plan = self.aggregate(group_by_columns, aggr_exprs)?;
+
+        Ok(aggregate_plan)
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn unpivot(
+        self,
+        value_column_names: Vec<String>,
+        name_column: String,
+        unpivot_columns: Vec<(Vec<String>, Option<String>)>,
+        id_columns: Option<Vec<String>>,
+        include_nulls: bool,
+        named_struct_fn: &Arc<ScalarUDF>,
+        make_array_fn: &Arc<ScalarUDF>,
+        get_field_fn: &Arc<ScalarUDF>,
+    ) -> Result<Self> {
+        let schema = self.schema();
+        let num_value_columns = value_column_names.len();
+
+        // Validate that all unpivot columns have the same number of columns
+        for (cols, _) in &unpivot_columns {
+            if cols.len() != num_value_columns {
+                return plan_err!(
+                    "All unpivot columns must have {} column(s), but found {}",
+                    num_value_columns,
+                    cols.len()
+                );
+            }
+        }
+
+        // Get the list of columns that should be preserved (not unpivoted)
+        let unpivot_col_set: HashSet<String> = unpivot_columns
+            .iter()
+            .flat_map(|(cols, _)| cols.iter().cloned())
+            .collect();
+
+        let preserved_columns: Vec<Expr> = if let Some(id_columns) = 
id_columns {
+            id_columns
+                .iter()
+                .map(|col| Expr::Column(Column::from_name(col)))
+                .collect()
+        } else {
+            schema
+                .iter()
+                .filter_map(|(q, f)| {
+                    if !unpivot_col_set.contains(f.name()) {
+                        Some(Expr::Column(Column::new(q.cloned(), f.name())))
+                    } else {
+                        None
+                    }
+                })
+                .collect()
+        };
+
+        // Build array of structs: array[struct(name_val, col1_val, col2_val, 
...), ...]
+        let mut struct_exprs = Vec::new();
+
+        for (col_names, alias_opt) in unpivot_columns {
+            // Build struct fields: [name_literal, name_column_name, value1, 
value1_name, value2, value2_name, ...]
+            let mut struct_fields = Vec::new();
+
+            // Add name field
+            let name_value = alias_opt.unwrap_or_else(|| col_names[0].clone());
+            struct_fields.push(lit(name_column.clone()));
+            struct_fields.push(lit(name_value));
+
+            // Add value fields
+            for (i, col_name) in col_names.iter().enumerate() {
+                struct_fields.push(lit(value_column_names[i].clone()));
+                
struct_fields.push(Expr::Column(Column::from_qualified_name(col_name)));
+            }
+
+            // Create struct expression
+            let struct_expr = Expr::ScalarFunction(ScalarFunction::new_udf(
+                Arc::clone(named_struct_fn),
+                struct_fields,
+            ));
+
+            struct_exprs.push(struct_expr);
+        }
+
+        let unpivot_array_column = "__unpivot_array";

Review Comment:
   The hardcoded column name `__unpivot_array` could potentially conflict with 
existing columns in the schema. If a user's table already has a column named 
`__unpivot_array`, this would cause a collision and unexpected behavior. 
Consider either:
   1. Checking if this name exists in the schema and generating a unique name 
if needed
   2. Using a UUID or other guaranteed-unique identifier
   3. Using a more obscure prefix that's less likely to conflict (though this 
doesn't eliminate the risk)
   ```suggestion
           // Generate a unique column name for the unpivot array to avoid 
collisions
           let base_name = "__unpivot_array".to_string();
           let mut unpivot_array_column = base_name.clone();
           let mut i = 1;
           let existing_names: HashSet<&str> = schema.iter().map(|(_, f)| 
f.name().as_str()).collect();
           while existing_names.contains(unpivot_array_column.as_str()) {
               unpivot_array_column = format!("{}{}", base_name, i);
               i += 1;
           }
   ```



##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1493,6 +1497,303 @@ impl LogicalPlanBuilder {
         unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
             .map(Self::new)
     }
+
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {
+        match default_on_null {
+            Some(default_values) if default_values.len() != 
aggregate_functions.len() => {
+                return plan_err!("Number of default values must match the 
number of aggregate functions");
+            }
+            _ => {}
+        }
+        let mut used_columns = HashSet::new();
+        used_columns.extend(value_column.iter().cloned());
+        for agg in aggregate_functions.iter() {
+            expr_to_columns(agg, &mut used_columns)?;
+        }
+
+        let used_columns = used_columns
+            .iter()
+            .map(|c| c.name.clone())
+            .collect::<HashSet<_>>();
+
+        // Extract group by columns (all columns not involved in aggregation 
or pivot)
+        let schema = self.schema();
+
+        let group_by_columns = schema
+            .fields()
+            .iter()
+            .filter_map(|f| {
+                if used_columns.contains(f.name()) {
+                    None // Skip columns that are used in aggregation or pivot
+                } else {
+                    Some(Expr::Column(Column::from_name(f.name())))
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // Create filtered aggregate expressions for each value in value_source
+        let mut aggr_exprs = Vec::new();
+
+        for value in &value_source {
+            let (value, value_alias) =
+                if let Expr::Alias(Alias { expr, name, .. }) = value {
+                    (expr.as_ref(), name)
+                } else {
+                    (value, &value.to_string())
+                };
+            let condition = match value_column.len() {
+                0 => return plan_err!("Pivot requires at least one value 
column"),
+                1 => binary_expr(
+                    Expr::Column(value_column[0].clone()),
+                    Operator::IsNotDistinctFrom,
+                    value.clone(),
+                ),
+                _ => {
+                    let Expr::ScalarFunction(ScalarFunction { func, args }) = 
value
+                    else {
+                        return plan_err!("Pivot value must be struct(literals) 
if multiple value columns are provided");
+                    };
+                    if func.name() != "struct" {
+                        return plan_err!("Pivot value must be struct(literals) 
if multiple value columns are provided");
+                    }
+                    if args.len() != value_column.len() {
+                        return plan_err!(
+                            "Pivot value list length must match value column 
count"
+                        );
+                    }
+                    let mut condition: Option<Expr> = None;
+                    for (idx, col) in value_column.iter().enumerate() {
+                        let single_condition = binary_expr(
+                            Expr::Column(col.clone()),
+                            Operator::IsNotDistinctFrom,
+                            args[idx].clone(),
+                        );
+                        condition = match condition {
+                            None => Some(single_condition),
+                            Some(prev) => Some(and(prev, single_condition)),
+                        };
+                    }
+                    match condition {
+                        None => {
+                            return plan_err!("Pivot value condition cannot be 
empty")
+                        }
+                        Some(cond) => cond,
+                    }
+                }
+            };
+
+            for (i, agg_func) in aggregate_functions.iter().enumerate() {
+                let (expr, name, metadata) = match agg_func {
+                    Expr::Alias(Alias {
+                        expr,
+                        name,
+                        metadata,
+                        ..
+                    }) if matches!(expr.as_ref(), Expr::AggregateFunction(_)) 
=> {
+                        (expr.as_ref(), name, metadata)
+                    }
+                    Expr::AggregateFunction(_) => {
+                        (agg_func, &agg_func.to_string(), &None)
+                    }
+                    _ => {
+                        return plan_err!(
+                            "Pivot aggregate function must be either an alias 
or an aggregate function expression, but got: {agg_func:?}"
+                        );
+                    }
+                };
+                let expr = expr
+                    .clone()
+                    .transform(|nested_expr| match &nested_expr {
+                        Expr::AggregateFunction(func) => {
+                            let filter = match &func.params.filter {
+                                Some(filter) => {
+                                    and(filter.as_ref().clone(), 
condition.clone())
+                                }
+                                None => condition.clone(),
+                            };
+                            let mut func = func.clone();
+                            func.params.filter = Some(Box::new(filter));
+                            Ok(Transformed::yes(Expr::AggregateFunction(func)))
+                        }
+                        _ => Ok(Transformed::no(nested_expr)),
+                    })?
+                    .data;
+
+                let expr = match default_on_null.as_ref() {
+                    Some(default_values) => {
+                        when(expr.clone().is_null(), default_values[i].clone())
+                            .otherwise(expr)?
+                    }
+                    None => expr,
+                };
+                let pivot_col_name = format!(
+                    "{}_{}",
+                    value_alias.replace("\"", "").replace("'", ""),
+                    name
+                );
+                aggr_exprs
+                    .push(expr.alias_with_metadata(pivot_col_name, 
metadata.clone()));
+            }
+        }
+
+        let aggregate_plan = self.aggregate(group_by_columns, aggr_exprs)?;
+
+        Ok(aggregate_plan)
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn unpivot(
+        self,
+        value_column_names: Vec<String>,
+        name_column: String,
+        unpivot_columns: Vec<(Vec<String>, Option<String>)>,
+        id_columns: Option<Vec<String>>,
+        include_nulls: bool,
+        named_struct_fn: &Arc<ScalarUDF>,
+        make_array_fn: &Arc<ScalarUDF>,
+        get_field_fn: &Arc<ScalarUDF>,
+    ) -> Result<Self> {

Review Comment:
   Missing validation for empty input vectors. The function should validate 
that `value_column_names` and `unpivot_columns` are not empty, as an unpivot 
operation without these doesn't make semantic sense. Consider adding early 
validation like:
   ```rust
   if value_column_names.is_empty() {
       return plan_err!("Unpivot requires at least one value column name");
   }
   if unpivot_columns.is_empty() {
       return plan_err!("Unpivot requires at least one column to unpivot");
   }
   ```
   ```suggestion
       ) -> Result<Self> {
           if value_column_names.is_empty() {
               return plan_err!("Unpivot requires at least one value column 
name");
           }
           if unpivot_columns.is_empty() {
               return plan_err!("Unpivot requires at least one column to 
unpivot");
           }
   ```



##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1493,6 +1497,303 @@ impl LogicalPlanBuilder {
         unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
             .map(Self::new)
     }
+
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {

Review Comment:
   Missing validation for empty input vectors. The function should validate 
that `aggregate_functions` and `value_source` are not empty, as a pivot 
operation without aggregate functions or values doesn't make semantic sense. 
Consider adding early validation like:
   ```rust
   if aggregate_functions.is_empty() {
       return plan_err!("Pivot requires at least one aggregate function");
   }
   if value_source.is_empty() {
       return plan_err!("Pivot requires at least one value in value_source");
   }
   ```
   ```suggestion
       ) -> Result<Self> {
           if aggregate_functions.is_empty() {
               return plan_err!("Pivot requires at least one aggregate 
function");
           }
           if value_source.is_empty() {
               return plan_err!("Pivot requires at least one value in 
value_source");
           }
   ```



##########
datafusion/sql/src/relation/mod.rs:
##########
@@ -183,6 +187,208 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                         .build()?;
                 (plan, alias)
             }
+            TableFactor::Pivot {
+                table,
+                aggregate_functions,
+                value_column,
+                value_source,
+                default_on_null,
+                alias,
+            } => {
+                let plan =
+                    self.create_relation(table.as_ref().clone(), 
planner_context)?;
+                let schema = plan.schema();
+                let aggregate_functions = aggregate_functions
+                    .into_iter()
+                    .map(|func| {
+                        self.sql_expr_to_logical_expr(func.expr, schema, 
planner_context)
+                            .map(|expr| match func.alias {
+                                Some(name) => expr.alias(name.value),
+                                None => expr,
+                            })
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let value_column = value_column.into_iter().map(|column| {
+                    let expr = match column {
+                        SqlExpr::Identifier(id) => 
self.sql_identifier_to_expr(id, schema, planner_context)?,
+                        SqlExpr::CompoundIdentifier(idents) => 
self.sql_compound_identifier_to_expr(idents, schema, planner_context)?,
+                        expr => return plan_err!(
+                            "Expected column identifier, found: {expr:?} in 
pivot value column"
+                        ),
+                    };
+                    match expr {
+                        Expr::Column(col) => Ok(col),
+                        expr => plan_err!(
+                            "Expected column identifier, found: {expr:?} in 
pivot value column"
+                        ),
+                    }
+                }).collect::<Result<Vec<_>>>()?;
+
+                let PivotValueSource::List(source) = value_source else {
+                    // Dynamic pivot: the output schema is determined by the 
data in the source table at runtime.
+                    return plan_err!("Dynamic pivot is not supported yet");
+                };
+                let value_source = source
+                    .into_iter()
+                    .map(|expr_with_alias| {
+                        self.sql_expr_to_logical_expr(
+                            expr_with_alias.expr,
+                            schema,
+                            planner_context,
+                        )
+                        .map(|expr| {
+                            match expr_with_alias.alias {
+                                Some(name) => expr.alias(name.value),
+                                None => expr,
+                            }
+                        })
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                let default_on_null = default_on_null
+                    .map(|expr| {
+                        let expr =
+                            self.sql_expr_to_logical_expr(expr, schema, 
planner_context)?;
+                        match expr {
+                            Expr::Literal(ScalarValue::List(list), _) => 
(0..list.len())
+                                .map(|idx| {
+                                    Ok(Expr::Literal(
+                                        ScalarValue::try_from_array(list.values(), idx)?,
+                                        None,
+                                    ))
+                                })
+                                .collect::<Result<Vec<_>>>(),
+                            _ => plan_err!("Pivot default value cannot be 
NULL"),
+                        }
+                    })
+                    .transpose()?;
+
+                let plan = LogicalPlanBuilder::from(plan)
+                    .pivot(
+                        aggregate_functions,
+                        value_column,
+                        value_source,
+                        default_on_null,
+                    )?
+                    .build()?;
+                (plan, alias)
+            }
+            TableFactor::Unpivot {
+                table,
+                value,
+                name,
+                columns,
+                null_inclusion,
+                alias,
+            } => {
+                let plan =
+                    self.create_relation(table.as_ref().clone(), 
planner_context)?;
+
+                // Parse value expression(s)
+                let value_columns = match value {
+                    SqlExpr::Tuple(exprs) => exprs,
+                    single_expr => vec![single_expr],
+                };
+                let value_column_names: Vec<String> = value_columns
+                    .into_iter()
+                    .map(|expr| match expr {
+                        SqlExpr::Identifier(id) => Ok(id.value),
+                        _ => plan_err!("Expected identifier in UNPIVOT value 
clause"),
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                // Parse name column
+                let name_column = name.value;
+
+                // Parse columns to unpivot
+                let unpivot_columns: Vec<(Vec<String>, Option<String>)> = 
columns
+                    .into_iter()
+                    .map(|col_with_alias| {
+                        let column_names = match &col_with_alias.expr {
+                            SqlExpr::Tuple(exprs) => exprs
+                                .iter()
+                                .map(|e| match e {
+                                    SqlExpr::Identifier(id) => 
Ok(id.value.clone()),
+                                    SqlExpr::CompoundIdentifier(ids) => Ok(ids
+                                        .iter()
+                                        .map(|i| i.value.as_str())
+                                        .collect::<Vec<_>>()
+                                        .join(".")),
+                                    _ => plan_err!(
+                                        "Expected identifier in UNPIVOT IN 
clause"
+                                    ),
+                                })
+                                .collect::<Result<Vec<_>>>()?,
+                            SqlExpr::Identifier(id) => vec![id.value.clone()],
+                            SqlExpr::CompoundIdentifier(ids) => {
+                                vec![ids
+                                    .iter()
+                                    .map(|i| i.value.as_str())
+                                    .collect::<Vec<_>>()
+                                    .join(".")]
+                            }
+                            _ => {
+                                return plan_err!(
+                                    "Expected identifier or tuple in UNPIVOT 
IN clause"
+                                )
+                            }
+                        };
+
+                        let alias = col_with_alias.alias.map(|alias| 
alias.value);
+                        Ok((column_names, alias))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                // Determine if nulls should be included (default is EXCLUDE)
+                let include_nulls = matches!(
+                    null_inclusion,
+                    Some(sqlparser::ast::NullInclusion::IncludeNulls)
+                );
+
+                // Get named_struct and make_array functions from context
+                let named_struct_fn = self
+                    .context_provider
+                    .get_function_meta("named_struct")
+                    .ok_or_else(|| {
+                        datafusion_common::DataFusionError::Plan(
+                            "named_struct function not found".to_string(),
+                        )
+                    })?;
+
+                let make_array_fn = self
+                    .context_provider
+                    .get_function_meta("make_array")
+                    .ok_or_else(|| {
+                        datafusion_common::DataFusionError::Plan(
+                            "make_array function not found".to_string(),
+                        )
+                    })?;
+
+                let get_field_fn = self
+                    .context_provider
+                    .get_function_meta("get_field")
+                    .ok_or_else(|| {
+                    datafusion_common::DataFusionError::Plan(
+                        "get_field function not found".to_string(),
+                    )
+                })?;

Review Comment:
   Inconsistent indentation in the error handling closure. The closure body 
starting on line 370 (and its closing `})?;`) should be indented one more level 
so that it aligns with the previous error handlers on lines 352 and 361.
   ```suggestion
                           datafusion_common::DataFusionError::Plan(
                               "get_field function not found".to_string(),
                           )
                       })?;
   ```



##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1493,6 +1497,303 @@ impl LogicalPlanBuilder {
         unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
             .map(Self::new)
     }
+
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {
+        match default_on_null {
+            Some(default_values) if default_values.len() != 
aggregate_functions.len() => {
+                return plan_err!("Number of default values must match the 
number of aggregate functions");
+            }
+            _ => {}
+        }
+        let mut used_columns = HashSet::new();
+        used_columns.extend(value_column.iter().cloned());
+        for agg in aggregate_functions.iter() {
+            expr_to_columns(agg, &mut used_columns)?;
+        }
+
+        let used_columns = used_columns
+            .iter()
+            .map(|c| c.name.clone())
+            .collect::<HashSet<_>>();
+
+        // Extract group by columns (all columns not involved in aggregation 
or pivot)
+        let schema = self.schema();
+
+        let group_by_columns = schema
+            .fields()
+            .iter()
+            .filter_map(|f| {
+                if used_columns.contains(f.name()) {
+                    None // Skip columns that are used in aggregation or pivot
+                } else {
+                    Some(Expr::Column(Column::from_name(f.name())))
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // Create filtered aggregate expressions for each value in value_source
+        let mut aggr_exprs = Vec::new();
+
+        for value in &value_source {
+            let (value, value_alias) =
+                if let Expr::Alias(Alias { expr, name, .. }) = value {
+                    (expr.as_ref(), name)
+                } else {
+                    (value, &value.to_string())
+                };
+            let condition = match value_column.len() {
+                0 => return plan_err!("Pivot requires at least one value 
column"),
+                1 => binary_expr(
+                    Expr::Column(value_column[0].clone()),
+                    Operator::IsNotDistinctFrom,
+                    value.clone(),
+                ),
+                _ => {
+                    let Expr::ScalarFunction(ScalarFunction { func, args }) = 
value
+                    else {
+                        return plan_err!("Pivot value must be struct(literals) 
if multiple value columns are provided");
+                    };
+                    if func.name() != "struct" {
+                        return plan_err!("Pivot value must be struct(literals) 
if multiple value columns are provided");
+                    }
+                    if args.len() != value_column.len() {
+                        return plan_err!(
+                            "Pivot value list length must match value column 
count"
+                        );
+                    }
+                    let mut condition: Option<Expr> = None;
+                    for (idx, col) in value_column.iter().enumerate() {
+                        let single_condition = binary_expr(
+                            Expr::Column(col.clone()),
+                            Operator::IsNotDistinctFrom,
+                            args[idx].clone(),
+                        );
+                        condition = match condition {
+                            None => Some(single_condition),
+                            Some(prev) => Some(and(prev, single_condition)),
+                        };
+                    }
+                    match condition {
+                        None => {
+                            return plan_err!("Pivot value condition cannot be 
empty")
+                        }
+                        Some(cond) => cond,
+                    }
+                }
+            };
+
+            for (i, agg_func) in aggregate_functions.iter().enumerate() {
+                let (expr, name, metadata) = match agg_func {
+                    Expr::Alias(Alias {
+                        expr,
+                        name,
+                        metadata,
+                        ..
+                    }) if matches!(expr.as_ref(), Expr::AggregateFunction(_)) 
=> {
+                        (expr.as_ref(), name, metadata)
+                    }
+                    Expr::AggregateFunction(_) => {
+                        (agg_func, &agg_func.to_string(), &None)
+                    }
+                    _ => {
+                        return plan_err!(
+                            "Pivot aggregate function must be either an alias 
or an aggregate function expression, but got: {agg_func:?}"
+                        );
+                    }
+                };
+                let expr = expr
+                    .clone()
+                    .transform(|nested_expr| match &nested_expr {
+                        Expr::AggregateFunction(func) => {
+                            let filter = match &func.params.filter {
+                                Some(filter) => {
+                                    and(filter.as_ref().clone(), 
condition.clone())
+                                }
+                                None => condition.clone(),
+                            };
+                            let mut func = func.clone();
+                            func.params.filter = Some(Box::new(filter));
+                            Ok(Transformed::yes(Expr::AggregateFunction(func)))
+                        }
+                        _ => Ok(Transformed::no(nested_expr)),
+                    })?
+                    .data;
+
+                let expr = match default_on_null.as_ref() {
+                    Some(default_values) => {
+                        when(expr.clone().is_null(), default_values[i].clone())
+                            .otherwise(expr)?
+                    }
+                    None => expr,
+                };
+                let pivot_col_name = format!(
+                    "{}_{}",
+                    value_alias.replace("\"", "").replace("'", ""),

Review Comment:
   Using string replacements to sanitize column names is fragile and may not 
handle all edge cases. Consider using a more robust approach or documenting why 
only quotes need to be removed. For example, other special characters like 
backticks, brackets, or control characters might also appear in aliases.
   ```suggestion
                       sanitize_column_name(value_alias),
   ```



##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -2368,6 +2368,139 @@ impl DataFrame {
         let df = ctx.read_batch(batch)?;
         Ok(df)
     }
+
+    /// Pivot the DataFrame, transforming rows into columns based on the 
specified value columns and aggregation functions.
+    ///
+    /// # Arguments
+    /// * `aggregate_functions` - Aggregation expressions to apply (e.g., sum, 
count).
+    /// * `value_column` - Columns whose unique values will become new columns 
in the output.
+    /// * `value_source` - Columns to use as values for the pivoted columns.
+    /// * `default_on_null` - Optional expressions to use as default values 
when a pivoted value is null.
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use arrow::array::{ArrayRef, Int32Array, StringArray};
+    /// # use datafusion::functions_aggregate::expr_fn::sum;
+    /// # use std::sync::Arc;
+    /// # let ctx = SessionContext::new();
+    /// let value: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+    /// let category: ArrayRef = Arc::new(StringArray::from(vec!["A", "B", 
"A"]));
+    /// let df = DataFrame::from_columns(vec![("value", value), ("category", 
category)]).unwrap();
+    /// let pivoted = df.pivot(
+    ///     vec![sum(col("value"))],
+    ///     vec![Column::from("category")],
+    ///     vec![col("value")],

Review Comment:
   The documentation example appears to be incorrect. The `value_source` 
parameter should contain literal values that will become column names (e.g., 
`lit("A")`, `lit("B")`), not `col("value")`. Based on the test cases and 
implementation, this should be:
   ```rust
   vec![lit("A"), lit("B")]
   ```
   ```suggestion
       ///     vec![lit("A"), lit("B")],
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to