This is an automated email from the ASF dual-hosted git repository.

kosiew pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c30f13c82 Add nested struct casting support and integrate into SchemaAdapter (#16371)
0c30f13c82 is described below

commit 0c30f13c82cd4d5ddb753048152345c10301cc27
Author: kosiew <kos...@gmail.com>
AuthorDate: Fri Jun 27 18:46:58 2025 +0800

    Add nested struct casting support and integrate into SchemaAdapter (#16371)
    
    This commit introduces a new `nested_struct` module to support recursive
    struct-to-struct casting in DataFusion. It enables safe and flexible schema
    evolution through:
    
    - Recursive casting of nested structs via `cast_column` and `cast_struct_column`
    - Filling missing target fields with null values
    - Ignoring extra fields present in the source but absent in the target
    - Validating field compatibility using `validate_struct_compatibility`, including nested levels
    
    Integration updates include:
    
    - Enhancing `SchemaAdapter` and `SchemaMapping` to use the new nested struct casting logic
    - Injecting a customizable `cast_column` function into `SchemaMapping`
    - Updating schema mapping to support nested structs for file-to-table schema projection
    
    Test coverage:
    
    - Unit tests for simple, nested, missing, and incompatible struct scenarios
    - Adapter tests for batch mapping and statistics mapping with nested fields
    - Ensures existing functionality remains intact
    
    These changes provide robust support for evolving complex data schemas and
    improve the safety of nested data transformations, especially when working
    with Parquet and JSON data sources.
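
The snippet below is an editorial sketch (not part of the commit) of how the new `datafusion_common::nested_struct::cast_column` entry point is intended to behave, mirroring the unit tests added in this change: the matching field "a" is cast recursively while the target-only field "b" is filled with nulls. Variable names are illustrative.

    use std::sync::Arc;

    use arrow::array::{Array, ArrayRef, Int32Array, StructArray};
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::nested_struct::cast_column;

    fn main() -> datafusion_common::Result<()> {
        // Source struct only has field "a" (Int32).
        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
        let source: ArrayRef = Arc::new(StructArray::from(vec![(
            Arc::new(Field::new("a", DataType::Int32, true)),
            a,
        )]));

        // Target struct widens "a" to Int64 and adds a new field "b".
        let target = Field::new(
            "s",
            DataType::Struct(
                vec![
                    Arc::new(Field::new("a", DataType::Int64, true)),
                    Arc::new(Field::new("b", DataType::Utf8, true)),
                ]
                .into(),
            ),
            true,
        );

        // "a" is cast Int32 -> Int64; "b" is filled with nulls.
        let adapted = cast_column(&source, &target)?;
        assert_eq!(adapted.len(), 2);
        Ok(())
    }
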
---
 datafusion/common/src/lib.rs                |   1 +
 datafusion/common/src/nested_struct.rs      | 329 +++++++++++++++++++++++++
 datafusion/core/src/datasource/mod.rs       |  18 +-
 datafusion/datasource/src/schema_adapter.rs | 359 ++++++++++++++++++++++++++--
 4 files changed, 674 insertions(+), 33 deletions(-)

diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index d89e08c7d4..3ea7321ef3 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -46,6 +46,7 @@ pub mod file_options;
 pub mod format;
 pub mod hash_utils;
 pub mod instant;
+pub mod nested_struct;
 mod null_equality;
 pub mod parsers;
 pub mod pruning;
diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs
new file mode 100644
index 0000000000..f349b360f2
--- /dev/null
+++ b/datafusion/common/src/nested_struct.rs
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result, _plan_err};
+use arrow::{
+    array::{new_null_array, Array, ArrayRef, StructArray},
+    compute::cast,
+    datatypes::{DataType::Struct, Field, FieldRef},
+};
+use std::sync::Arc;
+
+/// Cast a struct column to match target struct fields, handling nested structs recursively.
+///
+/// This function implements struct-to-struct casting with the assumption that **structs should
+/// always be allowed to cast to other structs**. However, the source column must already be
+/// a struct type - non-struct sources will result in an error.
+///
+/// ## Field Matching Strategy
+/// - **By Name**: Source struct fields are matched to target fields by name (case-sensitive)
+/// - **Type Adaptation**: When a matching field is found, it is recursively cast to the target field's type
+/// - **Missing Fields**: Target fields not present in the source are filled with null values
+/// - **Extra Fields**: Source fields not present in the target are ignored
+///
+/// ## Nested Struct Handling
+/// - Nested structs are handled recursively using the same casting rules
+/// - Each level of nesting follows the same field matching and null-filling strategy
+/// - This allows for complex struct transformations while maintaining data integrity
+///
+/// # Arguments
+/// * `source_col` - The source array to cast (must be a struct array)
+/// * `target_fields` - The target struct field definitions to cast to
+///
+/// # Returns
+/// A `Result<ArrayRef>` containing the cast struct array
+///
+/// # Errors
+/// Returns a `DataFusionError::Plan` if the source column is not a struct type
+fn cast_struct_column(
+    source_col: &ArrayRef,
+    target_fields: &[Arc<Field>],
+) -> Result<ArrayRef> {
+    if let Some(struct_array) = source_col.as_any().downcast_ref::<StructArray>() {
+        let mut children: Vec<(Arc<Field>, Arc<dyn Array>)> = Vec::new();
+        let num_rows = source_col.len();
+
+        for target_child_field in target_fields {
+            let field_arc = Arc::clone(target_child_field);
+            match struct_array.column_by_name(target_child_field.name()) {
+                Some(source_child_col) => {
+                    let adapted_child =
+                        cast_column(source_child_col, target_child_field)?;
+                    children.push((field_arc, adapted_child));
+                }
+                None => {
+                    children.push((
+                        field_arc,
+                        new_null_array(target_child_field.data_type(), num_rows),
+                    ));
+                }
+            }
+        }
+
+        let struct_array = StructArray::from(children);
+        Ok(Arc::new(struct_array))
+    } else {
+        // Return error if source is not a struct type
+        Err(DataFusionError::Plan(format!(
+            "Cannot cast column of type {:?} to struct type. Source must be a 
struct to cast to struct.",
+            source_col.data_type()
+        )))
+    }
+}
+
+/// Cast a column to match the target field type, with special handling for nested structs.
+///
+/// This function serves as the main entry point for column casting operations. For struct
+/// types, it enforces that **only struct columns can be cast to struct types**.
+///
+/// ## Casting Behavior
+/// - **Struct Types**: Delegates to `cast_struct_column` for struct-to-struct casting only
+/// - **Non-Struct Types**: Uses Arrow's standard `cast` function for primitive type conversions
+///
+/// ## Struct Casting Requirements
+/// The struct casting logic requires that the source column must already be a struct type.
+/// This makes the function useful for:
+/// - Schema evolution scenarios where struct layouts change over time
+/// - Data migration between different struct schemas  
+/// - Type-safe data processing pipelines that maintain struct type integrity
+///
+/// # Arguments
+/// * `source_col` - The source array to cast
+/// * `target_field` - The target field definition (including type and metadata)
+///
+/// # Returns
+/// A `Result<ArrayRef>` containing the cast array
+///
+/// # Errors
+/// Returns an error if:
+/// - Attempting to cast a non-struct column to a struct type
+/// - Arrow's cast function fails for non-struct types
+/// - Memory allocation fails during struct construction
+/// - Invalid data type combinations are encountered
+pub fn cast_column(source_col: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
+    match target_field.data_type() {
+        Struct(target_fields) => cast_struct_column(source_col, target_fields),
+        _ => Ok(cast(source_col, target_field.data_type())?),
+    }
+}
+
+/// Validates compatibility between source and target struct fields for casting operations.
+///
+/// This function implements comprehensive struct compatibility checking by examining:
+/// - Field name matching between source and target structs  
+/// - Type castability for each matching field (including recursive struct validation)
+/// - Proper handling of missing fields (target fields not in source are allowed - filled with nulls)
+/// - Proper handling of extra fields (source fields not in target are allowed - ignored)
+///
+/// # Compatibility Rules
+/// - **Field Matching**: Fields are matched by name (case-sensitive)
+/// - **Missing Target Fields**: Allowed - will be filled with null values during casting
+/// - **Extra Source Fields**: Allowed - will be ignored during casting
+/// - **Type Compatibility**: Each matching field must be castable using Arrow's type system
+/// - **Nested Structs**: Recursively validates nested struct compatibility
+///
+/// # Arguments
+/// * `source_fields` - Fields from the source struct type
+/// * `target_fields` - Fields from the target struct type
+///
+/// # Returns
+/// * `Ok(true)` if the structs are compatible for casting
+/// * `Err(DataFusionError)` with detailed error message if incompatible
+///
+/// # Examples
+/// ```text
+/// // Compatible: source has extra field, target has missing field
+/// // Source: {a: i32, b: string, c: f64}  
+/// // Target: {a: i64, d: bool}
+/// // Result: Ok(true) - 'a' can cast i32->i64, 'b','c' ignored, 'd' filled with nulls
+///
+/// // Incompatible: matching field has incompatible types
+/// // Source: {a: string}
+/// // Target: {a: binary}
+/// // Result: Err(...) - string cannot cast to binary
+/// ```
+pub fn validate_struct_compatibility(
+    source_fields: &[FieldRef],
+    target_fields: &[FieldRef],
+) -> Result<bool> {
+    // Check compatibility for each target field
+    for target_field in target_fields {
+        // Look for matching field in source by name
+        if let Some(source_field) = source_fields
+            .iter()
+            .find(|f| f.name() == target_field.name())
+        {
+            // Check if the matching field types are compatible
+            match (source_field.data_type(), target_field.data_type()) {
+                // Recursively validate nested structs
+                (Struct(source_nested), Struct(target_nested)) => {
+                    validate_struct_compatibility(source_nested, target_nested)?;
+                }
+                // For non-struct types, use the existing castability check
+                _ => {
+                    if !arrow::compute::can_cast_types(
+                        source_field.data_type(),
+                        target_field.data_type(),
+                    ) {
+                        return _plan_err!(
+                            "Cannot cast struct field '{}' from type {:?} to 
type {:?}",
+                            target_field.name(),
+                            source_field.data_type(),
+                            target_field.data_type()
+                        );
+                    }
+                }
+            }
+        }
+        // Missing fields in source are OK - they'll be filled with nulls
+    }
+
+    // Extra fields in source are OK - they'll be ignored
+    Ok(true)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::{
+        array::{Int32Array, Int64Array, StringArray},
+        datatypes::{DataType, Field},
+    };
+    /// Macro to extract and downcast a column from a StructArray
+    macro_rules! get_column_as {
+        ($struct_array:expr, $column_name:expr, $array_type:ty) => {
+            $struct_array
+                .column_by_name($column_name)
+                .unwrap()
+                .as_any()
+                .downcast_ref::<$array_type>()
+                .unwrap()
+        };
+    }
+
+    #[test]
+    fn test_cast_simple_column() {
+        let source = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+        let target_field = Field::new("ints", DataType::Int64, true);
+        let result = cast_column(&source, &target_field).unwrap();
+        let result = result.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(result.len(), 3);
+        assert_eq!(result.value(0), 1);
+        assert_eq!(result.value(1), 2);
+        assert_eq!(result.value(2), 3);
+    }
+
+    #[test]
+    fn test_cast_struct_with_missing_field() {
+        let a_array = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
+        let source_struct = StructArray::from(vec![(
+            Arc::new(Field::new("a", DataType::Int32, true)),
+            Arc::clone(&a_array),
+        )]);
+        let source_col = Arc::new(source_struct) as ArrayRef;
+
+        let target_field = Field::new(
+            "s",
+            Struct(
+                vec![
+                    Arc::new(Field::new("a", DataType::Int32, true)),
+                    Arc::new(Field::new("b", DataType::Utf8, true)),
+                ]
+                .into(),
+            ),
+            true,
+        );
+
+        let result = cast_column(&source_col, &target_field).unwrap();
+        let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap();
+        assert_eq!(struct_array.fields().len(), 2);
+        let a_result = get_column_as!(&struct_array, "a", Int32Array);
+        assert_eq!(a_result.value(0), 1);
+        assert_eq!(a_result.value(1), 2);
+
+        let b_result = get_column_as!(&struct_array, "b", StringArray);
+        assert_eq!(b_result.len(), 2);
+        assert!(b_result.is_null(0));
+        assert!(b_result.is_null(1));
+    }
+
+    #[test]
+    fn test_cast_struct_source_not_struct() {
+        let source = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
+        let target_field = Field::new(
+            "s",
+            Struct(vec![Arc::new(Field::new("a", DataType::Int32, true))].into()),
+            true,
+        );
+
+        let result = cast_column(&source, &target_field);
+        assert!(result.is_err());
+        let error_msg = result.unwrap_err().to_string();
+        assert!(error_msg.contains("Cannot cast column of type"));
+        assert!(error_msg.contains("to struct type"));
+        assert!(error_msg.contains("Source must be a struct"));
+    }
+
+    #[test]
+    fn test_validate_struct_compatibility_incompatible_types() {
+        // Source struct: {field1: Binary, field2: String}
+        let source_fields = vec![
+            Arc::new(Field::new("field1", DataType::Binary, true)),
+            Arc::new(Field::new("field2", DataType::Utf8, true)),
+        ];
+
+        // Target struct: {field1: Int32}
+        let target_fields = vec![Arc::new(Field::new("field1", DataType::Int32, true))];
+
+        let result = validate_struct_compatibility(&source_fields, &target_fields);
+        assert!(result.is_err());
+        let error_msg = result.unwrap_err().to_string();
+        assert!(error_msg.contains("Cannot cast struct field 'field1'"));
+        assert!(error_msg.contains("Binary"));
+        assert!(error_msg.contains("Int32"));
+    }
+
+    #[test]
+    fn test_validate_struct_compatibility_compatible_types() {
+        // Source struct: {field1: Int32, field2: String}
+        let source_fields = vec![
+            Arc::new(Field::new("field1", DataType::Int32, true)),
+            Arc::new(Field::new("field2", DataType::Utf8, true)),
+        ];
+
+        // Target struct: {field1: Int64} (Int32 can cast to Int64)
+        let target_fields = vec![Arc::new(Field::new("field1", DataType::Int64, true))];
+
+        let result = validate_struct_compatibility(&source_fields, &target_fields);
+        assert!(result.is_ok());
+        assert!(result.unwrap());
+    }
+
+    #[test]
+    fn test_validate_struct_compatibility_missing_field_in_source() {
+        // Source struct: {field2: String} (missing field1)
+        let source_fields = vec![Arc::new(Field::new("field2", DataType::Utf8, true))];
+
+        // Target struct: {field1: Int32}
+        let target_fields = vec![Arc::new(Field::new("field1", DataType::Int32, true))];
+
+        // Should be OK - missing fields will be filled with nulls
+        let result = validate_struct_compatibility(&source_fields, &target_fields);
+        assert!(result.is_ok());
+        assert!(result.unwrap());
+    }
+}
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index b3d69064ff..94d651ddad 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -51,11 +51,8 @@ pub use datafusion_physical_expr::create_ordering;
 #[cfg(all(test, feature = "parquet"))]
 mod tests {
 
-    use datafusion_datasource::schema_adapter::{
-        DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
-    };
-
     use crate::prelude::SessionContext;
+    use ::object_store::{path::Path, ObjectMeta};
     use arrow::{
         array::{Int32Array, StringArray},
         datatypes::{DataType, Field, Schema, SchemaRef},
@@ -63,13 +60,17 @@ mod tests {
     };
     use datafusion_common::{record_batch, test_util::batches_to_sort_string};
     use datafusion_datasource::{
-        file::FileSource, file_scan_config::FileScanConfigBuilder,
-        source::DataSourceExec, PartitionedFile,
+        file::FileSource,
+        file_scan_config::FileScanConfigBuilder,
+        schema_adapter::{
+            DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
+            SchemaMapper,
+        },
+        source::DataSourceExec,
+        PartitionedFile,
     };
     use datafusion_datasource_parquet::source::ParquetSource;
-    use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_physical_plan::collect;
-    use object_store::{path::Path, ObjectMeta};
     use std::{fs, sync::Arc};
     use tempfile::TempDir;
 
@@ -79,6 +80,7 @@ mod tests {
         // record batches returned from parquet.  This can be useful for schema evolution
         // where older files may not have all columns.
 
+        use datafusion_execution::object_store::ObjectStoreUrl;
         let tmp_dir = TempDir::new().unwrap();
         let table_dir = tmp_dir.path().join("parquet_test");
         fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index 519be97a81..b43041c8d1 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -20,13 +20,20 @@
 //! Adapter provides a method of translating the RecordBatches that come out of the
 //! physical format into how they should be used by DataFusion.  For instance, a schema
 //! can be stored external to a parquet file that maps parquet logical types to arrow types.
-
-use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
-use arrow::compute::{can_cast_types, cast};
-use arrow::datatypes::{Field, Schema, SchemaRef};
-use datafusion_common::{plan_err, ColumnStatistics};
-use std::fmt::Debug;
-use std::sync::Arc;
+use arrow::{
+    array::{new_null_array, ArrayRef, RecordBatch, RecordBatchOptions},
+    compute::can_cast_types,
+    datatypes::{DataType, Field, Schema, SchemaRef},
+};
+use datafusion_common::{
+    nested_struct::{cast_column, validate_struct_compatibility},
+    plan_err, ColumnStatistics,
+};
+use std::{fmt::Debug, sync::Arc};
+/// Function used by [`SchemaMapping`] to adapt a column from the file schema to
+/// the table schema.
+pub type CastColumnFn =
+    dyn Fn(&ArrayRef, &Field) -> datafusion_common::Result<ArrayRef> + Send + Sync;
 
 /// Factory for creating [`SchemaAdapter`]
 ///
@@ -232,15 +239,22 @@ pub(crate) fn can_cast_field(
     file_field: &Field,
     table_field: &Field,
 ) -> datafusion_common::Result<bool> {
-    if can_cast_types(file_field.data_type(), table_field.data_type()) {
-        Ok(true)
-    } else {
-        plan_err!(
-            "Cannot cast file schema field {} of type {:?} to table schema 
field of type {:?}",
-            file_field.name(),
-            file_field.data_type(),
-            table_field.data_type()
-        )
+    match (file_field.data_type(), table_field.data_type()) {
+        (DataType::Struct(source_fields), DataType::Struct(target_fields)) => {
+            validate_struct_compatibility(source_fields, target_fields)
+        }
+        _ => {
+            if can_cast_types(file_field.data_type(), table_field.data_type()) {
+                Ok(true)
+            } else {
+                plan_err!(
+                    "Cannot cast file schema field {} of type {:?} to table 
schema field of type {:?}",
+                    file_field.name(),
+                    file_field.data_type(),
+                    table_field.data_type()
+                )
+            }
+        }
     }
 }
 
@@ -277,6 +291,7 @@ impl SchemaAdapter for DefaultSchemaAdapter {
             Arc::new(SchemaMapping::new(
                 Arc::clone(&self.projected_table_schema),
                 field_mappings,
+                Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
             )),
             projection,
         ))
@@ -323,7 +338,6 @@ where
 /// `projected_table_schema` as it can only operate on the projected fields.
 ///
 /// [`map_batch`]: Self::map_batch
-#[derive(Debug)]
 pub struct SchemaMapping {
     /// The schema of the table. This is the expected schema after conversion
     /// and it should match the schema of the query result.
@@ -334,6 +348,19 @@ pub struct SchemaMapping {
     /// They are Options instead of just plain `usize`s because the table could
     /// have fields that don't exist in the file.
     field_mappings: Vec<Option<usize>>,
+    /// Function used to adapt a column from the file schema to the table schema
+    /// when it exists in both schemas
+    cast_column: Arc<CastColumnFn>,
+}
+
+impl Debug for SchemaMapping {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SchemaMapping")
+            .field("projected_table_schema", &self.projected_table_schema)
+            .field("field_mappings", &self.field_mappings)
+            .field("cast_column", &"<fn>")
+            .finish()
+    }
 }
 
 impl SchemaMapping {
@@ -343,10 +370,12 @@ impl SchemaMapping {
     pub fn new(
         projected_table_schema: SchemaRef,
         field_mappings: Vec<Option<usize>>,
+        cast_column: Arc<CastColumnFn>,
     ) -> Self {
         Self {
             projected_table_schema,
             field_mappings,
+            cast_column,
         }
     }
 }
@@ -373,9 +402,9 @@ impl SchemaMapper for SchemaMapping {
                     // If this field only exists in the table, and not in the file, then we know
                     // that it's null, so just return that.
                     || Ok(new_null_array(field.data_type(), batch_rows)),
-                    // However, if it does exist in both, then try to cast it to the correct output
-                    // type
-                    |batch_idx| cast(&batch_cols[batch_idx], field.data_type()),
+                    // However, if it does exist in both, use the cast_column function
+                    // to perform any necessary conversions
+                    |batch_idx| (self.cast_column)(&batch_cols[batch_idx], field),
                 )
             })
             .collect::<datafusion_common::Result<Vec<_>, _>>()?;
@@ -421,10 +450,14 @@ impl SchemaMapper for SchemaMapping {
 
 #[cfg(test)]
 mod tests {
-    use arrow::datatypes::{DataType, Field};
-    use datafusion_common::{stats::Precision, Statistics};
-
     use super::*;
+    use arrow::{
+        array::{Array, ArrayRef, StringBuilder, StructArray, TimestampMillisecondArray},
+        compute::cast,
+        datatypes::{DataType, Field, TimeUnit},
+        record_batch::RecordBatch,
+    };
+    use datafusion_common::{stats::Precision, Result, ScalarValue, Statistics};
 
     #[test]
     fn test_schema_mapping_map_statistics_basic() {
@@ -595,8 +628,11 @@ mod tests {
         let field_mappings = vec![Some(1), Some(0)];
 
         // Create SchemaMapping manually
-        let mapping =
-            SchemaMapping::new(Arc::clone(&projected_schema), field_mappings.clone());
+        let mapping = SchemaMapping::new(
+            Arc::clone(&projected_schema),
+            field_mappings.clone(),
+            Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
+        );
 
         // Check that fields were set correctly
         assert_eq!(*mapping.projected_table_schema, *projected_schema);
@@ -700,4 +736,277 @@ mod tests {
         assert_eq!(c_array.len(), 2);
         assert_eq!(c_array.null_count(), 2);
     }
+
+    #[test]
+    fn test_adapt_struct_with_added_nested_fields() -> Result<()> {
+        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();
+        let batch = create_test_batch_with_struct_data(&file_schema)?;
+
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;
+        let mapped_batch = mapper.map_batch(batch)?;
+
+        verify_adapted_batch_with_nested_fields(&mapped_batch, &table_schema)?;
+        Ok(())
+    }
+
+    #[test]
+    fn test_map_column_statistics_struct() -> Result<()> {
+        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();
+
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;
+
+        let file_stats = vec![
+            create_test_column_statistics(
+                0,
+                100,
+                Some(ScalarValue::Int32(Some(1))),
+                Some(ScalarValue::Int32(Some(100))),
+                Some(ScalarValue::Int32(Some(5100))),
+            ),
+            create_test_column_statistics(10, 50, None, None, None),
+        ];
+
+        let table_stats = mapper.map_column_statistics(&file_stats)?;
+        assert_eq!(table_stats.len(), 1);
+        verify_column_statistics(
+            &table_stats[0],
+            Some(0),
+            Some(100),
+            Some(ScalarValue::Int32(Some(1))),
+            Some(ScalarValue::Int32(Some(100))),
+            Some(ScalarValue::Int32(Some(5100))),
+        );
+        let missing_stats = mapper.map_column_statistics(&[])?;
+        assert_eq!(missing_stats.len(), 1);
+        assert_eq!(missing_stats[0], ColumnStatistics::new_unknown());
+        Ok(())
+    }
+
+    fn create_test_schemas_with_nested_fields() -> (SchemaRef, SchemaRef) {
+        let file_schema = Arc::new(Schema::new(vec![Field::new(
+            "info",
+            DataType::Struct(
+                vec![
+                    Field::new("location", DataType::Utf8, true),
+                    Field::new(
+                        "timestamp_utc",
+                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
+                        true,
+                    ),
+                ]
+                .into(),
+            ),
+            true,
+        )]));
+
+        let table_schema = Arc::new(Schema::new(vec![Field::new(
+            "info",
+            DataType::Struct(
+                vec![
+                    Field::new("location", DataType::Utf8, true),
+                    Field::new(
+                        "timestamp_utc",
+                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
+                        true,
+                    ),
+                    Field::new(
+                        "reason",
+                        DataType::Struct(
+                            vec![
+                                Field::new("_level", DataType::Float64, true),
+                                Field::new(
+                                    "details",
+                                    DataType::Struct(
+                                        vec![
+                                            Field::new("rurl", DataType::Utf8, 
true),
+                                            Field::new("s", DataType::Float64, 
true),
+                                            Field::new("t", DataType::Utf8, 
true),
+                                        ]
+                                        .into(),
+                                    ),
+                                    true,
+                                ),
+                            ]
+                            .into(),
+                        ),
+                        true,
+                    ),
+                ]
+                .into(),
+            ),
+            true,
+        )]));
+
+        (file_schema, table_schema)
+    }
+
+    fn create_test_batch_with_struct_data(
+        file_schema: &SchemaRef,
+    ) -> Result<RecordBatch> {
+        let mut location_builder = StringBuilder::new();
+        location_builder.append_value("San Francisco");
+        location_builder.append_value("New York");
+
+        let timestamp_array = TimestampMillisecondArray::from(vec![
+            Some(1640995200000),
+            Some(1641081600000),
+        ]);
+
+        let timestamp_type =
+            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()));
+        let timestamp_array = cast(&timestamp_array, &timestamp_type)?;
+
+        let info_struct = StructArray::from(vec![
+            (
+                Arc::new(Field::new("location", DataType::Utf8, true)),
+                Arc::new(location_builder.finish()) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new("timestamp_utc", timestamp_type, true)),
+                timestamp_array,
+            ),
+        ]);
+
+        Ok(RecordBatch::try_new(
+            Arc::clone(file_schema),
+            vec![Arc::new(info_struct)],
+        )?)
+    }
+
+    fn verify_adapted_batch_with_nested_fields(
+        mapped_batch: &RecordBatch,
+        table_schema: &SchemaRef,
+    ) -> Result<()> {
+        assert_eq!(mapped_batch.schema(), *table_schema);
+        assert_eq!(mapped_batch.num_rows(), 2);
+
+        let info_col = mapped_batch.column(0);
+        let info_array = info_col
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .expect("Expected info column to be a StructArray");
+
+        verify_preserved_fields(info_array)?;
+        verify_reason_field_structure(info_array)?;
+        Ok(())
+    }
+
+    fn verify_preserved_fields(info_array: &StructArray) -> Result<()> {
+        let location_col = info_array
+            .column_by_name("location")
+            .expect("Expected location field in struct");
+        let location_array = location_col
+            .as_any()
+            .downcast_ref::<arrow::array::StringArray>()
+            .expect("Expected location to be a StringArray");
+        assert_eq!(location_array.value(0), "San Francisco");
+        assert_eq!(location_array.value(1), "New York");
+
+        let timestamp_col = info_array
+            .column_by_name("timestamp_utc")
+            .expect("Expected timestamp_utc field in struct");
+        let timestamp_array = timestamp_col
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .expect("Expected timestamp_utc to be a 
TimestampMillisecondArray");
+        assert_eq!(timestamp_array.value(0), 1640995200000);
+        assert_eq!(timestamp_array.value(1), 1641081600000);
+        Ok(())
+    }
+
+    fn verify_reason_field_structure(info_array: &StructArray) -> Result<()> {
+        let reason_col = info_array
+            .column_by_name("reason")
+            .expect("Expected reason field in struct");
+        let reason_array = reason_col
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .expect("Expected reason to be a StructArray");
+        assert_eq!(reason_array.fields().len(), 2);
+        assert!(reason_array.column_by_name("_level").is_some());
+        assert!(reason_array.column_by_name("details").is_some());
+
+        let details_col = reason_array
+            .column_by_name("details")
+            .expect("Expected details field in reason struct");
+        let details_array = details_col
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .expect("Expected details to be a StructArray");
+        assert_eq!(details_array.fields().len(), 3);
+        assert!(details_array.column_by_name("rurl").is_some());
+        assert!(details_array.column_by_name("s").is_some());
+        assert!(details_array.column_by_name("t").is_some());
+        for i in 0..2 {
+            assert!(reason_array.is_null(i), "reason field should be null");
+        }
+        Ok(())
+    }
+
+    fn verify_column_statistics(
+        stats: &ColumnStatistics,
+        expected_null_count: Option<usize>,
+        expected_distinct_count: Option<usize>,
+        expected_min: Option<ScalarValue>,
+        expected_max: Option<ScalarValue>,
+        expected_sum: Option<ScalarValue>,
+    ) {
+        if let Some(count) = expected_null_count {
+            assert_eq!(
+                stats.null_count,
+                Precision::Exact(count),
+                "Null count should match expected value"
+            );
+        }
+        if let Some(count) = expected_distinct_count {
+            assert_eq!(
+                stats.distinct_count,
+                Precision::Exact(count),
+                "Distinct count should match expected value"
+            );
+        }
+        if let Some(min) = expected_min {
+            assert_eq!(
+                stats.min_value,
+                Precision::Exact(min),
+                "Min value should match expected value"
+            );
+        }
+        if let Some(max) = expected_max {
+            assert_eq!(
+                stats.max_value,
+                Precision::Exact(max),
+                "Max value should match expected value"
+            );
+        }
+        if let Some(sum) = expected_sum {
+            assert_eq!(
+                stats.sum_value,
+                Precision::Exact(sum),
+                "Sum value should match expected value"
+            );
+        }
+    }
+
+    fn create_test_column_statistics(
+        null_count: usize,
+        distinct_count: usize,
+        min_value: Option<ScalarValue>,
+        max_value: Option<ScalarValue>,
+        sum_value: Option<ScalarValue>,
+    ) -> ColumnStatistics {
+        ColumnStatistics {
+            null_count: Precision::Exact(null_count),
+            distinct_count: Precision::Exact(distinct_count),
+            min_value: min_value.map_or_else(|| Precision::Absent, Precision::Exact),
+            max_value: max_value.map_or_else(|| Precision::Absent, Precision::Exact),
+            sum_value: sum_value.map_or_else(|| Precision::Absent, Precision::Exact),
+        }
+    }
 }
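
As an editorial addendum (not part of the commit), the sketch below shows how a caller could plug its own column-casting function into the new third argument of `SchemaMapping::new`, delegating to the nested-struct-aware `cast_column` from `datafusion_common`. This mirrors how `DefaultSchemaAdapter::map_schema` wires the default behavior above; the helper name, the `println!`, and the exact import path for `SchemaMapping` are assumptions for illustration.

    use std::sync::Arc;

    use arrow::array::ArrayRef;
    use arrow::datatypes::{Field, SchemaRef};
    use datafusion_common::nested_struct::cast_column;
    use datafusion_common::Result;
    use datafusion_datasource::schema_adapter::SchemaMapping;

    /// Hypothetical helper: build a SchemaMapping whose cast function logs the
    /// target field before delegating to the nested-struct-aware default.
    fn mapping_with_logging(
        projected_table_schema: SchemaRef,
        field_mappings: Vec<Option<usize>>,
    ) -> SchemaMapping {
        SchemaMapping::new(
            projected_table_schema,
            field_mappings,
            Arc::new(|array: &ArrayRef, field: &Field| -> Result<ArrayRef> {
                println!("adapting column to target field '{}'", field.name());
                cast_column(array, field)
            }),
        )
    }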


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

