This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4fa823219e chore: refactor Substrait consumer's "rename_field" and 
implement the rest of types (#16345)
4fa823219e is described below

commit 4fa823219e32ca94a4878a387691a325f79e66d3
Author: Arttu <blizz...@users.noreply.github.com>
AuthorDate: Thu Jun 12 05:32:04 2025 +0100

    chore: refactor Substrait consumer's "rename_field" and implement the rest 
of types (#16345)
    
    * refactor Substrait consumer's "rename_field" and implement the rest of 
types
    
    The rename_field function is a bit confusing with its "rename_self" parameter. Also,
there are times when one wants to rename only a data type.
    I'd also like to reuse the same code within our own codebase: with
Substrait, renaming a field or type comes up often, so this change
makes these functions "pub".
    
    Lastly, this adds support for all list types, as well as dictionary, run-end-encoded (REE), and union
types. I'm not sure how necessary those are, but it seems right to support them anyway.
    
    * datatype => data_type
    
    * fix clippy
---
 .../src/logical_plan/consumer/expr/mod.rs          |   5 +-
 .../substrait/src/logical_plan/consumer/utils.rs   | 268 ++++++++++++++-------
 2 files changed, 181 insertions(+), 92 deletions(-)

diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs 
b/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs
index b3ec2e3781..d701827671 100644
--- a/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs
+++ b/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs
@@ -38,9 +38,9 @@ pub use subquery::*;
 pub use window_function::*;
 
 use crate::extensions::Extensions;
-use crate::logical_plan::consumer::utils::rename_field;
 use crate::logical_plan::consumer::{
-    from_substrait_named_struct, DefaultSubstraitConsumer, SubstraitConsumer,
+    from_substrait_named_struct, rename_field, DefaultSubstraitConsumer,
+    SubstraitConsumer,
 };
 use datafusion::arrow::datatypes::Field;
 use datafusion::common::{not_impl_err, plan_err, substrait_err, DFSchema, 
DFSchemaRef};
@@ -152,7 +152,6 @@ pub async fn from_substrait_extended_expr(
             &substrait_expr.output_names,
             expr_idx,
             &mut names_idx,
-            /*rename_self=*/ true,
         )?;
         exprs.push((expr, output_field));
     }
diff --git a/datafusion/substrait/src/logical_plan/consumer/utils.rs 
b/datafusion/substrait/src/logical_plan/consumer/utils.rs
index a267971ff8..396c5e673f 100644
--- a/datafusion/substrait/src/logical_plan/consumer/utils.rs
+++ b/datafusion/substrait/src/logical_plan/consumer/utils.rs
@@ -16,10 +16,10 @@
 // under the License.
 
 use crate::logical_plan::consumer::SubstraitConsumer;
-use datafusion::arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, UnionFields};
 use datafusion::common::{
-    not_impl_err, substrait_datafusion_err, substrait_err, DFSchema, 
DFSchemaRef,
-    TableReference,
+    exec_err, not_impl_err, substrait_datafusion_err, substrait_err, DFSchema,
+    DFSchemaRef, TableReference,
 };
 use datafusion::logical_expr::expr::Sort;
 use datafusion::logical_expr::{Cast, Expr, ExprSchemable, LogicalPlanBuilder};
@@ -81,98 +81,167 @@ pub(super) fn next_struct_field_name(
     }
 }
 
