This is an automated email from the ASF dual-hosted git repository. comphead pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new e711f147e4 Generalize struct-to-struct casting with CastOptions and SchemaAdapter integration (#17468) e711f147e4 is described below commit e711f147e41fdb6c5899cd72bf2f4579011dac13 Author: kosiew <kos...@gmail.com> AuthorDate: Mon Sep 15 23:18:18 2025 +0800 Generalize struct-to-struct casting with CastOptions and SchemaAdapter integration (#17468) * feat: enhance struct column casting with options for null handling and type adaptation * feat: reexport `cast_column` from `nested_struct` * feat: update cast_column function to include CastOptions for enhanced type casting --- datafusion/common/src/lib.rs | 1 + datafusion/common/src/nested_struct.rs | 481 +++++++++++++++++++++++++--- datafusion/datasource/src/schema_adapter.rs | 36 ++- 3 files changed, 459 insertions(+), 59 deletions(-) diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 3a558fa867..e70d1d27fe 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -82,6 +82,7 @@ pub use functional_dependencies::{ }; use hashbrown::hash_map::DefaultHashBuilder; pub use join_type::{JoinConstraint, JoinSide, JoinType}; +pub use nested_struct::cast_column; pub use null_equality::NullEquality; pub use param_value::ParamValues; pub use scalar::{ScalarType, ScalarValue}; diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs index f349b360f2..6e8a380df9 100644 --- a/datafusion/common/src/nested_struct.rs +++ b/datafusion/common/src/nested_struct.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::error::{DataFusionError, Result, _plan_err}; +use crate::error::{Result, _plan_err}; use arrow::{ array::{new_null_array, Array, ArrayRef, StructArray}, - compute::cast, + compute::{cast_with_options, CastOptions}, datatypes::{DataType::Struct, Field, FieldRef}, }; use std::sync::Arc; @@ -52,36 +52,44 @@ use std::sync::Arc; fn cast_struct_column( source_col: &ArrayRef, target_fields: &[Arc<Field>], + cast_options: &CastOptions, ) -> Result<ArrayRef> { - if let Some(struct_array) = source_col.as_any().downcast_ref::<StructArray>() { - let mut children: Vec<(Arc<Field>, Arc<dyn Array>)> = Vec::new(); + if let Some(source_struct) = source_col.as_any().downcast_ref::<StructArray>() { + validate_struct_compatibility(source_struct.fields(), target_fields)?; + + let mut fields: Vec<Arc<Field>> = Vec::with_capacity(target_fields.len()); + let mut arrays: Vec<ArrayRef> = Vec::with_capacity(target_fields.len()); let num_rows = source_col.len(); for target_child_field in target_fields { - let field_arc = Arc::clone(target_child_field); - match struct_array.column_by_name(target_child_field.name()) { + fields.push(Arc::clone(target_child_field)); + match source_struct.column_by_name(target_child_field.name()) { Some(source_child_col) => { let adapted_child = - cast_column(source_child_col, target_child_field)?; - children.push((field_arc, adapted_child)); + cast_column(source_child_col, target_child_field, cast_options) + .map_err(|e| { + e.context(format!( + "While casting struct field '{}'", + target_child_field.name() + )) + })?; + arrays.push(adapted_child); } None => { - children.push(( - field_arc, - new_null_array(target_child_field.data_type(), num_rows), - )); + arrays.push(new_null_array(target_child_field.data_type(), num_rows)); } } } - let struct_array = StructArray::from(children); + let struct_array = + StructArray::new(fields.into(), arrays, source_struct.nulls().cloned()); Ok(Arc::new(struct_array)) } else { // Return error if source is not a struct type - Err(DataFusionError::Plan(format!( + _plan_err!( "Cannot cast column of type {:?} to struct type. Source must be a struct to cast to struct.", source_col.data_type() - ))) + ) } } @@ -94,6 +102,28 @@ fn cast_struct_column( /// - **Struct Types**: Delegates to `cast_struct_column` for struct-to-struct casting only /// - **Non-Struct Types**: Uses Arrow's standard `cast` function for primitive type conversions /// +/// ## Cast Options +/// The `cast_options` argument controls how Arrow handles values that cannot be represented +/// in the target type. When `safe` is `false` (DataFusion's default) the cast will return an +/// error if such a value is encountered. Setting `safe` to `true` instead produces `NULL` +/// for out-of-range or otherwise invalid values. The options also allow customizing how +/// temporal values are formatted when cast to strings. +/// +/// ``` +/// use std::sync::Arc; +/// use arrow::array::{Int64Array, ArrayRef}; +/// use arrow::compute::CastOptions; +/// use arrow::datatypes::{DataType, Field}; +/// use datafusion_common::nested_struct::cast_column; +/// +/// let source: ArrayRef = Arc::new(Int64Array::from(vec![1, i64::MAX])); +/// let target = Field::new("ints", DataType::Int32, true); +/// // Permit lossy conversions by producing NULL on overflow instead of erroring +/// let options = CastOptions { safe: true, ..Default::default() }; +/// let result = cast_column(&source, &target, &options).unwrap(); +/// assert!(result.is_null(1)); +/// ``` +/// /// ## Struct Casting Requirements /// The struct casting logic requires that the source column must already be a struct type. /// This makes the function useful for: @@ -104,6 +134,7 @@ fn cast_struct_column( /// # Arguments /// * `source_col` - The source array to cast /// * `target_field` - The target field definition (including type and metadata) +/// * `cast_options` - Options that govern strictness and formatting of the cast /// /// # Returns /// A `Result<ArrayRef>` containing the cast array @@ -114,10 +145,20 @@ fn cast_struct_column( /// - Arrow's cast function fails for non-struct types /// - Memory allocation fails during struct construction /// - Invalid data type combinations are encountered -pub fn cast_column(source_col: &ArrayRef, target_field: &Field) -> Result<ArrayRef> { +pub fn cast_column( + source_col: &ArrayRef, + target_field: &Field, + cast_options: &CastOptions, +) -> Result<ArrayRef> { match target_field.data_type() { - Struct(target_fields) => cast_struct_column(source_col, target_fields), - _ => Ok(cast(source_col, target_field.data_type())?), + Struct(target_fields) => { + cast_struct_column(source_col, target_fields, cast_options) + } + _ => Ok(cast_with_options( + source_col, + target_field.data_type(), + cast_options, + )?), } } @@ -141,7 +182,7 @@ pub fn cast_column(source_col: &ArrayRef, target_field: &Field) -> Result<ArrayR /// * `target_fields` - Fields from the target struct type /// /// # Returns -/// * `Ok(true)` if the structs are compatible for casting +/// * `Ok(())` if the structs are compatible for casting /// * `Err(DataFusionError)` with detailed error message if incompatible /// /// # Examples @@ -149,7 +190,7 @@ pub fn cast_column(source_col: &ArrayRef, target_field: &Field) -> Result<ArrayR /// // Compatible: source has extra field, target has missing field /// // Source: {a: i32, b: string, c: f64} /// // Target: {a: i64, d: bool} -/// // Result: Ok(true) - 'a' can cast i32->i64, 'b','c' ignored, 'd' filled with nulls +/// // Result: Ok(()) - 'a' can cast i32->i64, 'b','c' ignored, 'd' filled with nulls /// /// // Incompatible: matching field has incompatible types /// // Source: {a: string} @@ -159,7 +200,7 @@ pub fn cast_column(source_col: &ArrayRef, target_field: &Field) -> Result<ArrayR pub fn validate_struct_compatibility( source_fields: &[FieldRef], target_fields: &[FieldRef], -) -> Result<bool> { +) -> Result<()> { // Check compatibility for each target field for target_field in target_fields { // Look for matching field in source by name @@ -167,6 +208,15 @@ pub fn validate_struct_compatibility( .iter() .find(|f| f.name() == target_field.name()) { + // Ensure nullability is compatible. It is invalid to cast a nullable + // source field to a non-nullable target field as this may discard + // null values. + if source_field.is_nullable() && !target_field.is_nullable() { + return _plan_err!( + "Cannot cast nullable struct field '{}' to non-nullable field", + target_field.name() + ); + } // Check if the matching field types are compatible match (source_field.data_type(), target_field.data_type()) { // Recursively validate nested structs @@ -193,15 +243,21 @@ pub fn validate_struct_compatibility( } // Extra fields in source are OK - they'll be ignored - Ok(true) + Ok(()) } #[cfg(test)] mod tests { + use super::*; + use crate::format::DEFAULT_CAST_OPTIONS; use arrow::{ - array::{Int32Array, Int64Array, StringArray}, - datatypes::{DataType, Field}, + array::{ + BinaryArray, Int32Array, Int32Builder, Int64Array, ListArray, MapArray, + MapBuilder, StringArray, StringBuilder, + }, + buffer::NullBuffer, + datatypes::{DataType, Field, FieldRef, Int32Type}, }; /// Macro to extract and downcast a column from a StructArray macro_rules! get_column_as { @@ -215,11 +271,35 @@ mod tests { }; } + fn field(name: &str, data_type: DataType) -> Field { + Field::new(name, data_type, true) + } + + fn non_null_field(name: &str, data_type: DataType) -> Field { + Field::new(name, data_type, false) + } + + fn arc_field(name: &str, data_type: DataType) -> FieldRef { + Arc::new(field(name, data_type)) + } + + fn struct_type(fields: Vec<Field>) -> DataType { + Struct(fields.into()) + } + + fn struct_field(name: &str, fields: Vec<Field>) -> Field { + field(name, struct_type(fields)) + } + + fn arc_struct_field(name: &str, fields: Vec<Field>) -> FieldRef { + Arc::new(struct_field(name, fields)) + } + #[test] fn test_cast_simple_column() { let source = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; - let target_field = Field::new("ints", DataType::Int64, true); - let result = cast_column(&source, &target_field).unwrap(); + let target_field = field("ints", DataType::Int64); + let result = cast_column(&source, &target_field, &DEFAULT_CAST_OPTIONS).unwrap(); let result = result.as_any().downcast_ref::<Int64Array>().unwrap(); assert_eq!(result.len(), 3); assert_eq!(result.value(0), 1); @@ -227,28 +307,45 @@ mod tests { assert_eq!(result.value(2), 3); } + #[test] + fn test_cast_column_with_options() { + let source = Arc::new(Int64Array::from(vec![1, i64::MAX])) as ArrayRef; + let target_field = field("ints", DataType::Int32); + + let safe_opts = CastOptions { + // safe: false - return Err for failure + safe: false, + ..DEFAULT_CAST_OPTIONS + }; + assert!(cast_column(&source, &target_field, &safe_opts).is_err()); + + let unsafe_opts = CastOptions { + // safe: true - return Null for failure + safe: true, + ..DEFAULT_CAST_OPTIONS + }; + let result = cast_column(&source, &target_field, &unsafe_opts).unwrap(); + let result = result.as_any().downcast_ref::<Int32Array>().unwrap(); + assert_eq!(result.value(0), 1); + assert!(result.is_null(1)); + } + #[test] fn test_cast_struct_with_missing_field() { let a_array = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef; let source_struct = StructArray::from(vec![( - Arc::new(Field::new("a", DataType::Int32, true)), + arc_field("a", DataType::Int32), Arc::clone(&a_array), )]); let source_col = Arc::new(source_struct) as ArrayRef; - let target_field = Field::new( + let target_field = struct_field( "s", - Struct( - vec![ - Arc::new(Field::new("a", DataType::Int32, true)), - Arc::new(Field::new("b", DataType::Utf8, true)), - ] - .into(), - ), - true, + vec![field("a", DataType::Int32), field("b", DataType::Utf8)], ); - let result = cast_column(&source_col, &target_field).unwrap(); + let result = + cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS).unwrap(); let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap(); assert_eq!(struct_array.fields().len(), 2); let a_result = get_column_as!(&struct_array, "a", Int32Array); @@ -264,13 +361,9 @@ mod tests { #[test] fn test_cast_struct_source_not_struct() { let source = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef; - let target_field = Field::new( - "s", - Struct(vec![Arc::new(Field::new("a", DataType::Int32, true))].into()), - true, - ); + let target_field = struct_field("s", vec![field("a", DataType::Int32)]); - let result = cast_column(&source, &target_field); + let result = cast_column(&source, &target_field, &DEFAULT_CAST_OPTIONS); assert!(result.is_err()); let error_msg = result.unwrap_err().to_string(); assert!(error_msg.contains("Cannot cast column of type")); @@ -278,16 +371,34 @@ mod tests { assert!(error_msg.contains("Source must be a struct")); } + #[test] + fn test_cast_struct_incompatible_child_type() { + let a_array = Arc::new(BinaryArray::from(vec![ + Some(b"a".as_ref()), + Some(b"b".as_ref()), + ])) as ArrayRef; + let source_struct = + StructArray::from(vec![(arc_field("a", DataType::Binary), a_array)]); + let source_col = Arc::new(source_struct) as ArrayRef; + + let target_field = struct_field("s", vec![field("a", DataType::Int32)]); + + let result = cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS); + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("Cannot cast struct field 'a'")); + } + #[test] fn test_validate_struct_compatibility_incompatible_types() { // Source struct: {field1: Binary, field2: String} let source_fields = vec![ - Arc::new(Field::new("field1", DataType::Binary, true)), - Arc::new(Field::new("field2", DataType::Utf8, true)), + arc_field("field1", DataType::Binary), + arc_field("field2", DataType::Utf8), ]; // Target struct: {field1: Int32} - let target_fields = vec![Arc::new(Field::new("field1", DataType::Int32, true))]; + let target_fields = vec![arc_field("field1", DataType::Int32)]; let result = validate_struct_compatibility(&source_fields, &target_fields); assert!(result.is_err()); @@ -301,29 +412,293 @@ mod tests { fn test_validate_struct_compatibility_compatible_types() { // Source struct: {field1: Int32, field2: String} let source_fields = vec![ - Arc::new(Field::new("field1", DataType::Int32, true)), - Arc::new(Field::new("field2", DataType::Utf8, true)), + arc_field("field1", DataType::Int32), + arc_field("field2", DataType::Utf8), ]; // Target struct: {field1: Int64} (Int32 can cast to Int64) - let target_fields = vec![Arc::new(Field::new("field1", DataType::Int64, true))]; + let target_fields = vec![arc_field("field1", DataType::Int64)]; let result = validate_struct_compatibility(&source_fields, &target_fields); assert!(result.is_ok()); - assert!(result.unwrap()); } #[test] fn test_validate_struct_compatibility_missing_field_in_source() { // Source struct: {field2: String} (missing field1) - let source_fields = vec![Arc::new(Field::new("field2", DataType::Utf8, true))]; + let source_fields = vec![arc_field("field2", DataType::Utf8)]; // Target struct: {field1: Int32} - let target_fields = vec![Arc::new(Field::new("field1", DataType::Int32, true))]; + let target_fields = vec![arc_field("field1", DataType::Int32)]; // Should be OK - missing fields will be filled with nulls let result = validate_struct_compatibility(&source_fields, &target_fields); assert!(result.is_ok()); - assert!(result.unwrap()); + } + + #[test] + fn test_validate_struct_compatibility_additional_field_in_source() { + // Source struct: {field1: Int32, field2: String} (extra field2) + let source_fields = vec![ + arc_field("field1", DataType::Int32), + arc_field("field2", DataType::Utf8), + ]; + + // Target struct: {field1: Int32} + let target_fields = vec![arc_field("field1", DataType::Int32)]; + + // Should be OK - extra fields in source are ignored + let result = validate_struct_compatibility(&source_fields, &target_fields); + assert!(result.is_ok()); + } + + #[test] + fn test_cast_struct_parent_nulls_retained() { + let a_array = Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef; + let fields = vec![arc_field("a", DataType::Int32)]; + let nulls = Some(NullBuffer::from(vec![true, false])); + let source_struct = StructArray::new(fields.clone().into(), vec![a_array], nulls); + let source_col = Arc::new(source_struct) as ArrayRef; + + let target_field = struct_field("s", vec![field("a", DataType::Int64)]); + + let result = + cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS).unwrap(); + let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap(); + assert_eq!(struct_array.null_count(), 1); + assert!(struct_array.is_valid(0)); + assert!(struct_array.is_null(1)); + + let a_result = get_column_as!(&struct_array, "a", Int64Array); + assert_eq!(a_result.value(0), 1); + assert_eq!(a_result.value(1), 2); + } + + #[test] + fn test_validate_struct_compatibility_nullable_to_non_nullable() { + // Source struct: {field1: Int32 nullable} + let source_fields = vec![arc_field("field1", DataType::Int32)]; + + // Target struct: {field1: Int32 non-nullable} + let target_fields = vec![Arc::new(non_null_field("field1", DataType::Int32))]; + + let result = validate_struct_compatibility(&source_fields, &target_fields); + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("field1")); + assert!(error_msg.contains("non-nullable")); + } + + #[test] + fn test_validate_struct_compatibility_non_nullable_to_nullable() { + // Source struct: {field1: Int32 non-nullable} + let source_fields = vec![Arc::new(non_null_field("field1", DataType::Int32))]; + + // Target struct: {field1: Int32 nullable} + let target_fields = vec![arc_field("field1", DataType::Int32)]; + + let result = validate_struct_compatibility(&source_fields, &target_fields); + assert!(result.is_ok()); + } + + #[test] + fn test_validate_struct_compatibility_nested_nullable_to_non_nullable() { + // Source struct: {field1: {nested: Int32 nullable}} + let source_fields = vec![Arc::new(non_null_field( + "field1", + struct_type(vec![field("nested", DataType::Int32)]), + ))]; + + // Target struct: {field1: {nested: Int32 non-nullable}} + let target_fields = vec![Arc::new(non_null_field( + "field1", + struct_type(vec![non_null_field("nested", DataType::Int32)]), + ))]; + + let result = validate_struct_compatibility(&source_fields, &target_fields); + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("nested")); + assert!(error_msg.contains("non-nullable")); + } + + #[test] + fn test_cast_nested_struct_with_extra_and_missing_fields() { + // Source inner struct has fields a, b, extra + let a = Arc::new(Int32Array::from(vec![Some(1), None])) as ArrayRef; + let b = Arc::new(Int32Array::from(vec![Some(2), Some(3)])) as ArrayRef; + let extra = Arc::new(Int32Array::from(vec![Some(9), Some(10)])) as ArrayRef; + + let inner = StructArray::from(vec![ + (arc_field("a", DataType::Int32), a), + (arc_field("b", DataType::Int32), b), + (arc_field("extra", DataType::Int32), extra), + ]); + + let source_struct = StructArray::from(vec![( + arc_struct_field( + "inner", + vec![ + field("a", DataType::Int32), + field("b", DataType::Int32), + field("extra", DataType::Int32), + ], + ), + Arc::new(inner) as ArrayRef, + )]); + let source_col = Arc::new(source_struct) as ArrayRef; + + // Target inner struct reorders fields, adds "missing", and drops "extra" + let target_field = struct_field( + "outer", + vec![struct_field( + "inner", + vec![ + field("b", DataType::Int64), + field("a", DataType::Int32), + field("missing", DataType::Int32), + ], + )], + ); + + let result = + cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS).unwrap(); + let outer = result.as_any().downcast_ref::<StructArray>().unwrap(); + let inner = get_column_as!(&outer, "inner", StructArray); + assert_eq!(inner.fields().len(), 3); + + let b = get_column_as!(inner, "b", Int64Array); + assert_eq!(b.value(0), 2); + assert_eq!(b.value(1), 3); + assert!(!b.is_null(0)); + assert!(!b.is_null(1)); + + let a = get_column_as!(inner, "a", Int32Array); + assert_eq!(a.value(0), 1); + assert!(a.is_null(1)); + + let missing = get_column_as!(inner, "missing", Int32Array); + assert!(missing.is_null(0)); + assert!(missing.is_null(1)); + } + + #[test] + fn test_cast_struct_with_array_and_map_fields() { + // Array field with second row null + let arr_array = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![ + Some(vec![Some(1), Some(2)]), + None, + ])) as ArrayRef; + + // Map field with second row null + let string_builder = StringBuilder::new(); + let int_builder = Int32Builder::new(); + let mut map_builder = MapBuilder::new(None, string_builder, int_builder); + map_builder.keys().append_value("a"); + map_builder.values().append_value(1); + map_builder.append(true).unwrap(); + map_builder.append(false).unwrap(); + let map_array = Arc::new(map_builder.finish()) as ArrayRef; + + let source_struct = StructArray::from(vec![ + ( + arc_field( + "arr", + DataType::List(Arc::new(field("item", DataType::Int32))), + ), + arr_array, + ), + ( + arc_field( + "map", + DataType::Map( + Arc::new(non_null_field( + "entries", + struct_type(vec![ + non_null_field("keys", DataType::Utf8), + field("values", DataType::Int32), + ]), + )), + false, + ), + ), + map_array, + ), + ]); + let source_col = Arc::new(source_struct) as ArrayRef; + + let target_field = struct_field( + "s", + vec![ + field( + "arr", + DataType::List(Arc::new(field("item", DataType::Int32))), + ), + field( + "map", + DataType::Map( + Arc::new(non_null_field( + "entries", + struct_type(vec![ + non_null_field("keys", DataType::Utf8), + field("values", DataType::Int32), + ]), + )), + false, + ), + ), + ], + ); + + let result = + cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS).unwrap(); + let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap(); + + let arr = get_column_as!(&struct_array, "arr", ListArray); + assert!(!arr.is_null(0)); + assert!(arr.is_null(1)); + let arr0 = arr.value(0); + let values = arr0.as_any().downcast_ref::<Int32Array>().unwrap(); + assert_eq!(values.value(0), 1); + assert_eq!(values.value(1), 2); + + let map = get_column_as!(&struct_array, "map", MapArray); + assert!(!map.is_null(0)); + assert!(map.is_null(1)); + let map0 = map.value(0); + let entries = map0.as_any().downcast_ref::<StructArray>().unwrap(); + let keys = get_column_as!(entries, "keys", StringArray); + let vals = get_column_as!(entries, "values", Int32Array); + assert_eq!(keys.value(0), "a"); + assert_eq!(vals.value(0), 1); + } + + #[test] + fn test_cast_struct_field_order_differs() { + let a = Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef; + let b = Arc::new(Int32Array::from(vec![Some(3), None])) as ArrayRef; + + let source_struct = StructArray::from(vec![ + (arc_field("a", DataType::Int32), a), + (arc_field("b", DataType::Int32), b), + ]); + let source_col = Arc::new(source_struct) as ArrayRef; + + let target_field = struct_field( + "s", + vec![field("b", DataType::Int64), field("a", DataType::Int32)], + ); + + let result = + cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS).unwrap(); + let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap(); + + let b_col = get_column_as!(&struct_array, "b", Int64Array); + assert_eq!(b_col.value(0), 3); + assert!(b_col.is_null(1)); + + let a_col = get_column_as!(&struct_array, "a", Int32Array); + assert_eq!(a_col.value(0), 1); + assert_eq!(a_col.value(1), 2); } } diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs index 16de00500b..bd5833bb78 100644 --- a/datafusion/datasource/src/schema_adapter.rs +++ b/datafusion/datasource/src/schema_adapter.rs @@ -26,14 +26,20 @@ use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, }; use datafusion_common::{ + format::DEFAULT_CAST_OPTIONS, nested_struct::{cast_column, validate_struct_compatibility}, plan_err, ColumnStatistics, }; use std::{fmt::Debug, sync::Arc}; /// Function used by [`SchemaMapping`] to adapt a column from the file schema to /// the table schema. -pub type CastColumnFn = - dyn Fn(&ArrayRef, &Field) -> datafusion_common::Result<ArrayRef> + Send + Sync; +pub type CastColumnFn = dyn Fn( + &ArrayRef, + &Field, + &arrow::compute::CastOptions, + ) -> datafusion_common::Result<ArrayRef> + + Send + + Sync; /// Factory for creating [`SchemaAdapter`] /// @@ -252,7 +258,9 @@ pub(crate) fn can_cast_field( ) -> datafusion_common::Result<bool> { match (file_field.data_type(), table_field.data_type()) { (DataType::Struct(source_fields), DataType::Struct(target_fields)) => { - validate_struct_compatibility(source_fields, target_fields) + // validate_struct_compatibility returns Result<()>; on success we can cast structs + validate_struct_compatibility(source_fields, target_fields)?; + Ok(true) } _ => { if can_cast_types(file_field.data_type(), table_field.data_type()) { @@ -302,7 +310,13 @@ impl SchemaAdapter for DefaultSchemaAdapter { Arc::new(SchemaMapping::new( Arc::clone(&self.projected_table_schema), field_mappings, - Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)), + Arc::new( + |array: &ArrayRef, + field: &Field, + opts: &arrow::compute::CastOptions| { + cast_column(array, field, opts) + }, + ), )), projection, )) @@ -414,7 +428,13 @@ impl SchemaMapper for SchemaMapping { || Ok(new_null_array(field.data_type(), batch_rows)), // However, if it does exist in both, use the cast_column function // to perform any necessary conversions - |batch_idx| (self.cast_column)(&batch_cols[batch_idx], field), + |batch_idx| { + (self.cast_column)( + &batch_cols[batch_idx], + field, + &DEFAULT_CAST_OPTIONS, + ) + }, ) }) .collect::<datafusion_common::Result<Vec<_>, _>>()?; @@ -641,7 +661,11 @@ mod tests { let mapping = SchemaMapping::new( Arc::clone(&projected_schema), field_mappings.clone(), - Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)), + Arc::new( + |array: &ArrayRef, field: &Field, opts: &arrow::compute::CastOptions| { + cast_column(array, field, opts) + }, + ), ); // Check that fields were set correctly --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org