Copilot commented on code in PR #17946:
URL: https://github.com/apache/datafusion/pull/17946#discussion_r2508155325
##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -2368,6 +2368,139 @@ impl DataFrame {
         let df = ctx.read_batch(batch)?;
         Ok(df)
     }
+
+    /// Pivot the DataFrame, transforming rows into columns based on the specified value columns and aggregation functions.
+    ///
+    /// # Arguments
+    /// * `aggregate_functions` - Aggregation expressions to apply (e.g., sum, count).
+    /// * `value_column` - Columns whose unique values will become new columns in the output.
+    /// * `value_source` - Columns to use as values for the pivoted columns.
+    /// * `default_on_null` - Optional expressions to use as default values when a pivoted value is null.
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use arrow::array::{ArrayRef, Int32Array, StringArray};
+    /// # use datafusion::functions_aggregate::expr_fn::sum;
+    /// # use std::sync::Arc;
+    /// # let ctx = SessionContext::new();
+    /// let value: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+    /// let category: ArrayRef = Arc::new(StringArray::from(vec!["A", "B", "A"]));
+    /// let df = DataFrame::from_columns(vec![("value", value), ("category", category)]).unwrap();
+    /// let pivoted = df.pivot(
+    ///     vec![sum(col("value"))],
+    ///     vec![Column::from("category")],
+    ///     vec![col("value")],
+    ///     None
+    /// ).unwrap();
+    /// ```
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {
+        let plan = LogicalPlanBuilder::from(self.plan)
+            .pivot(
+                aggregate_functions,
+                value_column,
+                value_source,
+                default_on_null,
+            )?
+            .build()?;
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+            projection_requires_validation: self.projection_requires_validation,
+        })
+    }
+
+    /// Unpivot the DataFrame, transforming columns into rows.
+    ///
+    /// # Arguments
+    /// * `value_column_names` - Names for the value columns in the output
+    /// * `name_column` - Name for the column that will contain the original column names
+    /// * `unpivot_columns` - List of (column_names, optional_alias) tuples to unpivot
+    /// * `id_columns` - Optional list of columns to preserve (if None, all non-unpivoted columns are preserved)
+    /// * `include_nulls` - Whether to include rows with NULL values (default: false excludes NULLs)
+    ///
+    /// # Example
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow::array::{ArrayRef, Int32Array};
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let ctx = SessionContext::new();
+    /// let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
+    /// let jan: ArrayRef = Arc::new(Int32Array::from(vec![100, 110]));
+    /// let feb: ArrayRef = Arc::new(Int32Array::from(vec![200, 210]));
+    /// let mar: ArrayRef = Arc::new(Int32Array::from(vec![300, 310]));
+    /// let df = DataFrame::from_columns(vec![("id", id), ("jan", jan), ("feb", feb), ("mar", mar)]).unwrap();
+    /// let unpivoted = df.unpivot(
+    ///     vec!["jan".to_string(), "feb".to_string(), "mar".to_string()],
+    ///     "month".to_string(),
+    ///     vec![(vec!["jan".to_string(), "feb".to_string(), "mar".to_string()], None)],
+    ///     None,
+    ///     false
+    /// ).unwrap();
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn unpivot(
+        self,
+        value_column_names: Vec<String>,
+        name_column: String,
+        unpivot_columns: Vec<(Vec<String>, Option<String>)>,
+        id_columns: Option<Vec<String>>,
+        include_nulls: bool,
+    ) -> Result<Self> {
+        // Get required UDF functions from the session state
+        let named_struct_fn = self
+            .session_state
+            .scalar_functions()
+            .get("named_struct")
+            .ok_or_else(|| {
+                DataFusionError::Plan("named_struct function not found".to_string())
+            })?;
Review Comment:
Inconsistent indentation in the error handling closure. The opening brace
and closure body on lines 2465-2467 should be indented consistently with the
similar error handlers below on lines 2473-2475 and 2481-2483.
```suggestion
                DataFusionError::Plan("named_struct function not found".to_string())
            })?;
```
##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -2368,6 +2368,139 @@ impl DataFrame {
         let df = ctx.read_batch(batch)?;
         Ok(df)
     }
+
+    /// Pivot the DataFrame, transforming rows into columns based on the specified value columns and aggregation functions.
+    ///
+    /// # Arguments
+    /// * `aggregate_functions` - Aggregation expressions to apply (e.g., sum, count).
+    /// * `value_column` - Columns whose unique values will become new columns in the output.
+    /// * `value_source` - Columns to use as values for the pivoted columns.
+    /// * `default_on_null` - Optional expressions to use as default values when a pivoted value is null.
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use arrow::array::{ArrayRef, Int32Array, StringArray};
+    /// # use datafusion::functions_aggregate::expr_fn::sum;
+    /// # use std::sync::Arc;
+    /// # let ctx = SessionContext::new();
+    /// let value: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+    /// let category: ArrayRef = Arc::new(StringArray::from(vec!["A", "B", "A"]));
+    /// let df = DataFrame::from_columns(vec![("value", value), ("category", category)]).unwrap();
+    /// let pivoted = df.pivot(
+    ///     vec![sum(col("value"))],
+    ///     vec![Column::from("category")],
+    ///     vec![col("value")],
+    ///     None
+    /// ).unwrap();
+    /// ```
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {
+        let plan = LogicalPlanBuilder::from(self.plan)
+            .pivot(
+                aggregate_functions,
+                value_column,
+                value_source,
+                default_on_null,
+            )?
+            .build()?;
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+            projection_requires_validation: self.projection_requires_validation,
+        })
+    }
+
+    /// Unpivot the DataFrame, transforming columns into rows.
+    ///
+    /// # Arguments
+    /// * `value_column_names` - Names for the value columns in the output
+    /// * `name_column` - Name for the column that will contain the original column names
+    /// * `unpivot_columns` - List of (column_names, optional_alias) tuples to unpivot
+    /// * `id_columns` - Optional list of columns to preserve (if None, all non-unpivoted columns are preserved)
+    /// * `include_nulls` - Whether to include rows with NULL values (default: false excludes NULLs)
+    ///
+    /// # Example
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow::array::{ArrayRef, Int32Array};
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let ctx = SessionContext::new();
+    /// let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
+    /// let jan: ArrayRef = Arc::new(Int32Array::from(vec![100, 110]));
+    /// let feb: ArrayRef = Arc::new(Int32Array::from(vec![200, 210]));
+    /// let mar: ArrayRef = Arc::new(Int32Array::from(vec![300, 310]));
+    /// let df = DataFrame::from_columns(vec![("id", id), ("jan", jan), ("feb", feb), ("mar", mar)]).unwrap();
+    /// let unpivoted = df.unpivot(
+    ///     vec!["jan".to_string(), "feb".to_string(), "mar".to_string()],
+    ///     "month".to_string(),
+    ///     vec![(vec!["jan".to_string(), "feb".to_string(), "mar".to_string()], None)],
Review Comment:
The documentation example is incorrect. The first parameter,
`value_column_names`, should name the value column(s) in the output (e.g.,
`vec!["value".to_string()]`), not repeat the list of columns being
unpivoted. According to the implementation and test cases, the columns
being unpivoted belong only in the `unpivot_columns` parameter. The call
should read:
```rust
vec!["value".to_string()], // Name for the value column in output
"month".to_string(),
vec![
    (vec!["jan".to_string()], Some("jan".to_string())),
    (vec!["feb".to_string()], Some("feb".to_string())),
    (vec!["mar".to_string()], Some("mar".to_string()))
],
```
```suggestion
/// vec!["value".to_string()], // Name for the value column in output
/// "month".to_string(),
/// vec![
/// (vec!["jan".to_string()], Some("jan".to_string())),
/// (vec!["feb".to_string()], Some("feb".to_string())),
/// (vec!["mar".to_string()], Some("mar".to_string()))
/// ],
```
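For readers following the thread: under this reading, the corrected doctest
call would look roughly as follows. This is a sketch; the aliases mirror the
suggestion above, and the commented output shape is inferred from the builder
code quoted in this review, not verified against the PR's tests.
```rust
let unpivoted = df.unpivot(
    vec!["value".to_string()],
    "month".to_string(),
    vec![
        (vec!["jan".to_string()], Some("jan".to_string())),
        (vec!["feb".to_string()], Some("feb".to_string())),
        (vec!["mar".to_string()], Some("mar".to_string())),
    ],
    None,  // preserve all non-unpivoted columns (here: "id")
    false, // exclude rows whose unpivoted value is NULL
).unwrap();
// Assumed result shape: columns (id, month, value), one row per (id, month)
// pair, e.g. (1, "jan", 100) through (2, "mar", 310).
```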
##########
datafusion/sql/src/relation/mod.rs:
##########
@@ -183,6 +187,208 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                     .build()?;
                 (plan, alias)
             }
+            TableFactor::Pivot {
+                table,
+                aggregate_functions,
+                value_column,
+                value_source,
+                default_on_null,
+                alias,
+            } => {
+                let plan =
+                    self.create_relation(table.as_ref().clone(), planner_context)?;
+                let schema = plan.schema();
+                let aggregate_functions = aggregate_functions
+                    .into_iter()
+                    .map(|func| {
+                        self.sql_expr_to_logical_expr(func.expr, schema, planner_context)
+                            .map(|expr| match func.alias {
+                                Some(name) => expr.alias(name.value),
+                                None => expr,
+                            })
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let value_column = value_column.into_iter().map(|column| {
+                    let expr = match column {
+                        SqlExpr::Identifier(id) => self.sql_identifier_to_expr(id, schema, planner_context)?,
+                        SqlExpr::CompoundIdentifier(idents) => self.sql_compound_identifier_to_expr(idents, schema, planner_context)?,
+                        expr => return plan_err!(
+                            "Expected column identifier, found: {expr:?} in pivot value column"
+                        ),
+                    };
+                    match expr {
+                        Expr::Column(col) => Ok(col),
+                        expr => plan_err!(
+                            "Expected column identifier, found: {expr:?} in pivot value column"
+                        ),
+                    }
+                }).collect::<Result<Vec<_>>>()?;
+
+                let PivotValueSource::List(source) = value_source else {
+                    // Dynamic pivot: the output schema is determined by the data in the source table at runtime.
+                    return plan_err!("Dynamic pivot is not supported yet");
+                };
+                let value_source = source
+                    .into_iter()
+                    .map(|expr_with_alias| {
+                        self.sql_expr_to_logical_expr(
+                            expr_with_alias.expr,
+                            schema,
+                            planner_context,
+                        )
+                        .map(|expr| {
+                            match expr_with_alias.alias {
+                                Some(name) => expr.alias(name.value),
+                                None => expr,
+                            }
+                        })
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                let default_on_null = default_on_null
+                    .map(|expr| {
+                        let expr =
+                            self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
+                        match expr {
+                            Expr::Literal(ScalarValue::List(list), _) => (0..list.len())
+                                .map(|idx| {
+                                    Ok(Expr::Literal(
+                                        ScalarValue::try_from_array(list.values(), idx)?,
+                                        None,
+                                    ))
+                                })
+                                .collect::<Result<Vec<_>>>(),
+                            _ => plan_err!("Pivot default value cannot be NULL"),
Review Comment:
The error message is misleading. The code actually checks if the expression
is NOT a List literal, but the error says "Pivot default value cannot be NULL".
This should be more accurate, such as "Pivot default value must be a list
literal" or "Expected list literal for pivot default values".
```suggestion
                            _ => plan_err!("Pivot default value must be a list literal"),
```
##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1493,6 +1497,303 @@ impl LogicalPlanBuilder {
         unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
             .map(Self::new)
     }
+
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {
+        match default_on_null {
+            Some(default_values) if default_values.len() != aggregate_functions.len() => {
+                return plan_err!("Number of default values must match the number of aggregate functions");
+            }
+            _ => {}
+        }
+        let mut used_columns = HashSet::new();
+        used_columns.extend(value_column.iter().cloned());
+        for agg in aggregate_functions.iter() {
+            expr_to_columns(agg, &mut used_columns)?;
+        }
+
+        let used_columns = used_columns
+            .iter()
+            .map(|c| c.name.clone())
+            .collect::<HashSet<_>>();
+
+        // Extract group by columns (all columns not involved in aggregation or pivot)
+        let schema = self.schema();
+
+        let group_by_columns = schema
+            .fields()
+            .iter()
+            .filter_map(|f| {
+                if used_columns.contains(f.name()) {
+                    None // Skip columns that are used in aggregation or pivot
+                } else {
+                    Some(Expr::Column(Column::from_name(f.name())))
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // Create filtered aggregate expressions for each value in value_source
+        let mut aggr_exprs = Vec::new();
+
+        for value in &value_source {
+            let (value, value_alias) =
+                if let Expr::Alias(Alias { expr, name, .. }) = value {
+                    (expr.as_ref(), name)
+                } else {
+                    (value, &value.to_string())
+                };
+            let condition = match value_column.len() {
+                0 => return plan_err!("Pivot requires at least one value column"),
+                1 => binary_expr(
+                    Expr::Column(value_column[0].clone()),
+                    Operator::IsNotDistinctFrom,
+                    value.clone(),
+                ),
+                _ => {
+                    let Expr::ScalarFunction(ScalarFunction { func, args }) = value
+                    else {
+                        return plan_err!("Pivot value must be struct(literals) if multiple value columns are provided");
+                    };
+                    if func.name() != "struct" {
+                        return plan_err!("Pivot value must be struct(literals) if multiple value columns are provided");
+                    }
+                    if args.len() != value_column.len() {
+                        return plan_err!(
+                            "Pivot value list length must match value column count"
+                        );
+                    }
+                    let mut condition: Option<Expr> = None;
+                    for (idx, col) in value_column.iter().enumerate() {
+                        let single_condition = binary_expr(
+                            Expr::Column(col.clone()),
+                            Operator::IsNotDistinctFrom,
+                            args[idx].clone(),
+                        );
+                        condition = match condition {
+                            None => Some(single_condition),
+                            Some(prev) => Some(and(prev, single_condition)),
+                        };
+                    }
+                    match condition {
+                        None => {
+                            return plan_err!("Pivot value condition cannot be empty")
+                        }
+                        Some(cond) => cond,
+                    }
Review Comment:
This check is unreachable and unnecessary. Since we're in the
`value_column.len() > 1` branch (the `_` case matches 2 or more), and we
iterate through `value_column.iter().enumerate()`, the `condition` variable
will always be `Some` after the loop completes. The first iteration sets it to
`Some`, and subsequent iterations keep it as `Some`. This check can never fail
and should be removed.
```suggestion
                    condition.unwrap()
```
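As an alternative to `condition.unwrap()`, the accumulation can be written
without the intermediate `Option` at all. A sketch using the PR's local names
(`value_column`, `args`) under the same `len() >= 2` invariant of this match
arm:
```rust
// Build `c1 IS NOT DISTINCT FROM v1 AND c2 IS NOT DISTINCT FROM v2 AND ...`;
// reduce(and) folds the pairwise comparisons left to right, and the iterator
// is non-empty by this match arm's invariant, so expect() cannot fire.
let condition = value_column
    .iter()
    .zip(args.iter())
    .map(|(col, arg)| {
        binary_expr(
            Expr::Column(col.clone()),
            Operator::IsNotDistinctFrom,
            arg.clone(),
        )
    })
    .reduce(and)
    .expect("this match arm requires value_column.len() >= 2");
```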
##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1493,6 +1497,303 @@ impl LogicalPlanBuilder {
         unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
             .map(Self::new)
     }
+
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {
+        match default_on_null {
+            Some(default_values) if default_values.len() != aggregate_functions.len() => {
+                return plan_err!("Number of default values must match the number of aggregate functions");
+            }
+            _ => {}
+        }
+        let mut used_columns = HashSet::new();
+        used_columns.extend(value_column.iter().cloned());
+        for agg in aggregate_functions.iter() {
+            expr_to_columns(agg, &mut used_columns)?;
+        }
+
+        let used_columns = used_columns
+            .iter()
+            .map(|c| c.name.clone())
+            .collect::<HashSet<_>>();
+
+        // Extract group by columns (all columns not involved in aggregation or pivot)
+        let schema = self.schema();
+
+        let group_by_columns = schema
+            .fields()
+            .iter()
+            .filter_map(|f| {
+                if used_columns.contains(f.name()) {
+                    None // Skip columns that are used in aggregation or pivot
+                } else {
+                    Some(Expr::Column(Column::from_name(f.name())))
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // Create filtered aggregate expressions for each value in value_source
+        let mut aggr_exprs = Vec::new();
+
+        for value in &value_source {
+            let (value, value_alias) =
+                if let Expr::Alias(Alias { expr, name, .. }) = value {
+                    (expr.as_ref(), name)
+                } else {
+                    (value, &value.to_string())
+                };
+            let condition = match value_column.len() {
+                0 => return plan_err!("Pivot requires at least one value column"),
+                1 => binary_expr(
+                    Expr::Column(value_column[0].clone()),
+                    Operator::IsNotDistinctFrom,
+                    value.clone(),
+                ),
+                _ => {
+                    let Expr::ScalarFunction(ScalarFunction { func, args }) = value
+                    else {
+                        return plan_err!("Pivot value must be struct(literals) if multiple value columns are provided");
+                    };
+                    if func.name() != "struct" {
+                        return plan_err!("Pivot value must be struct(literals) if multiple value columns are provided");
+                    }
+                    if args.len() != value_column.len() {
+                        return plan_err!(
+                            "Pivot value list length must match value column count"
+                        );
+                    }
+                    let mut condition: Option<Expr> = None;
+                    for (idx, col) in value_column.iter().enumerate() {
+                        let single_condition = binary_expr(
+                            Expr::Column(col.clone()),
+                            Operator::IsNotDistinctFrom,
+                            args[idx].clone(),
+                        );
+                        condition = match condition {
+                            None => Some(single_condition),
+                            Some(prev) => Some(and(prev, single_condition)),
+                        };
+                    }
+                    match condition {
+                        None => {
+                            return plan_err!("Pivot value condition cannot be empty")
+                        }
+                        Some(cond) => cond,
+                    }
+                }
+            };
+
+            for (i, agg_func) in aggregate_functions.iter().enumerate() {
+                let (expr, name, metadata) = match agg_func {
+                    Expr::Alias(Alias {
+                        expr,
+                        name,
+                        metadata,
+                        ..
+                    }) if matches!(expr.as_ref(), Expr::AggregateFunction(_)) => {
+                        (expr.as_ref(), name, metadata)
+                    }
+                    Expr::AggregateFunction(_) => {
+                        (agg_func, &agg_func.to_string(), &None)
+                    }
+                    _ => {
+                        return plan_err!(
+                            "Pivot aggregate function must be either an alias or an aggregate function expression, but got: {agg_func:?}"
+                        );
+                    }
+                };
+                let expr = expr
+                    .clone()
+                    .transform(|nested_expr| match &nested_expr {
+                        Expr::AggregateFunction(func) => {
+                            let filter = match &func.params.filter {
+                                Some(filter) => {
+                                    and(filter.as_ref().clone(), condition.clone())
+                                }
+                                None => condition.clone(),
+                            };
+                            let mut func = func.clone();
+                            func.params.filter = Some(Box::new(filter));
+                            Ok(Transformed::yes(Expr::AggregateFunction(func)))
+                        }
+                        _ => Ok(Transformed::no(nested_expr)),
+                    })?
+                    .data;
+
+                let expr = match default_on_null.as_ref() {
+                    Some(default_values) => {
+                        when(expr.clone().is_null(), default_values[i].clone())
+                            .otherwise(expr)?
+                    }
+                    None => expr,
+                };
+                let pivot_col_name = format!(
+                    "{}_{}",
+                    value_alias.replace("\"", "").replace("'", ""),
+                    name
+                );
+                aggr_exprs
+                    .push(expr.alias_with_metadata(pivot_col_name, metadata.clone()));
+            }
+        }
+
+        let aggregate_plan = self.aggregate(group_by_columns, aggr_exprs)?;
+
+        Ok(aggregate_plan)
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn unpivot(
+        self,
+        value_column_names: Vec<String>,
+        name_column: String,
+        unpivot_columns: Vec<(Vec<String>, Option<String>)>,
+        id_columns: Option<Vec<String>>,
+        include_nulls: bool,
+        named_struct_fn: &Arc<ScalarUDF>,
+        make_array_fn: &Arc<ScalarUDF>,
+        get_field_fn: &Arc<ScalarUDF>,
+    ) -> Result<Self> {
+        let schema = self.schema();
+        let num_value_columns = value_column_names.len();
+
+        // Validate that all unpivot columns have the same number of columns
+        for (cols, _) in &unpivot_columns {
+            if cols.len() != num_value_columns {
+                return plan_err!(
+                    "All unpivot columns must have {} column(s), but found {}",
+                    num_value_columns,
+                    cols.len()
+                );
+            }
+        }
+
+        // Get the list of columns that should be preserved (not unpivoted)
+        let unpivot_col_set: HashSet<String> = unpivot_columns
+            .iter()
+            .flat_map(|(cols, _)| cols.iter().cloned())
+            .collect();
+
+        let preserved_columns: Vec<Expr> = if let Some(id_columns) = id_columns {
+            id_columns
+                .iter()
+                .map(|col| Expr::Column(Column::from_name(col)))
+                .collect()
+        } else {
+            schema
+                .iter()
+                .filter_map(|(q, f)| {
+                    if !unpivot_col_set.contains(f.name()) {
+                        Some(Expr::Column(Column::new(q.cloned(), f.name())))
+                    } else {
+                        None
+                    }
+                })
+                .collect()
+        };
+
+        // Build array of structs: array[struct(name_val, col1_val, col2_val, ...), ...]
+        let mut struct_exprs = Vec::new();
+
+        for (col_names, alias_opt) in unpivot_columns {
+            // Build struct fields: [name_literal, name_column_name, value1, value1_name, value2, value2_name, ...]
+            let mut struct_fields = Vec::new();
+
+            // Add name field
+            let name_value = alias_opt.unwrap_or_else(|| col_names[0].clone());
+            struct_fields.push(lit(name_column.clone()));
+            struct_fields.push(lit(name_value));
+
+            // Add value fields
+            for (i, col_name) in col_names.iter().enumerate() {
+                struct_fields.push(lit(value_column_names[i].clone()));
+                struct_fields.push(Expr::Column(Column::from_qualified_name(col_name)));
+            }
+
+            // Create struct expression
+            let struct_expr = Expr::ScalarFunction(ScalarFunction::new_udf(
+                Arc::clone(named_struct_fn),
+                struct_fields,
+            ));
+
+            struct_exprs.push(struct_expr);
+        }
+
+        let unpivot_array_column = "__unpivot_array";
Review Comment:
The hardcoded column name `__unpivot_array` could conflict with an existing
column in the schema. If a user's table already has a column named
`__unpivot_array`, the collision would cause unexpected behavior. Consider
either:
1. Checking whether the name exists in the schema and generating a unique
name if needed
2. Using a UUID or another guaranteed-unique identifier
3. Using a more obscure prefix that is less likely to conflict (though this
does not eliminate the risk)
```suggestion
        // Generate a unique column name for the unpivot array to avoid collisions
        let base_name = "__unpivot_array";
        let mut unpivot_array_column = base_name.to_string();
        let mut i = 1;
        let existing_names: HashSet<&str> =
            schema.iter().map(|(_, f)| f.name().as_str()).collect();
        while existing_names.contains(unpivot_array_column.as_str()) {
            unpivot_array_column = format!("{}{}", base_name, i);
            i += 1;
        }
```
##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1493,6 +1497,303 @@ impl LogicalPlanBuilder {
         unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
             .map(Self::new)
     }
+
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {
+        match default_on_null {
+            Some(default_values) if default_values.len() != aggregate_functions.len() => {
+                return plan_err!("Number of default values must match the number of aggregate functions");
+            }
+            _ => {}
+        }
+        let mut used_columns = HashSet::new();
+        used_columns.extend(value_column.iter().cloned());
+        for agg in aggregate_functions.iter() {
+            expr_to_columns(agg, &mut used_columns)?;
+        }
+
+        let used_columns = used_columns
+            .iter()
+            .map(|c| c.name.clone())
+            .collect::<HashSet<_>>();
+
+        // Extract group by columns (all columns not involved in aggregation or pivot)
+        let schema = self.schema();
+
+        let group_by_columns = schema
+            .fields()
+            .iter()
+            .filter_map(|f| {
+                if used_columns.contains(f.name()) {
+                    None // Skip columns that are used in aggregation or pivot
+                } else {
+                    Some(Expr::Column(Column::from_name(f.name())))
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // Create filtered aggregate expressions for each value in value_source
+        let mut aggr_exprs = Vec::new();
+
+        for value in &value_source {
+            let (value, value_alias) =
+                if let Expr::Alias(Alias { expr, name, .. }) = value {
+                    (expr.as_ref(), name)
+                } else {
+                    (value, &value.to_string())
+                };
+            let condition = match value_column.len() {
+                0 => return plan_err!("Pivot requires at least one value column"),
+                1 => binary_expr(
+                    Expr::Column(value_column[0].clone()),
+                    Operator::IsNotDistinctFrom,
+                    value.clone(),
+                ),
+                _ => {
+                    let Expr::ScalarFunction(ScalarFunction { func, args }) = value
+                    else {
+                        return plan_err!("Pivot value must be struct(literals) if multiple value columns are provided");
+                    };
+                    if func.name() != "struct" {
+                        return plan_err!("Pivot value must be struct(literals) if multiple value columns are provided");
+                    }
+                    if args.len() != value_column.len() {
+                        return plan_err!(
+                            "Pivot value list length must match value column count"
+                        );
+                    }
+                    let mut condition: Option<Expr> = None;
+                    for (idx, col) in value_column.iter().enumerate() {
+                        let single_condition = binary_expr(
+                            Expr::Column(col.clone()),
+                            Operator::IsNotDistinctFrom,
+                            args[idx].clone(),
+                        );
+                        condition = match condition {
+                            None => Some(single_condition),
+                            Some(prev) => Some(and(prev, single_condition)),
+                        };
+                    }
+                    match condition {
+                        None => {
+                            return plan_err!("Pivot value condition cannot be empty")
+                        }
+                        Some(cond) => cond,
+                    }
+                }
+            };
+
+            for (i, agg_func) in aggregate_functions.iter().enumerate() {
+                let (expr, name, metadata) = match agg_func {
+                    Expr::Alias(Alias {
+                        expr,
+                        name,
+                        metadata,
+                        ..
+                    }) if matches!(expr.as_ref(), Expr::AggregateFunction(_)) => {
+                        (expr.as_ref(), name, metadata)
+                    }
+                    Expr::AggregateFunction(_) => {
+                        (agg_func, &agg_func.to_string(), &None)
+                    }
+                    _ => {
+                        return plan_err!(
+                            "Pivot aggregate function must be either an alias or an aggregate function expression, but got: {agg_func:?}"
+                        );
+                    }
+                };
+                let expr = expr
+                    .clone()
+                    .transform(|nested_expr| match &nested_expr {
+                        Expr::AggregateFunction(func) => {
+                            let filter = match &func.params.filter {
+                                Some(filter) => {
+                                    and(filter.as_ref().clone(), condition.clone())
+                                }
+                                None => condition.clone(),
+                            };
+                            let mut func = func.clone();
+                            func.params.filter = Some(Box::new(filter));
+                            Ok(Transformed::yes(Expr::AggregateFunction(func)))
+                        }
+                        _ => Ok(Transformed::no(nested_expr)),
+                    })?
+                    .data;
+
+                let expr = match default_on_null.as_ref() {
+                    Some(default_values) => {
+                        when(expr.clone().is_null(), default_values[i].clone())
+                            .otherwise(expr)?
+                    }
+                    None => expr,
+                };
+                let pivot_col_name = format!(
+                    "{}_{}",
+                    value_alias.replace("\"", "").replace("'", ""),
+                    name
+                );
+                aggr_exprs
+                    .push(expr.alias_with_metadata(pivot_col_name, metadata.clone()));
+            }
+        }
+
+        let aggregate_plan = self.aggregate(group_by_columns, aggr_exprs)?;
+
+        Ok(aggregate_plan)
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn unpivot(
+        self,
+        value_column_names: Vec<String>,
+        name_column: String,
+        unpivot_columns: Vec<(Vec<String>, Option<String>)>,
+        id_columns: Option<Vec<String>>,
+        include_nulls: bool,
+        named_struct_fn: &Arc<ScalarUDF>,
+        make_array_fn: &Arc<ScalarUDF>,
+        get_field_fn: &Arc<ScalarUDF>,
+    ) -> Result<Self> {
Review Comment:
Missing validation for empty input vectors. The function should validate
that `value_column_names` and `unpivot_columns` are not empty, as an unpivot
operation without these doesn't make semantic sense. Consider adding early
validation like:
```rust
if value_column_names.is_empty() {
    return plan_err!("Unpivot requires at least one value column name");
}
if unpivot_columns.is_empty() {
    return plan_err!("Unpivot requires at least one column to unpivot");
}
```
```suggestion
    ) -> Result<Self> {
        if value_column_names.is_empty() {
            return plan_err!("Unpivot requires at least one value column name");
        }
        if unpivot_columns.is_empty() {
            return plan_err!("Unpivot requires at least one column to unpivot");
        }
```
##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1493,6 +1497,303 @@ impl LogicalPlanBuilder {
         unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
             .map(Self::new)
     }
+
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {
Review Comment:
Missing validation for empty input vectors. The function should validate
that `aggregate_functions` and `value_source` are not empty, as a pivot
operation without aggregate functions or values doesn't make semantic sense.
Consider adding early validation like:
```rust
if aggregate_functions.is_empty() {
    return plan_err!("Pivot requires at least one aggregate function");
}
if value_source.is_empty() {
    return plan_err!("Pivot requires at least one value in value_source");
}
```
```suggestion
    ) -> Result<Self> {
        if aggregate_functions.is_empty() {
            return plan_err!("Pivot requires at least one aggregate function");
        }
        if value_source.is_empty() {
            return plan_err!("Pivot requires at least one value in value_source");
        }
```
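To pin the behavior down, a test along these lines could accompany the
validation. `LogicalPlanBuilder::empty` exists upstream; the expected message
matches the suggestion above, and the rest of the setup is assumed:
```rust
// Sketch: empty inputs should fail fast once the validation is added.
let err = LogicalPlanBuilder::empty(false)
    .pivot(vec![], vec![], vec![], None)
    .unwrap_err();
assert!(err
    .to_string()
    .contains("Pivot requires at least one aggregate function"));
```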
##########
datafusion/sql/src/relation/mod.rs:
##########
@@ -183,6 +187,208 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                     .build()?;
                 (plan, alias)
             }
+            TableFactor::Pivot {
+                table,
+                aggregate_functions,
+                value_column,
+                value_source,
+                default_on_null,
+                alias,
+            } => {
+                let plan =
+                    self.create_relation(table.as_ref().clone(), planner_context)?;
+                let schema = plan.schema();
+                let aggregate_functions = aggregate_functions
+                    .into_iter()
+                    .map(|func| {
+                        self.sql_expr_to_logical_expr(func.expr, schema, planner_context)
+                            .map(|expr| match func.alias {
+                                Some(name) => expr.alias(name.value),
+                                None => expr,
+                            })
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let value_column = value_column.into_iter().map(|column| {
+                    let expr = match column {
+                        SqlExpr::Identifier(id) => self.sql_identifier_to_expr(id, schema, planner_context)?,
+                        SqlExpr::CompoundIdentifier(idents) => self.sql_compound_identifier_to_expr(idents, schema, planner_context)?,
+                        expr => return plan_err!(
+                            "Expected column identifier, found: {expr:?} in pivot value column"
+                        ),
+                    };
+                    match expr {
+                        Expr::Column(col) => Ok(col),
+                        expr => plan_err!(
+                            "Expected column identifier, found: {expr:?} in pivot value column"
+                        ),
+                    }
+                }).collect::<Result<Vec<_>>>()?;
+
+                let PivotValueSource::List(source) = value_source else {
+                    // Dynamic pivot: the output schema is determined by the data in the source table at runtime.
+                    return plan_err!("Dynamic pivot is not supported yet");
+                };
+                let value_source = source
+                    .into_iter()
+                    .map(|expr_with_alias| {
+                        self.sql_expr_to_logical_expr(
+                            expr_with_alias.expr,
+                            schema,
+                            planner_context,
+                        )
+                        .map(|expr| {
+                            match expr_with_alias.alias {
+                                Some(name) => expr.alias(name.value),
+                                None => expr,
+                            }
+                        })
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                let default_on_null = default_on_null
+                    .map(|expr| {
+                        let expr =
+                            self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
+                        match expr {
+                            Expr::Literal(ScalarValue::List(list), _) => (0..list.len())
+                                .map(|idx| {
+                                    Ok(Expr::Literal(
+                                        ScalarValue::try_from_array(list.values(), idx)?,
+                                        None,
+                                    ))
+                                })
+                                .collect::<Result<Vec<_>>>(),
+                            _ => plan_err!("Pivot default value cannot be NULL"),
+                        }
+                    })
+                    .transpose()?;
+
+                let plan = LogicalPlanBuilder::from(plan)
+                    .pivot(
+                        aggregate_functions,
+                        value_column,
+                        value_source,
+                        default_on_null,
+                    )?
+                    .build()?;
+                (plan, alias)
+            }
+            TableFactor::Unpivot {
+                table,
+                value,
+                name,
+                columns,
+                null_inclusion,
+                alias,
+            } => {
+                let plan =
+                    self.create_relation(table.as_ref().clone(), planner_context)?;
+
+                // Parse value expression(s)
+                let value_columns = match value {
+                    SqlExpr::Tuple(exprs) => exprs,
+                    single_expr => vec![single_expr],
+                };
+                let value_column_names: Vec<String> = value_columns
+                    .into_iter()
+                    .map(|expr| match expr {
+                        SqlExpr::Identifier(id) => Ok(id.value),
+                        _ => plan_err!("Expected identifier in UNPIVOT value clause"),
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                // Parse name column
+                let name_column = name.value;
+
+                // Parse columns to unpivot
+                let unpivot_columns: Vec<(Vec<String>, Option<String>)> = columns
+                    .into_iter()
+                    .map(|col_with_alias| {
+                        let column_names = match &col_with_alias.expr {
+                            SqlExpr::Tuple(exprs) => exprs
+                                .iter()
+                                .map(|e| match e {
+                                    SqlExpr::Identifier(id) => Ok(id.value.clone()),
+                                    SqlExpr::CompoundIdentifier(ids) => Ok(ids
+                                        .iter()
+                                        .map(|i| i.value.as_str())
+                                        .collect::<Vec<_>>()
+                                        .join(".")),
+                                    _ => plan_err!(
+                                        "Expected identifier in UNPIVOT IN clause"
+                                    ),
+                                })
+                                .collect::<Result<Vec<_>>>()?,
+                            SqlExpr::Identifier(id) => vec![id.value.clone()],
+                            SqlExpr::CompoundIdentifier(ids) => {
+                                vec![ids
+                                    .iter()
+                                    .map(|i| i.value.as_str())
+                                    .collect::<Vec<_>>()
+                                    .join(".")]
+                            }
+                            _ => {
+                                return plan_err!(
+                                    "Expected identifier or tuple in UNPIVOT IN clause"
+                                )
+                            }
+                        };
+
+                        let alias = col_with_alias.alias.map(|alias| alias.value);
+                        Ok((column_names, alias))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                // Determine if nulls should be included (default is EXCLUDE)
+                let include_nulls = matches!(
+                    null_inclusion,
+                    Some(sqlparser::ast::NullInclusion::IncludeNulls)
+                );
+
+                // Get named_struct and make_array functions from context
+                let named_struct_fn = self
+                    .context_provider
+                    .get_function_meta("named_struct")
+                    .ok_or_else(|| {
+                        datafusion_common::DataFusionError::Plan(
+                            "named_struct function not found".to_string(),
+                        )
+                    })?;
+
+                let make_array_fn = self
+                    .context_provider
+                    .get_function_meta("make_array")
+                    .ok_or_else(|| {
+                        datafusion_common::DataFusionError::Plan(
+                            "make_array function not found".to_string(),
+                        )
+                    })?;
+
+                let get_field_fn = self
+                    .context_provider
+                    .get_function_meta("get_field")
+                    .ok_or_else(|| {
+                        datafusion_common::DataFusionError::Plan(
+                            "get_field function not found".to_string(),
+                        )
+                    })?;
Review Comment:
Inconsistent indentation in the error handling closure. The opening brace on
line 370 should be indented to align with the previous error handlers on lines
352 and 361 for consistency.
```suggestion
                        datafusion_common::DataFusionError::Plan(
                            "get_field function not found".to_string(),
                        )
                    })?;
```
##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1493,6 +1497,303 @@ impl LogicalPlanBuilder {
         unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
             .map(Self::new)
     }
+
+    pub fn pivot(
+        self,
+        aggregate_functions: Vec<Expr>,
+        value_column: Vec<Column>,
+        value_source: Vec<Expr>,
+        default_on_null: Option<Vec<Expr>>,
+    ) -> Result<Self> {
+        match default_on_null {
+            Some(default_values) if default_values.len() != aggregate_functions.len() => {
+                return plan_err!("Number of default values must match the number of aggregate functions");
+            }
+            _ => {}
+        }
+        let mut used_columns = HashSet::new();
+        used_columns.extend(value_column.iter().cloned());
+        for agg in aggregate_functions.iter() {
+            expr_to_columns(agg, &mut used_columns)?;
+        }
+
+        let used_columns = used_columns
+            .iter()
+            .map(|c| c.name.clone())
+            .collect::<HashSet<_>>();
+
+        // Extract group by columns (all columns not involved in aggregation or pivot)
+        let schema = self.schema();
+
+        let group_by_columns = schema
+            .fields()
+            .iter()
+            .filter_map(|f| {
+                if used_columns.contains(f.name()) {
+                    None // Skip columns that are used in aggregation or pivot
+                } else {
+                    Some(Expr::Column(Column::from_name(f.name())))
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // Create filtered aggregate expressions for each value in value_source
+        let mut aggr_exprs = Vec::new();
+
+        for value in &value_source {
+            let (value, value_alias) =
+                if let Expr::Alias(Alias { expr, name, .. }) = value {
+                    (expr.as_ref(), name)
+                } else {
+                    (value, &value.to_string())
+                };
+            let condition = match value_column.len() {
+                0 => return plan_err!("Pivot requires at least one value column"),
+                1 => binary_expr(
+                    Expr::Column(value_column[0].clone()),
+                    Operator::IsNotDistinctFrom,
+                    value.clone(),
+                ),
+                _ => {
+                    let Expr::ScalarFunction(ScalarFunction { func, args }) = value
+                    else {
+                        return plan_err!("Pivot value must be struct(literals) if multiple value columns are provided");
+                    };
+                    if func.name() != "struct" {
+                        return plan_err!("Pivot value must be struct(literals) if multiple value columns are provided");
+                    }
+                    if args.len() != value_column.len() {
+                        return plan_err!(
+                            "Pivot value list length must match value column count"
+                        );
+                    }
+                    let mut condition: Option<Expr> = None;
+                    for (idx, col) in value_column.iter().enumerate() {
+                        let single_condition = binary_expr(
+                            Expr::Column(col.clone()),
+                            Operator::IsNotDistinctFrom,
+                            args[idx].clone(),
+                        );
+                        condition = match condition {
+                            None => Some(single_condition),
+                            Some(prev) => Some(and(prev, single_condition)),
+                        };
+                    }
+                    match condition {
+                        None => {
+                            return plan_err!("Pivot value condition cannot be empty")
+                        }
+                        Some(cond) => cond,
+                    }
+                }
+            };
+
+            for (i, agg_func) in aggregate_functions.iter().enumerate() {
+                let (expr, name, metadata) = match agg_func {
+                    Expr::Alias(Alias {
+                        expr,
+                        name,
+                        metadata,
+                        ..
+                    }) if matches!(expr.as_ref(), Expr::AggregateFunction(_)) => {
+                        (expr.as_ref(), name, metadata)
+                    }
+                    Expr::AggregateFunction(_) => {
+                        (agg_func, &agg_func.to_string(), &None)
+                    }
+                    _ => {
+                        return plan_err!(
+                            "Pivot aggregate function must be either an alias or an aggregate function expression, but got: {agg_func:?}"
+                        );
+                    }
+                };
+                let expr = expr
+                    .clone()
+                    .transform(|nested_expr| match &nested_expr {
+                        Expr::AggregateFunction(func) => {
+                            let filter = match &func.params.filter {
+                                Some(filter) => {
+                                    and(filter.as_ref().clone(), condition.clone())
+                                }
+                                None => condition.clone(),
+                            };
+                            let mut func = func.clone();
+                            func.params.filter = Some(Box::new(filter));
+                            Ok(Transformed::yes(Expr::AggregateFunction(func)))
+                        }
+                        _ => Ok(Transformed::no(nested_expr)),
+                    })?
+                    .data;
+
+                let expr = match default_on_null.as_ref() {
+                    Some(default_values) => {
+                        when(expr.clone().is_null(), default_values[i].clone())
+                            .otherwise(expr)?
+                    }
+                    None => expr,
+                };
+                let pivot_col_name = format!(
+                    "{}_{}",
+                    value_alias.replace("\"", "").replace("'", ""),
Review Comment:
Using string replacements to sanitize column names is fragile and may not
handle all edge cases. Consider using a more robust approach or documenting why
only quotes need to be removed. For example, other special characters like
backticks, brackets, or control characters might also appear in aliases.
```suggestion
                    sanitize_column_name(value_alias),
```
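Note that `sanitize_column_name` does not exist in the PR; it is a name for a
helper the authors would have to add. A minimal sketch, with the stripped
character set being an assumption rather than an established DataFusion
convention:
```rust
/// Hypothetical helper: strip characters that are awkward in generated
/// column names, instead of only double and single quotes.
fn sanitize_column_name(name: &str) -> String {
    name.chars()
        .filter(|c| !matches!(c, '"' | '\'' | '`' | '[' | ']') && !c.is_control())
        .collect()
}
```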
##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -2368,6 +2368,139 @@ impl DataFrame {
         let df = ctx.read_batch(batch)?;
         Ok(df)
     }
+
+    /// Pivot the DataFrame, transforming rows into columns based on the specified value columns and aggregation functions.
+    ///
+    /// # Arguments
+    /// * `aggregate_functions` - Aggregation expressions to apply (e.g., sum, count).
+    /// * `value_column` - Columns whose unique values will become new columns in the output.
+    /// * `value_source` - Columns to use as values for the pivoted columns.
+    /// * `default_on_null` - Optional expressions to use as default values when a pivoted value is null.
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use arrow::array::{ArrayRef, Int32Array, StringArray};
+    /// # use datafusion::functions_aggregate::expr_fn::sum;
+    /// # use std::sync::Arc;
+    /// # let ctx = SessionContext::new();
+    /// let value: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+    /// let category: ArrayRef = Arc::new(StringArray::from(vec!["A", "B", "A"]));
+    /// let df = DataFrame::from_columns(vec![("value", value), ("category", category)]).unwrap();
+    /// let pivoted = df.pivot(
+    ///     vec![sum(col("value"))],
+    ///     vec![Column::from("category")],
+    ///     vec![col("value")],
Review Comment:
The documentation example appears to be incorrect. The `value_source`
parameter should contain literal values that will become column names (e.g.,
`lit("A")`, `lit("B")`), not `col("value")`. Based on the test cases and
implementation, this should be:
```rust
vec![lit("A"), lit("B")]
```
```suggestion
    ///     vec![lit("A"), lit("B")],
```
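With that fix, the corrected doctest call would read roughly as below. This is
a sketch: per the builder code quoted earlier in this review, each pivoted
column name is derived from the literal's display form plus the aggregate
expression's name, so the exact output column names are not spelled out here.
```rust
let pivoted = df.pivot(
    vec![sum(col("value"))],
    vec![Column::from("category")],
    vec![lit("A"), lit("B")],
    None,
).unwrap();
// Assumed output: one row per remaining group, with one pivoted column per
// (pivot value, aggregate) pair; here, the per-category sums of "value".
```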
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]