This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 4fa823219e chore: refactor Substrait consumer's "rename_field" and implement the rest of types (#16345) 4fa823219e is described below commit 4fa823219e32ca94a4878a387691a325f79e66d3 Author: Arttu <blizz...@users.noreply.github.com> AuthorDate: Thu Jun 12 05:32:04 2025 +0100 chore: refactor Substrait consumer's "rename_field" and implement the rest of types (#16345) * refactor Substrait consumer's "rename_field" and implement the rest of types The rename_field is a bit confusing with its "rename_self" parameter. Also there are times when one wants to just rename a data type. And I'd like to reuse the same code within our codebase, since with Substrait the case of renaming a field/type comes up a bunch, so I'd like to make these functions "pub". And lastly, this adds support for all list types, as well as dict/ree/union types. Dunno how necessary those are, but seems right to support them still. * datatype => data_type * fix clippy --- .../src/logical_plan/consumer/expr/mod.rs | 5 +- .../substrait/src/logical_plan/consumer/utils.rs | 268 ++++++++++++++------- 2 files changed, 181 insertions(+), 92 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs b/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs index b3ec2e3781..d701827671 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs @@ -38,9 +38,9 @@ pub use subquery::*; pub use window_function::*; use crate::extensions::Extensions; -use crate::logical_plan::consumer::utils::rename_field; use crate::logical_plan::consumer::{ - from_substrait_named_struct, DefaultSubstraitConsumer, SubstraitConsumer, + from_substrait_named_struct, rename_field, DefaultSubstraitConsumer, + SubstraitConsumer, }; use datafusion::arrow::datatypes::Field; use datafusion::common::{not_impl_err, plan_err, substrait_err, DFSchema, DFSchemaRef}; @@ 
-152,7 +152,6 @@ pub async fn from_substrait_extended_expr( &substrait_expr.output_names, expr_idx, &mut names_idx, - /*rename_self=*/ true, )?; exprs.push((expr, output_field)); } diff --git a/datafusion/substrait/src/logical_plan/consumer/utils.rs b/datafusion/substrait/src/logical_plan/consumer/utils.rs index a267971ff8..396c5e673f 100644 --- a/datafusion/substrait/src/logical_plan/consumer/utils.rs +++ b/datafusion/substrait/src/logical_plan/consumer/utils.rs @@ -16,10 +16,10 @@ // under the License. use crate::logical_plan::consumer::SubstraitConsumer; -use datafusion::arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, UnionFields}; use datafusion::common::{ - not_impl_err, substrait_datafusion_err, substrait_err, DFSchema, DFSchemaRef, - TableReference, + exec_err, not_impl_err, substrait_datafusion_err, substrait_err, DFSchema, + DFSchemaRef, TableReference, }; use datafusion::logical_expr::expr::Sort; use datafusion::logical_expr::{Cast, Expr, ExprSchemable, LogicalPlanBuilder}; @@ -81,98 +81,167 @@ pub(super) fn next_struct_field_name( } } -pub(super) fn rename_field( +/// Traverse through the field, renaming the provided field itself and all its inner struct fields. +pub fn rename_field( field: &Field, dfs_names: &Vec<String>, unnamed_field_suffix: usize, // If Substrait doesn't provide a name, we'll use this "c{unnamed_field_suffix}" name_idx: &mut usize, // Index into dfs_names - rename_self: bool, // Some fields (e.g. list items) don't have names in Substrait and this will be false to keep old name ) -> datafusion::common::Result<Field> { - let name = if rename_self { - next_struct_field_name(unnamed_field_suffix, dfs_names, name_idx)? 
- } else { - field.name().to_string() - }; - match field.data_type() { + let name = next_struct_field_name(unnamed_field_suffix, dfs_names, name_idx)?; + rename_fields_data_type(field.clone().with_name(name), dfs_names, name_idx) +} + +/// Rename the field's data type but not the field itself. +pub fn rename_fields_data_type( + field: Field, + dfs_names: &Vec<String>, + name_idx: &mut usize, // Index into dfs_names +) -> datafusion::common::Result<Field> { + let dt = rename_data_type(field.data_type(), dfs_names, name_idx)?; + Ok(field.with_data_type(dt)) +} + +/// Traverse through the data type (incl. lists/maps/etc), renaming all inner struct fields. +pub fn rename_data_type( + data_type: &DataType, + dfs_names: &Vec<String>, + name_idx: &mut usize, // Index into dfs_names +) -> datafusion::common::Result<DataType> { + match data_type { DataType::Struct(children) => { let children = children .iter() .enumerate() - .map(|(child_idx, f)| { - rename_field( - f.as_ref(), - dfs_names, - child_idx, - name_idx, - /*rename_self=*/ true, - ) + .map(|(field_idx, f)| { + rename_field(f.as_ref(), dfs_names, field_idx, name_idx) }) .collect::<datafusion::common::Result<_>>()?; - Ok(field - .to_owned() - .with_name(name) - .with_data_type(DataType::Struct(children))) + Ok(DataType::Struct(children)) } - DataType::List(inner) => { - let renamed_inner = rename_field( - inner.as_ref(), + DataType::List(inner) => Ok(DataType::List(Arc::new(rename_fields_data_type( + inner.as_ref().to_owned(), + dfs_names, + name_idx, + )?))), + DataType::LargeList(inner) => Ok(DataType::LargeList(Arc::new( + rename_fields_data_type(inner.as_ref().to_owned(), dfs_names, name_idx)?, + ))), + DataType::ListView(inner) => Ok(DataType::ListView(Arc::new( + rename_fields_data_type(inner.as_ref().to_owned(), dfs_names, name_idx)?, + ))), + DataType::LargeListView(inner) => Ok(DataType::LargeListView(Arc::new( + rename_fields_data_type(inner.as_ref().to_owned(), dfs_names, name_idx)?, + ))), + 
DataType::FixedSizeList(inner, len) => Ok(DataType::FixedSizeList( + Arc::new(rename_fields_data_type( + inner.as_ref().to_owned(), dfs_names, - 0, name_idx, - /*rename_self=*/ false, - )?; - Ok(field - .to_owned() - .with_data_type(DataType::List(FieldRef::new(renamed_inner))) - .with_name(name)) + )?), + *len, + )), + DataType::Map(entries, sorted) => { + let entries_data_type = match entries.data_type() { + DataType::Struct(fields) => { + // This should be two fields, normally "key" and "value", but not guaranteed + let fields = fields + .iter() + .map(|f| { + rename_fields_data_type( + f.as_ref().to_owned(), + dfs_names, + name_idx, + ) + }) + .collect::<datafusion::common::Result<_>>()?; + Ok(DataType::Struct(fields)) + } + _ => exec_err!("Expected map type to contain an inner struct type"), + }?; + Ok(DataType::Map( + Arc::new( + entries + .as_ref() + .to_owned() + .with_data_type(entries_data_type), + ), + *sorted, + )) } - DataType::LargeList(inner) => { - let renamed_inner = rename_field( - inner.as_ref(), + DataType::Dictionary(key_type, value_type) => { + // Dicts probably shouldn't contain structs, but support them just in case one does + Ok(DataType::Dictionary( + Box::new(rename_data_type(key_type, dfs_names, name_idx)?), + Box::new(rename_data_type(value_type, dfs_names, name_idx)?), + )) + } + DataType::RunEndEncoded(run_ends_field, values_field) => { + // At least the run_ends_field shouldn't contain names (since it should be i16/i32/i64), + // but we'll try renaming its datatype just in case. 
+ let run_ends_field = rename_fields_data_type( + run_ends_field.as_ref().clone(), + dfs_names, + name_idx, + )?; + let values_field = rename_fields_data_type( + values_field.as_ref().clone(), dfs_names, - 0, name_idx, - /*rename_self= */ false, )?; - Ok(field - .to_owned() - .with_data_type(DataType::LargeList(FieldRef::new(renamed_inner))) - .with_name(name)) + + Ok(DataType::RunEndEncoded( + Arc::new(run_ends_field), + Arc::new(values_field), + )) } - DataType::Map(inner, sorted) => match inner.data_type() { - DataType::Struct(key_and_value) if key_and_value.len() == 2 => { - let renamed_keys = rename_field( - key_and_value[0].as_ref(), - dfs_names, - 0, - name_idx, - /*rename_self=*/ false, - )?; - let renamed_values = rename_field( - key_and_value[1].as_ref(), - dfs_names, - 0, - name_idx, - /*rename_self=*/ false, - )?; - Ok(field - .to_owned() - .with_data_type(DataType::Map( - Arc::new(Field::new( - inner.name(), - DataType::Struct(Fields::from(vec![ - renamed_keys, - renamed_values, - ])), - inner.is_nullable(), - )), - *sorted, + DataType::Union(fields, mode) => { + let fields = fields + .iter() + .map(|(i, f)| { + Ok(( + i, + Arc::new(rename_fields_data_type( + f.as_ref().clone(), + dfs_names, + name_idx, + )?), )) - .with_name(name)) - } - _ => substrait_err!("Map fields must contain a Struct with exactly 2 fields"), - }, - _ => Ok(field.to_owned().with_name(name)), + }) + .collect::<datafusion::common::Result<UnionFields>>()?; + Ok(DataType::Union(fields, *mode)) + } + // Explicitly listing the rest (which can not contain inner fields needing renaming) + // to ensure we're exhaustive + DataType::Null + | DataType::Boolean + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float16 + | DataType::Float32 + | DataType::Float64 + | DataType::Timestamp(_, _) + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | 
DataType::Time64(_) + | DataType::Duration(_) + | DataType::Interval(_) + | DataType::Binary + | DataType::FixedSizeBinary(_) + | DataType::LargeBinary + | DataType::BinaryView + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Utf8View + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) => Ok(data_type.clone()), } } @@ -190,13 +259,8 @@ pub(super) fn make_renamed_schema( .iter() .enumerate() .map(|(field_idx, (q, f))| { - let renamed_f = rename_field( - f.as_ref(), - dfs_names, - field_idx, - &mut name_idx, - /*rename_self=*/ true, - )?; + let renamed_f = + rename_field(f.as_ref(), dfs_names, field_idx, &mut name_idx)?; Ok((q.cloned(), renamed_f)) }) .collect::<datafusion::common::Result<Vec<_>>>()? @@ -473,17 +537,29 @@ pub(crate) mod tests { ), ( Some(table_ref.clone()), - Arc::new(Field::new_map( + Arc::new(Field::new_large_list( "7", + Arc::new(Field::new_struct( + "item", + vec![Field::new("8", DataType::Int32, false)], + false, + )), + false, + )), + ), + ( + Some(table_ref.clone()), + Arc::new(Field::new_map( + "9", "entries", Arc::new(Field::new_struct( "keys", - vec![Field::new("8", DataType::Int32, false)], + vec![Field::new("10", DataType::Int32, false)], false, )), Arc::new(Field::new_struct( "values", - vec![Field::new("9", DataType::Int32, false)], + vec![Field::new("11", DataType::Int32, false)], false, )), false, @@ -504,10 +580,12 @@ pub(crate) mod tests { "h".to_string(), "i".to_string(), "j".to_string(), + "k".to_string(), + "l".to_string(), ]; let renamed_schema = make_renamed_schema(&schema, &dfs_names)?; - assert_eq!(renamed_schema.fields().len(), 4); + assert_eq!(renamed_schema.fields().len(), 5); assert_eq!( *renamed_schema.field(0), Field::new("a", DataType::Int32, false) @@ -541,17 +619,29 @@ pub(crate) mod tests { ); assert_eq!( *renamed_schema.field(3), - Field::new_map( + Field::new_large_list( "h", + Arc::new(Field::new_struct( + "item", + vec![Field::new("i", DataType::Int32, false)], + false, + )), + false, + ) + ); 
+ assert_eq!( + *renamed_schema.field(4), + Field::new_map( + "j", "entries", Arc::new(Field::new_struct( "keys", - vec![Field::new("i", DataType::Int32, false)], + vec![Field::new("k", DataType::Int32, false)], false, )), Arc::new(Field::new_struct( "values", - vec![Field::new("j", DataType::Int32, false)], + vec![Field::new("l", DataType::Int32, false)], false, )), false, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org