-pub(super) fn rename_field(
+/// Traverse through the field, renaming the provided field itself and all its 
inner struct fields.
+pub fn rename_field(
     field: &Field,
     dfs_names: &Vec<String>,
     unnamed_field_suffix: usize, // If Substrait doesn't provide a name, we'll 
use this "c{unnamed_field_suffix}"
     name_idx: &mut usize,        // Index into dfs_names
-    rename_self: bool, // Some fields (e.g. list items) don't have names in 
Substrait and this will be false to keep old name
 ) -> datafusion::common::Result<Field> {
-    let name = if rename_self {
-        next_struct_field_name(unnamed_field_suffix, dfs_names, name_idx)?
-    } else {
-        field.name().to_string()
-    };
-    match field.data_type() {
+    let name = next_struct_field_name(unnamed_field_suffix, dfs_names, 
name_idx)?;
+    rename_fields_data_type(field.clone().with_name(name), dfs_names, name_idx)
+}
+
+/// Rename the field's data type but not the field itself.
+pub fn rename_fields_data_type(
+    field: Field,
+    dfs_names: &Vec<String>,
+    name_idx: &mut usize, // Index into dfs_names
+) -> datafusion::common::Result<Field> {
+    let dt = rename_data_type(field.data_type(), dfs_names, name_idx)?;
+    Ok(field.with_data_type(dt))
+}
+
+/// Traverse through the data type (incl. lists/maps/etc), renaming all inner 
struct fields.
+pub fn rename_data_type(
+    data_type: &DataType,
+    dfs_names: &Vec<String>,
+    name_idx: &mut usize, // Index into dfs_names
+) -> datafusion::common::Result<DataType> {
+    match data_type {
         DataType::Struct(children) => {
             let children = children
                 .iter()
                 .enumerate()
-                .map(|(child_idx, f)| {
-                    rename_field(
-                        f.as_ref(),
-                        dfs_names,
-                        child_idx,
-                        name_idx,
-                        /*rename_self=*/ true,
-                    )
+                .map(|(field_idx, f)| {
+                    rename_field(f.as_ref(), dfs_names, field_idx, name_idx)
                 })
                 .collect::<datafusion::common::Result<_>>()?;
-            Ok(field
-                .to_owned()
-                .with_name(name)
-                .with_data_type(DataType::Struct(children)))
+            Ok(DataType::Struct(children))
         }
-        DataType::List(inner) => {
-            let renamed_inner = rename_field(
-                inner.as_ref(),
+        DataType::List(inner) => 
Ok(DataType::List(Arc::new(rename_fields_data_type(
+            inner.as_ref().to_owned(),
+            dfs_names,
+            name_idx,
+        )?))),
+        DataType::LargeList(inner) => Ok(DataType::LargeList(Arc::new(
+            rename_fields_data_type(inner.as_ref().to_owned(), dfs_names, 
name_idx)?,
+        ))),
+        DataType::ListView(inner) => Ok(DataType::ListView(Arc::new(
+            rename_fields_data_type(inner.as_ref().to_owned(), dfs_names, 
name_idx)?,
+        ))),
+        DataType::LargeListView(inner) => Ok(DataType::LargeListView(Arc::new(
+            rename_fields_data_type(inner.as_ref().to_owned(), dfs_names, 
name_idx)?,
+        ))),
+        DataType::FixedSizeList(inner, len) => Ok(DataType::FixedSizeList(
+            Arc::new(rename_fields_data_type(
+                inner.as_ref().to_owned(),
                 dfs_names,
-                0,
                 name_idx,
-                /*rename_self=*/ false,
-            )?;
-            Ok(field
-                .to_owned()
-                .with_data_type(DataType::List(FieldRef::new(renamed_inner)))
-                .with_name(name))
+            )?),
+            *len,
+        )),
+        DataType::Map(entries, sorted) => {
+            let entries_data_type = match entries.data_type() {
+                DataType::Struct(fields) => {
+                    // This should be two fields, normally "key" and "value", 
but not guaranteed
+                    let fields = fields
+                        .iter()
+                        .map(|f| {
+                            rename_fields_data_type(
+                                f.as_ref().to_owned(),
+                                dfs_names,
+                                name_idx,
+                            )
+                        })
+                        .collect::<datafusion::common::Result<_>>()?;
+                    Ok(DataType::Struct(fields))
+                }
+                _ => exec_err!("Expected map type to contain an inner struct 
type"),
+            }?;
+            Ok(DataType::Map(
+                Arc::new(
+                    entries
+                        .as_ref()
+                        .to_owned()
+                        .with_data_type(entries_data_type),
+                ),
+                *sorted,
+            ))
         }
-        DataType::LargeList(inner) => {
-            let renamed_inner = rename_field(
-                inner.as_ref(),
+        DataType::Dictionary(key_type, value_type) => {
+            // Dicts probably shouldn't contain structs, but support them just 
in case one does
+            Ok(DataType::Dictionary(
+                Box::new(rename_data_type(key_type, dfs_names, name_idx)?),
+                Box::new(rename_data_type(value_type, dfs_names, name_idx)?),
+            ))
+        }
+        DataType::RunEndEncoded(run_ends_field, values_field) => {
+            // At least the run_ends_field shouldn't contain names (since it 
should be i16/i32/i64),
+            // but we'll try renaming its datatype just in case.
+            let run_ends_field = rename_fields_data_type(
+                run_ends_field.as_ref().clone(),
+                dfs_names,
+                name_idx,
+            )?;
+            let values_field = rename_fields_data_type(
+                values_field.as_ref().clone(),
                 dfs_names,
-                0,
                 name_idx,
-                /*rename_self= */ false,
             )?;
-            Ok(field
-                .to_owned()
-                
.with_data_type(DataType::LargeList(FieldRef::new(renamed_inner)))
-                .with_name(name))
+
+            Ok(DataType::RunEndEncoded(
+                Arc::new(run_ends_field),
+                Arc::new(values_field),
+            ))
         }
-        DataType::Map(inner, sorted) => match inner.data_type() {
-            DataType::Struct(key_and_value) if key_and_value.len() == 2 => {
-                let renamed_keys = rename_field(
-                    key_and_value[0].as_ref(),
-                    dfs_names,
-                    0,
-                    name_idx,
-                    /*rename_self=*/ false,
-                )?;
-                let renamed_values = rename_field(
-                    key_and_value[1].as_ref(),
-                    dfs_names,
-                    0,
-                    name_idx,
-                    /*rename_self=*/ false,
-                )?;
-                Ok(field
-                    .to_owned()
-                    .with_data_type(DataType::Map(
-                        Arc::new(Field::new(
-                            inner.name(),
-                            DataType::Struct(Fields::from(vec![
-                                renamed_keys,
-                                renamed_values,
-                            ])),
-                            inner.is_nullable(),
-                        )),
-                        *sorted,
+        DataType::Union(fields, mode) => {
+            let fields = fields
+                .iter()
+                .map(|(i, f)| {
+                    Ok((
+                        i,
+                        Arc::new(rename_fields_data_type(
+                            f.as_ref().clone(),
+                            dfs_names,
+                            name_idx,
+                        )?),
                     ))
-                    .with_name(name))
-            }
-            _ => substrait_err!("Map fields must contain a Struct with exactly 
2 fields"),
-        },
-        _ => Ok(field.to_owned().with_name(name)),
+                })
+                .collect::<datafusion::common::Result<UnionFields>>()?;
+            Ok(DataType::Union(fields, *mode))
+        }
+        // Explicitly listing the rest (which can not contain inner fields 
needing renaming)
+        // to ensure we're exhaustive
+        DataType::Null
+        | DataType::Boolean
+        | DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float16
+        | DataType::Float32
+        | DataType::Float64
+        | DataType::Timestamp(_, _)
+        | DataType::Date32
+        | DataType::Date64
+        | DataType::Time32(_)
+        | DataType::Time64(_)
+        | DataType::Duration(_)
+        | DataType::Interval(_)
+        | DataType::Binary
+        | DataType::FixedSizeBinary(_)
+        | DataType::LargeBinary
+        | DataType::BinaryView
+        | DataType::Utf8
+        | DataType::LargeUtf8
+        | DataType::Utf8View
+        | DataType::Decimal128(_, _)
+        | DataType::Decimal256(_, _) => Ok(data_type.clone()),
     }
 }
 
@@ -190,13 +259,8 @@ pub(super) fn make_renamed_schema(
         .iter()
         .enumerate()
         .map(|(field_idx, (q, f))| {
-            let renamed_f = rename_field(
-                f.as_ref(),
-                dfs_names,
-                field_idx,
-                &mut name_idx,
-                /*rename_self=*/ true,
-            )?;
+            let renamed_f =
+                rename_field(f.as_ref(), dfs_names, field_idx, &mut name_idx)?;
             Ok((q.cloned(), renamed_f))
         })
         .collect::<datafusion::common::Result<Vec<_>>>()?
@@ -473,17 +537,29 @@ pub(crate) mod tests {
             ),
             (
                 Some(table_ref.clone()),
-                Arc::new(Field::new_map(
+                Arc::new(Field::new_large_list(
                     "7",
+                    Arc::new(Field::new_struct(
+                        "item",
+                        vec![Field::new("8", DataType::Int32, false)],
+                        false,
+                    )),
+                    false,
+                )),
+            ),
+            (
+                Some(table_ref.clone()),
+                Arc::new(Field::new_map(
+                    "9",
                     "entries",
                     Arc::new(Field::new_struct(
                         "keys",
-                        vec![Field::new("8", DataType::Int32, false)],
+                        vec![Field::new("10", DataType::Int32, false)],
                         false,
                     )),
                     Arc::new(Field::new_struct(
                         "values",
-                        vec![Field::new("9", DataType::Int32, false)],
+                        vec![Field::new("11", DataType::Int32, false)],
                         false,
                     )),
                     false,
@@ -504,10 +580,12 @@ pub(crate) mod tests {
             "h".to_string(),
             "i".to_string(),
             "j".to_string(),
+            "k".to_string(),
+            "l".to_string(),
         ];
         let renamed_schema = make_renamed_schema(&schema, &dfs_names)?;
 
-        assert_eq!(renamed_schema.fields().len(), 4);
+        assert_eq!(renamed_schema.fields().len(), 5);
         assert_eq!(
             *renamed_schema.field(0),
             Field::new("a", DataType::Int32, false)
@@ -541,17 +619,29 @@ pub(crate) mod tests {
         );
         assert_eq!(
             *renamed_schema.field(3),
-            Field::new_map(
+            Field::new_large_list(
                 "h",
+                Arc::new(Field::new_struct(
+                    "item",
+                    vec![Field::new("i", DataType::Int32, false)],
+                    false,
+                )),
+                false,
+            )
+        );
+        assert_eq!(
+            *renamed_schema.field(4),
+            Field::new_map(
+                "j",
                 "entries",
                 Arc::new(Field::new_struct(
                     "keys",
-                    vec![Field::new("i", DataType::Int32, false)],
+                    vec![Field::new("k", DataType::Int32, false)],
                     false,
                 )),
                 Arc::new(Field::new_struct(
                     "values",
-                    vec![Field::new("j", DataType::Int32, false)],
+                    vec![Field::new("l", DataType::Int32, false)],
                     false,
                 )),
                 false,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to