This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e711f147e4 Generalize struct-to-struct casting with CastOptions and SchemaAdapter integration (#17468)
e711f147e4 is described below

commit e711f147e41fdb6c5899cd72bf2f4579011dac13
Author: kosiew <kos...@gmail.com>
AuthorDate: Mon Sep 15 23:18:18 2025 +0800

    Generalize struct-to-struct casting with CastOptions and SchemaAdapter integration (#17468)
    
    * feat: enhance struct column casting with options for null handling and type adaptation
    
    * feat: reexport `cast_column` from `nested_struct`
    
    * feat: update cast_column function to include CastOptions for enhanced type casting
---
 datafusion/common/src/lib.rs                |   1 +
 datafusion/common/src/nested_struct.rs      | 481 +++++++++++++++++++++++++---
 datafusion/datasource/src/schema_adapter.rs |  36 ++-
 3 files changed, 459 insertions(+), 59 deletions(-)

diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 3a558fa867..e70d1d27fe 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -82,6 +82,7 @@ pub use functional_dependencies::{
 };
 use hashbrown::hash_map::DefaultHashBuilder;
 pub use join_type::{JoinConstraint, JoinSide, JoinType};
+pub use nested_struct::cast_column;
 pub use null_equality::NullEquality;
 pub use param_value::ParamValues;
 pub use scalar::{ScalarType, ScalarValue};
diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs
index f349b360f2..6e8a380df9 100644
--- a/datafusion/common/src/nested_struct.rs
+++ b/datafusion/common/src/nested_struct.rs
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::error::{DataFusionError, Result, _plan_err};
+use crate::error::{Result, _plan_err};
 use arrow::{
     array::{new_null_array, Array, ArrayRef, StructArray},
-    compute::cast,
+    compute::{cast_with_options, CastOptions},
     datatypes::{DataType::Struct, Field, FieldRef},
 };
 use std::sync::Arc;
@@ -52,36 +52,44 @@ use std::sync::Arc;
 fn cast_struct_column(
     source_col: &ArrayRef,
     target_fields: &[Arc<Field>],
+    cast_options: &CastOptions,
 ) -> Result<ArrayRef> {
-    if let Some(struct_array) = source_col.as_any().downcast_ref::<StructArray>() {
-        let mut children: Vec<(Arc<Field>, Arc<dyn Array>)> = Vec::new();
+    if let Some(source_struct) = source_col.as_any().downcast_ref::<StructArray>() {
+        validate_struct_compatibility(source_struct.fields(), target_fields)?;
+
+        let mut fields: Vec<Arc<Field>> = Vec::with_capacity(target_fields.len());
+        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(target_fields.len());
         let num_rows = source_col.len();
 
         for target_child_field in target_fields {
-            let field_arc = Arc::clone(target_child_field);
-            match struct_array.column_by_name(target_child_field.name()) {
+            fields.push(Arc::clone(target_child_field));
+            match source_struct.column_by_name(target_child_field.name()) {
                 Some(source_child_col) => {
                     let adapted_child =
-                        cast_column(source_child_col, target_child_field)?;
-                    children.push((field_arc, adapted_child));
+                        cast_column(source_child_col, target_child_field, cast_options)
+                            .map_err(|e| {
+                            e.context(format!(
+                                "While casting struct field '{}'",
+                                target_child_field.name()
+                            ))
+                        })?;
+                    arrays.push(adapted_child);
                 }
                 None => {
-                    children.push((
-                        field_arc,
-                        new_null_array(target_child_field.data_type(), num_rows),
-                    ));
+                    arrays.push(new_null_array(target_child_field.data_type(), num_rows));
                 }
             }
         }
 
-        let struct_array = StructArray::from(children);
+        let struct_array =
+            StructArray::new(fields.into(), arrays, source_struct.nulls().cloned());
         Ok(Arc::new(struct_array))
     } else {
         // Return error if source is not a struct type
-        Err(DataFusionError::Plan(format!(
+        _plan_err!(
             "Cannot cast column of type {:?} to struct type. Source must be a struct to cast to struct.",
             source_col.data_type()
-        )))
+        )
     }
 }
 
@@ -94,6 +102,28 @@ fn cast_struct_column(
 /// - **Struct Types**: Delegates to `cast_struct_column` for struct-to-struct casting only
 /// - **Non-Struct Types**: Uses Arrow's standard `cast` function for primitive type conversions
 ///
+/// ## Cast Options
+/// The `cast_options` argument controls how Arrow handles values that cannot be represented
+/// in the target type. When `safe` is `false` (DataFusion's default) the cast will return an
+/// error if such a value is encountered. Setting `safe` to `true` instead produces `NULL`
+/// for out-of-range or otherwise invalid values. The options also allow customizing how
+/// temporal values are formatted when cast to strings.
+///
+/// ```
+/// use std::sync::Arc;
+/// use arrow::array::{Int64Array, ArrayRef};
+/// use arrow::compute::CastOptions;
+/// use arrow::datatypes::{DataType, Field};
+/// use datafusion_common::nested_struct::cast_column;
+///
+/// let source: ArrayRef = Arc::new(Int64Array::from(vec![1, i64::MAX]));
+/// let target = Field::new("ints", DataType::Int32, true);
+/// // Permit lossy conversions by producing NULL on overflow instead of erroring
+/// let options = CastOptions { safe: true, ..Default::default() };
+/// let result = cast_column(&source, &target, &options).unwrap();
+/// assert!(result.is_null(1));
+/// ```
+///
 /// ## Struct Casting Requirements
 /// The struct casting logic requires that the source column must already be a struct type.
 /// This makes the function useful for:
@@ -104,6 +134,7 @@ fn cast_struct_column(
 /// # Arguments
 /// * `source_col` - The source array to cast
 /// * `target_field` - The target field definition (including type and metadata)
+/// * `cast_options` - Options that govern strictness and formatting of the cast
 ///
 /// # Returns
 /// A `Result<ArrayRef>` containing the cast array
@@ -114,10 +145,20 @@ fn cast_struct_column(
 /// - Arrow's cast function fails for non-struct types
 /// - Memory allocation fails during struct construction
 /// - Invalid data type combinations are encountered
-pub fn cast_column(source_col: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
+pub fn cast_column(
+    source_col: &ArrayRef,
+    target_field: &Field,
+    cast_options: &CastOptions,
+) -> Result<ArrayRef> {
     match target_field.data_type() {
-        Struct(target_fields) => cast_struct_column(source_col, target_fields),
-        _ => Ok(cast(source_col, target_field.data_type())?),
+        Struct(target_fields) => {
+            cast_struct_column(source_col, target_fields, cast_options)
+        }
+        _ => Ok(cast_with_options(
+            source_col,
+            target_field.data_type(),
+            cast_options,
+        )?),
     }
 }
 
@@ -141,7 +182,7 @@ pub fn cast_column(source_col: &ArrayRef, target_field: &Field) -> Result<ArrayR
 /// * `target_fields` - Fields from the target struct type
 ///
 /// # Returns
-/// * `Ok(true)` if the structs are compatible for casting
+/// * `Ok(())` if the structs are compatible for casting
 /// * `Err(DataFusionError)` with detailed error message if incompatible
 ///
 /// # Examples
@@ -149,7 +190,7 @@ pub fn cast_column(source_col: &ArrayRef, target_field: &Field) -> Result<ArrayR
 /// // Compatible: source has extra field, target has missing field
 /// // Source: {a: i32, b: string, c: f64}  
 /// // Target: {a: i64, d: bool}
-/// // Result: Ok(true) - 'a' can cast i32->i64, 'b','c' ignored, 'd' filled with nulls
+/// // Result: Ok(()) - 'a' can cast i32->i64, 'b','c' ignored, 'd' filled with nulls
 ///
 /// // Incompatible: matching field has incompatible types
 /// // Source: {a: string}
@@ -159,7 +200,7 @@ pub fn cast_column(source_col: &ArrayRef, target_field: &Field) -> Result<ArrayR
 pub fn validate_struct_compatibility(
     source_fields: &[FieldRef],
     target_fields: &[FieldRef],
-) -> Result<bool> {
+) -> Result<()> {
     // Check compatibility for each target field
     for target_field in target_fields {
         // Look for matching field in source by name
@@ -167,6 +208,15 @@ pub fn validate_struct_compatibility(
             .iter()
             .find(|f| f.name() == target_field.name())
         {
+            // Ensure nullability is compatible. It is invalid to cast a nullable
+            // source field to a non-nullable target field as this may discard
+            // null values.
+            if source_field.is_nullable() && !target_field.is_nullable() {
+                return _plan_err!(
+                    "Cannot cast nullable struct field '{}' to non-nullable field",
+                    target_field.name()
+                );
+            }
             // Check if the matching field types are compatible
             match (source_field.data_type(), target_field.data_type()) {
                 // Recursively validate nested structs
@@ -193,15 +243,21 @@ pub fn validate_struct_compatibility(
     }
 
     // Extra fields in source are OK - they'll be ignored
-    Ok(true)
+    Ok(())
 }
 
 #[cfg(test)]
 mod tests {
+
     use super::*;
+    use crate::format::DEFAULT_CAST_OPTIONS;
     use arrow::{
-        array::{Int32Array, Int64Array, StringArray},
-        datatypes::{DataType, Field},
+        array::{
+            BinaryArray, Int32Array, Int32Builder, Int64Array, ListArray, MapArray,
+            MapBuilder, StringArray, StringBuilder,
+        },
+        buffer::NullBuffer,
+        datatypes::{DataType, Field, FieldRef, Int32Type},
     };
     /// Macro to extract and downcast a column from a StructArray
     macro_rules! get_column_as {
@@ -215,11 +271,35 @@ mod tests {
         };
     }
 
+    fn field(name: &str, data_type: DataType) -> Field {
+        Field::new(name, data_type, true)
+    }
+
+    fn non_null_field(name: &str, data_type: DataType) -> Field {
+        Field::new(name, data_type, false)
+    }
+
+    fn arc_field(name: &str, data_type: DataType) -> FieldRef {
+        Arc::new(field(name, data_type))
+    }
+
+    fn struct_type(fields: Vec<Field>) -> DataType {
+        Struct(fields.into())
+    }
+
+    fn struct_field(name: &str, fields: Vec<Field>) -> Field {
+        field(name, struct_type(fields))
+    }
+
+    fn arc_struct_field(name: &str, fields: Vec<Field>) -> FieldRef {
+        Arc::new(struct_field(name, fields))
+    }
+
     #[test]
     fn test_cast_simple_column() {
         let source = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
-        let target_field = Field::new("ints", DataType::Int64, true);
-        let result = cast_column(&source, &target_field).unwrap();
+        let target_field = field("ints", DataType::Int64);
+        let result = cast_column(&source, &target_field, &DEFAULT_CAST_OPTIONS).unwrap();
         let result = result.as_any().downcast_ref::<Int64Array>().unwrap();
         assert_eq!(result.len(), 3);
         assert_eq!(result.value(0), 1);
@@ -227,28 +307,45 @@ mod tests {
         assert_eq!(result.value(2), 3);
     }
 
+    #[test]
+    fn test_cast_column_with_options() {
+        let source = Arc::new(Int64Array::from(vec![1, i64::MAX])) as ArrayRef;
+        let target_field = field("ints", DataType::Int32);
+
+        let safe_opts = CastOptions {
+            // safe: false - return Err for failure
+            safe: false,
+            ..DEFAULT_CAST_OPTIONS
+        };
+        assert!(cast_column(&source, &target_field, &safe_opts).is_err());
+
+        let unsafe_opts = CastOptions {
+            // safe: true - return Null for failure
+            safe: true,
+            ..DEFAULT_CAST_OPTIONS
+        };
+        let result = cast_column(&source, &target_field, &unsafe_opts).unwrap();
+        let result = result.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(result.value(0), 1);
+        assert!(result.is_null(1));
+    }
+
     #[test]
     fn test_cast_struct_with_missing_field() {
         let a_array = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
         let source_struct = StructArray::from(vec![(
-            Arc::new(Field::new("a", DataType::Int32, true)),
+            arc_field("a", DataType::Int32),
             Arc::clone(&a_array),
         )]);
         let source_col = Arc::new(source_struct) as ArrayRef;
 
-        let target_field = Field::new(
+        let target_field = struct_field(
             "s",
-            Struct(
-                vec![
-                    Arc::new(Field::new("a", DataType::Int32, true)),
-                    Arc::new(Field::new("b", DataType::Utf8, true)),
-                ]
-                .into(),
-            ),
-            true,
+            vec![field("a", DataType::Int32), field("b", DataType::Utf8)],
         );
 
-        let result = cast_column(&source_col, &target_field).unwrap();
+        let result =
+            cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS).unwrap();
         let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap();
         assert_eq!(struct_array.fields().len(), 2);
         let a_result = get_column_as!(&struct_array, "a", Int32Array);
@@ -264,13 +361,9 @@ mod tests {
     #[test]
     fn test_cast_struct_source_not_struct() {
         let source = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
-        let target_field = Field::new(
-            "s",
-            Struct(vec![Arc::new(Field::new("a", DataType::Int32, true))].into()),
-            true,
-        );
+        let target_field = struct_field("s", vec![field("a", DataType::Int32)]);
 
-        let result = cast_column(&source, &target_field);
+        let result = cast_column(&source, &target_field, &DEFAULT_CAST_OPTIONS);
         assert!(result.is_err());
         let error_msg = result.unwrap_err().to_string();
         assert!(error_msg.contains("Cannot cast column of type"));
@@ -278,16 +371,34 @@ mod tests {
         assert!(error_msg.contains("Source must be a struct"));
     }
 
+    #[test]
+    fn test_cast_struct_incompatible_child_type() {
+        let a_array = Arc::new(BinaryArray::from(vec![
+            Some(b"a".as_ref()),
+            Some(b"b".as_ref()),
+        ])) as ArrayRef;
+        let source_struct =
+            StructArray::from(vec![(arc_field("a", DataType::Binary), a_array)]);
+        let source_col = Arc::new(source_struct) as ArrayRef;
+
+        let target_field = struct_field("s", vec![field("a", DataType::Int32)]);
+
+        let result = cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS);
+        assert!(result.is_err());
+        let error_msg = result.unwrap_err().to_string();
+        assert!(error_msg.contains("Cannot cast struct field 'a'"));
+    }
+
     #[test]
     fn test_validate_struct_compatibility_incompatible_types() {
         // Source struct: {field1: Binary, field2: String}
         let source_fields = vec![
-            Arc::new(Field::new("field1", DataType::Binary, true)),
-            Arc::new(Field::new("field2", DataType::Utf8, true)),
+            arc_field("field1", DataType::Binary),
+            arc_field("field2", DataType::Utf8),
         ];
 
         // Target struct: {field1: Int32}
-        let target_fields = vec![Arc::new(Field::new("field1", DataType::Int32, true))];
+        let target_fields = vec![arc_field("field1", DataType::Int32)];
 
         let result = validate_struct_compatibility(&source_fields, &target_fields);
         assert!(result.is_err());
@@ -301,29 +412,293 @@ mod tests {
     fn test_validate_struct_compatibility_compatible_types() {
         // Source struct: {field1: Int32, field2: String}
         let source_fields = vec![
-            Arc::new(Field::new("field1", DataType::Int32, true)),
-            Arc::new(Field::new("field2", DataType::Utf8, true)),
+            arc_field("field1", DataType::Int32),
+            arc_field("field2", DataType::Utf8),
         ];
 
         // Target struct: {field1: Int64} (Int32 can cast to Int64)
-        let target_fields = vec![Arc::new(Field::new("field1", DataType::Int64, true))];
+        let target_fields = vec![arc_field("field1", DataType::Int64)];
 
         let result = validate_struct_compatibility(&source_fields, &target_fields);
         assert!(result.is_ok());
-        assert!(result.unwrap());
     }
 
     #[test]
     fn test_validate_struct_compatibility_missing_field_in_source() {
         // Source struct: {field2: String} (missing field1)
-        let source_fields = vec![Arc::new(Field::new("field2", DataType::Utf8, true))];
+        let source_fields = vec![arc_field("field2", DataType::Utf8)];
 
         // Target struct: {field1: Int32}
-        let target_fields = vec![Arc::new(Field::new("field1", DataType::Int32, true))];
+        let target_fields = vec![arc_field("field1", DataType::Int32)];
 
         // Should be OK - missing fields will be filled with nulls
         let result = validate_struct_compatibility(&source_fields, &target_fields);
         assert!(result.is_ok());
-        assert!(result.unwrap());
+    }
+
+    #[test]
+    fn test_validate_struct_compatibility_additional_field_in_source() {
+        // Source struct: {field1: Int32, field2: String} (extra field2)
+        let source_fields = vec![
+            arc_field("field1", DataType::Int32),
+            arc_field("field2", DataType::Utf8),
+        ];
+
+        // Target struct: {field1: Int32}
+        let target_fields = vec![arc_field("field1", DataType::Int32)];
+
+        // Should be OK - extra fields in source are ignored
+        let result = validate_struct_compatibility(&source_fields, &target_fields);
+        assert!(result.is_ok());
+    }
+
+    #[test]
+    fn test_cast_struct_parent_nulls_retained() {
+        let a_array = Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef;
+        let fields = vec![arc_field("a", DataType::Int32)];
+        let nulls = Some(NullBuffer::from(vec![true, false]));
+        let source_struct = StructArray::new(fields.clone().into(), vec![a_array], nulls);
+        let source_col = Arc::new(source_struct) as ArrayRef;
+
+        let target_field = struct_field("s", vec![field("a", DataType::Int64)]);
+
+        let result =
+            cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS).unwrap();
+        let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap();
+        assert_eq!(struct_array.null_count(), 1);
+        assert!(struct_array.is_valid(0));
+        assert!(struct_array.is_null(1));
+
+        let a_result = get_column_as!(&struct_array, "a", Int64Array);
+        assert_eq!(a_result.value(0), 1);
+        assert_eq!(a_result.value(1), 2);
+    }
+
+    #[test]
+    fn test_validate_struct_compatibility_nullable_to_non_nullable() {
+        // Source struct: {field1: Int32 nullable}
+        let source_fields = vec![arc_field("field1", DataType::Int32)];
+
+        // Target struct: {field1: Int32 non-nullable}
+        let target_fields = vec![Arc::new(non_null_field("field1", DataType::Int32))];
+
+        let result = validate_struct_compatibility(&source_fields, &target_fields);
+        assert!(result.is_err());
+        let error_msg = result.unwrap_err().to_string();
+        assert!(error_msg.contains("field1"));
+        assert!(error_msg.contains("non-nullable"));
+    }
+
+    #[test]
+    fn test_validate_struct_compatibility_non_nullable_to_nullable() {
+        // Source struct: {field1: Int32 non-nullable}
+        let source_fields = vec![Arc::new(non_null_field("field1", DataType::Int32))];
+
+        // Target struct: {field1: Int32 nullable}
+        let target_fields = vec![arc_field("field1", DataType::Int32)];
+
+        let result = validate_struct_compatibility(&source_fields, &target_fields);
+        assert!(result.is_ok());
+    }
+
+    #[test]
+    fn test_validate_struct_compatibility_nested_nullable_to_non_nullable() {
+        // Source struct: {field1: {nested: Int32 nullable}}
+        let source_fields = vec![Arc::new(non_null_field(
+            "field1",
+            struct_type(vec![field("nested", DataType::Int32)]),
+        ))];
+
+        // Target struct: {field1: {nested: Int32 non-nullable}}
+        let target_fields = vec![Arc::new(non_null_field(
+            "field1",
+            struct_type(vec![non_null_field("nested", DataType::Int32)]),
+        ))];
+
+        let result = validate_struct_compatibility(&source_fields, &target_fields);
+        assert!(result.is_err());
+        let error_msg = result.unwrap_err().to_string();
+        assert!(error_msg.contains("nested"));
+        assert!(error_msg.contains("non-nullable"));
+    }
+
+    #[test]
+    fn test_cast_nested_struct_with_extra_and_missing_fields() {
+        // Source inner struct has fields a, b, extra
+        let a = Arc::new(Int32Array::from(vec![Some(1), None])) as ArrayRef;
+        let b = Arc::new(Int32Array::from(vec![Some(2), Some(3)])) as ArrayRef;
+        let extra = Arc::new(Int32Array::from(vec![Some(9), Some(10)])) as ArrayRef;
+
+        let inner = StructArray::from(vec![
+            (arc_field("a", DataType::Int32), a),
+            (arc_field("b", DataType::Int32), b),
+            (arc_field("extra", DataType::Int32), extra),
+        ]);
+
+        let source_struct = StructArray::from(vec![(
+            arc_struct_field(
+                "inner",
+                vec![
+                    field("a", DataType::Int32),
+                    field("b", DataType::Int32),
+                    field("extra", DataType::Int32),
+                ],
+            ),
+            Arc::new(inner) as ArrayRef,
+        )]);
+        let source_col = Arc::new(source_struct) as ArrayRef;
+
+        // Target inner struct reorders fields, adds "missing", and drops "extra"
+        let target_field = struct_field(
+            "outer",
+            vec![struct_field(
+                "inner",
+                vec![
+                    field("b", DataType::Int64),
+                    field("a", DataType::Int32),
+                    field("missing", DataType::Int32),
+                ],
+            )],
+        );
+
+        let result =
+            cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS).unwrap();
+        let outer = result.as_any().downcast_ref::<StructArray>().unwrap();
+        let inner = get_column_as!(&outer, "inner", StructArray);
+        assert_eq!(inner.fields().len(), 3);
+
+        let b = get_column_as!(inner, "b", Int64Array);
+        assert_eq!(b.value(0), 2);
+        assert_eq!(b.value(1), 3);
+        assert!(!b.is_null(0));
+        assert!(!b.is_null(1));
+
+        let a = get_column_as!(inner, "a", Int32Array);
+        assert_eq!(a.value(0), 1);
+        assert!(a.is_null(1));
+
+        let missing = get_column_as!(inner, "missing", Int32Array);
+        assert!(missing.is_null(0));
+        assert!(missing.is_null(1));
+    }
+
+    #[test]
+    fn test_cast_struct_with_array_and_map_fields() {
+        // Array field with second row null
+        let arr_array = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2)]),
+            None,
+        ])) as ArrayRef;
+
+        // Map field with second row null
+        let string_builder = StringBuilder::new();
+        let int_builder = Int32Builder::new();
+        let mut map_builder = MapBuilder::new(None, string_builder, int_builder);
+        map_builder.keys().append_value("a");
+        map_builder.values().append_value(1);
+        map_builder.append(true).unwrap();
+        map_builder.append(false).unwrap();
+        let map_array = Arc::new(map_builder.finish()) as ArrayRef;
+
+        let source_struct = StructArray::from(vec![
+            (
+                arc_field(
+                    "arr",
+                    DataType::List(Arc::new(field("item", DataType::Int32))),
+                ),
+                arr_array,
+            ),
+            (
+                arc_field(
+                    "map",
+                    DataType::Map(
+                        Arc::new(non_null_field(
+                            "entries",
+                            struct_type(vec![
+                                non_null_field("keys", DataType::Utf8),
+                                field("values", DataType::Int32),
+                            ]),
+                        )),
+                        false,
+                    ),
+                ),
+                map_array,
+            ),
+        ]);
+        let source_col = Arc::new(source_struct) as ArrayRef;
+
+        let target_field = struct_field(
+            "s",
+            vec![
+                field(
+                    "arr",
+                    DataType::List(Arc::new(field("item", DataType::Int32))),
+                ),
+                field(
+                    "map",
+                    DataType::Map(
+                        Arc::new(non_null_field(
+                            "entries",
+                            struct_type(vec![
+                                non_null_field("keys", DataType::Utf8),
+                                field("values", DataType::Int32),
+                            ]),
+                        )),
+                        false,
+                    ),
+                ),
+            ],
+        );
+
+        let result =
+            cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS).unwrap();
+        let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap();
+
+        let arr = get_column_as!(&struct_array, "arr", ListArray);
+        assert!(!arr.is_null(0));
+        assert!(arr.is_null(1));
+        let arr0 = arr.value(0);
+        let values = arr0.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(values.value(0), 1);
+        assert_eq!(values.value(1), 2);
+
+        let map = get_column_as!(&struct_array, "map", MapArray);
+        assert!(!map.is_null(0));
+        assert!(map.is_null(1));
+        let map0 = map.value(0);
+        let entries = map0.as_any().downcast_ref::<StructArray>().unwrap();
+        let keys = get_column_as!(entries, "keys", StringArray);
+        let vals = get_column_as!(entries, "values", Int32Array);
+        assert_eq!(keys.value(0), "a");
+        assert_eq!(vals.value(0), 1);
+    }
+
+    #[test]
+    fn test_cast_struct_field_order_differs() {
+        let a = Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef;
+        let b = Arc::new(Int32Array::from(vec![Some(3), None])) as ArrayRef;
+
+        let source_struct = StructArray::from(vec![
+            (arc_field("a", DataType::Int32), a),
+            (arc_field("b", DataType::Int32), b),
+        ]);
+        let source_col = Arc::new(source_struct) as ArrayRef;
+
+        let target_field = struct_field(
+            "s",
+            vec![field("b", DataType::Int64), field("a", DataType::Int32)],
+        );
+
+        let result =
+            cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS).unwrap();
+        let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap();
+
+        let b_col = get_column_as!(&struct_array, "b", Int64Array);
+        assert_eq!(b_col.value(0), 3);
+        assert!(b_col.is_null(1));
+
+        let a_col = get_column_as!(&struct_array, "a", Int32Array);
+        assert_eq!(a_col.value(0), 1);
+        assert_eq!(a_col.value(1), 2);
     }
 }
diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index 16de00500b..bd5833bb78 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -26,14 +26,20 @@ use arrow::{
     datatypes::{DataType, Field, Schema, SchemaRef},
 };
 use datafusion_common::{
+    format::DEFAULT_CAST_OPTIONS,
     nested_struct::{cast_column, validate_struct_compatibility},
     plan_err, ColumnStatistics,
 };
 use std::{fmt::Debug, sync::Arc};
 /// Function used by [`SchemaMapping`] to adapt a column from the file schema to
 /// the table schema.
-pub type CastColumnFn =
-    dyn Fn(&ArrayRef, &Field) -> datafusion_common::Result<ArrayRef> + Send + Sync;
+pub type CastColumnFn = dyn Fn(
+        &ArrayRef,
+        &Field,
+        &arrow::compute::CastOptions,
+    ) -> datafusion_common::Result<ArrayRef>
+    + Send
+    + Sync;
 
 /// Factory for creating [`SchemaAdapter`]
 ///
@@ -252,7 +258,9 @@ pub(crate) fn can_cast_field(
 ) -> datafusion_common::Result<bool> {
     match (file_field.data_type(), table_field.data_type()) {
         (DataType::Struct(source_fields), DataType::Struct(target_fields)) => {
-            validate_struct_compatibility(source_fields, target_fields)
+            // validate_struct_compatibility returns Result<()>; on success we can cast structs
+            validate_struct_compatibility(source_fields, target_fields)?;
+            Ok(true)
         }
         _ => {
             if can_cast_types(file_field.data_type(), table_field.data_type()) {
@@ -302,7 +310,13 @@ impl SchemaAdapter for DefaultSchemaAdapter {
             Arc::new(SchemaMapping::new(
                 Arc::clone(&self.projected_table_schema),
                 field_mappings,
-                Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
+                Arc::new(
+                    |array: &ArrayRef,
+                     field: &Field,
+                     opts: &arrow::compute::CastOptions| {
+                        cast_column(array, field, opts)
+                    },
+                ),
             )),
             projection,
         ))
@@ -414,7 +428,13 @@ impl SchemaMapper for SchemaMapping {
                     || Ok(new_null_array(field.data_type(), batch_rows)),
                     // However, if it does exist in both, use the cast_column function
                     // to perform any necessary conversions
-                    |batch_idx| (self.cast_column)(&batch_cols[batch_idx], field),
+                    |batch_idx| {
+                        (self.cast_column)(
+                            &batch_cols[batch_idx],
+                            field,
+                            &DEFAULT_CAST_OPTIONS,
+                        )
+                    },
                 )
             })
             .collect::<datafusion_common::Result<Vec<_>, _>>()?;
@@ -641,7 +661,11 @@ mod tests {
         let mapping = SchemaMapping::new(
             Arc::clone(&projected_schema),
             field_mappings.clone(),
-            Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
+            Arc::new(
+                |array: &ArrayRef, field: &Field, opts: &arrow::compute::CastOptions| {
+                    cast_column(array, field, opts)
+                },
+            ),
         );
 
         // Check that fields were set correctly


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to