This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bf7859e5d9 Schema adapter helper (#16108)
bf7859e5d9 is described below

commit bf7859e5d9dbdc260674f5333a5cafa9c6e7bc12
Author: kosiew <kos...@gmail.com>
AuthorDate: Thu Jun 5 04:23:07 2025 +0800

    Schema adapter helper (#16108)
    
    * Add field casting utility functions and refactor schema mapping logic
    
    * Fix tests for field casting and schema mapping functionality
    
    * refactor: simplify SchemaMapping instantiation in DefaultSchemaAdapter
    
    * refactor: improve documentation for create_field_mapping and SchemaMapping::new functions
    
    * test: rename schema mapping test and add happy path scenario
    
    * trigger ci
    
    ---------
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 datafusion/datasource/src/schema_adapter.rs | 292 +++++++++++++++++++++++++---
 1 file changed, 265 insertions(+), 27 deletions(-)
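
For context, the commit factors the cast-compatibility check out of
DefaultSchemaAdapter::map_schema into two pub(crate) helpers,
can_cast_field and create_field_mapping, so that callers can supply
their own predicate. A minimal sketch of that flexibility (crate-internal
code only, since the helpers are pub(crate); strict_mapping_example and
same_type_only are illustrative names, not part of this commit):

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;

    fn strict_mapping_example() -> Result<()> {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Float64, true),
            Field::new("a", DataType::Int32, true),
        ]);

        // Hypothetical predicate: require identical types rather than the
        // default "castable" check performed by can_cast_field.
        let same_type_only =
            |file: &Field, table: &Field| Ok(file.data_type() == table.data_type());

        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, same_type_only)?;

        // Only "a" matches exactly; "b" (Float64 vs Utf8) is skipped.
        assert_eq!(field_mappings, vec![Some(0), None]);
        assert_eq!(projection, vec![1]);
        Ok(())
    }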

diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index bacec7f4f9..519be97a81 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -23,7 +23,7 @@
 
 use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
 use arrow::compute::{can_cast_types, cast};
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::{Field, Schema, SchemaRef};
 use datafusion_common::{plan_err, ColumnStatistics};
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -225,6 +225,25 @@ pub(crate) struct DefaultSchemaAdapter {
     projected_table_schema: SchemaRef,
 }
 
+/// Checks if a file field can be cast to a table field
+///
+/// Returns Ok(true) if casting is possible, or an error explaining why casting is not possible
+pub(crate) fn can_cast_field(
+    file_field: &Field,
+    table_field: &Field,
+) -> datafusion_common::Result<bool> {
+    if can_cast_types(file_field.data_type(), table_field.data_type()) {
+        Ok(true)
+    } else {
+        plan_err!(
+            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
+            file_field.name(),
+            file_field.data_type(),
+            table_field.data_type()
+        )
+    }
+}
+
 impl SchemaAdapter for DefaultSchemaAdapter {
     /// Map a column index in the table schema to a column index in a particular
     /// file schema
@@ -248,40 +267,53 @@ impl SchemaAdapter for DefaultSchemaAdapter {
         &self,
         file_schema: &Schema,
     ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let mut projection = Vec::with_capacity(file_schema.fields().len());
-        let mut field_mappings = vec![None; self.projected_table_schema.fields().len()];
-
-        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
-            if let Some((table_idx, table_field)) =
-                self.projected_table_schema.fields().find(file_field.name())
-            {
-                match can_cast_types(file_field.data_type(), table_field.data_type()) {
-                    true => {
-                        field_mappings[table_idx] = Some(projection.len());
-                        projection.push(file_idx);
-                    }
-                    false => {
-                        return plan_err!(
-                            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
-                            file_field.name(),
-                            file_field.data_type(),
-                            table_field.data_type()
-                        )
-                    }
-                }
-            }
-        }
+        let (field_mappings, projection) = create_field_mapping(
+            file_schema,
+            &self.projected_table_schema,
+            can_cast_field,
+        )?;
 
         Ok((
-            Arc::new(SchemaMapping {
-                projected_table_schema: Arc::clone(&self.projected_table_schema),
+            Arc::new(SchemaMapping::new(
+                Arc::clone(&self.projected_table_schema),
                 field_mappings,
-            }),
+            )),
             projection,
         ))
     }
 }
 
+/// Helper function that creates field mappings between file schema and table schema
+///
+/// Maps columns from the file schema to their corresponding positions in the table schema,
+/// applying type compatibility checking via the provided predicate function.
+///
+/// Returns field mappings (for column reordering) and a projection (for field selection).
+pub(crate) fn create_field_mapping<F>(
+    file_schema: &Schema,
+    projected_table_schema: &SchemaRef,
+    can_map_field: F,
+) -> datafusion_common::Result<(Vec<Option<usize>>, Vec<usize>)>
+where
+    F: Fn(&Field, &Field) -> datafusion_common::Result<bool>,
+{
+    let mut projection = Vec::with_capacity(file_schema.fields().len());
+    let mut field_mappings = vec![None; projected_table_schema.fields().len()];
+
+    for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+        if let Some((table_idx, table_field)) =
+            projected_table_schema.fields().find(file_field.name())
+        {
+            if can_map_field(file_field, table_field)? {
+                field_mappings[table_idx] = Some(projection.len());
+                projection.push(file_idx);
+            }
+        }
+    }
+
+    Ok((field_mappings, projection))
+}
+
 /// The SchemaMapping struct holds a mapping from the file schema to the table
 /// schema and any necessary type conversions.
 ///
@@ -304,6 +336,21 @@ pub struct SchemaMapping {
     field_mappings: Vec<Option<usize>>,
 }
 
+impl SchemaMapping {
+    /// Creates a new SchemaMapping instance
+    ///
+    /// Initializes the field mappings needed to transform file data to the projected table schema
+    pub fn new(
+        projected_table_schema: SchemaRef,
+        field_mappings: Vec<Option<usize>>,
+    ) -> Self {
+        Self {
+            projected_table_schema,
+            field_mappings,
+        }
+    }
+}
+
 impl SchemaMapper for SchemaMapping {
     /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and conversions.
@@ -462,4 +509,195 @@ mod tests {
         assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown(),);
         assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown(),);
     }
+
+    #[test]
+    fn test_can_cast_field() {
+        // Same type should work
+        let from_field = Field::new("col", DataType::Int32, true);
+        let to_field = Field::new("col", DataType::Int32, true);
+        assert!(can_cast_field(&from_field, &to_field).unwrap());
+
+        // Casting Int32 to Float64 is allowed
+        let from_field = Field::new("col", DataType::Int32, true);
+        let to_field = Field::new("col", DataType::Float64, true);
+        assert!(can_cast_field(&from_field, &to_field).unwrap());
+
+        // Casting Float64 to Utf8 should work (converts to string)
+        let from_field = Field::new("col", DataType::Float64, true);
+        let to_field = Field::new("col", DataType::Utf8, true);
+        assert!(can_cast_field(&from_field, &to_field).unwrap());
+
+        // Binary to Decimal128 is not supported - this is an example of a cast that should fail
+        // Note: We use Binary instead of Utf8->Int32 because Arrow actually supports that cast
+        let from_field = Field::new("col", DataType::Binary, true);
+        let to_field = Field::new("col", DataType::Decimal128(10, 2), true);
+        let result = can_cast_field(&from_field, &to_field);
+        assert!(result.is_err());
+        let error_msg = result.unwrap_err().to_string();
+        assert!(error_msg.contains("Cannot cast file schema field col"));
+    }
+
+    #[test]
+    fn test_create_field_mapping() {
+        // Define the table schema
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Float64, true),
+        ]));
+
+        // Define file schema: different order, missing column c, and b has different type
+        let file_schema = Schema::new(vec![
+            Field::new("b", DataType::Float64, true), // Different type but castable to Utf8
+            Field::new("a", DataType::Int32, true),   // Same type
+            Field::new("d", DataType::Boolean, true), // Not in table schema
+        ]);
+
+        // Custom can_map_field function that allows all mappings for testing
+        let allow_all = |_: &Field, _: &Field| Ok(true);
+
+        // Test field mapping
+        let (field_mappings, projection) =
+            create_field_mapping(&file_schema, &table_schema, allow_all).unwrap();
+
+        // Expected:
+        // - field_mappings[0] (a) maps to projection[1]
+        // - field_mappings[1] (b) maps to projection[0]
+        // - field_mappings[2] (c) is None (not in file)
+        assert_eq!(field_mappings, vec![Some(1), Some(0), None]);
+        assert_eq!(projection, vec![0, 1]); // Projecting file columns b, a
+
+        // Test with a failing mapper
+        let fails_all = |_: &Field, _: &Field| Ok(false);
+        let (field_mappings, projection) =
+            create_field_mapping(&file_schema, &table_schema, fails_all).unwrap();
+
+        // Should have no mappings or projections if all cast checks fail
+        assert_eq!(field_mappings, vec![None, None, None]);
+        assert_eq!(projection, Vec::<usize>::new());
+
+        // Test with error-producing mapper
+        let error_mapper = |_: &Field, _: &Field| plan_err!("Test error");
+        let result = create_field_mapping(&file_schema, &table_schema, error_mapper);
+        assert!(result.is_err());
+        assert!(result.unwrap_err().to_string().contains("Test error"));
+    }
+
+    #[test]
+    fn test_schema_mapping_new() {
+        // Define the projected table schema
+        let projected_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+        ]));
+
+        // Define field mappings from table to file
+        let field_mappings = vec![Some(1), Some(0)];
+
+        // Create SchemaMapping manually
+        let mapping =
+            SchemaMapping::new(Arc::clone(&projected_schema), field_mappings.clone());
+
+        // Check that fields were set correctly
+        assert_eq!(*mapping.projected_table_schema, *projected_schema);
+        assert_eq!(mapping.field_mappings, field_mappings);
+
+        // Test with a batch to ensure it works properly
+        let batch = RecordBatch::try_new(
+            Arc::new(Schema::new(vec![
+                Field::new("b_file", DataType::Utf8, true),
+                Field::new("a_file", DataType::Int32, true),
+            ])),
+            vec![
+                Arc::new(arrow::array::StringArray::from(vec!["hello", "world"])),
+                Arc::new(arrow::array::Int32Array::from(vec![1, 2])),
+            ],
+        )
+        .unwrap();
+
+        // Test that map_batch works with our manually created mapping
+        let mapped_batch = mapping.map_batch(batch).unwrap();
+
+        // Verify the mapped batch has the correct schema and data
+        assert_eq!(*mapped_batch.schema(), *projected_schema);
+        assert_eq!(mapped_batch.num_columns(), 2);
+        assert_eq!(mapped_batch.column(0).len(), 2); // a column
+        assert_eq!(mapped_batch.column(1).len(), 2); // b column
+    }
+
+    #[test]
+    fn test_map_schema_error_path() {
+        // Define the table schema
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Decimal128(10, 2), true), // Use Decimal which has stricter cast rules
+        ]));
+
+        // Define file schema with incompatible type for column c
+        let file_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Float64, true), // Different but castable
+            Field::new("c", DataType::Binary, true),  // Not castable to Decimal128
+        ]);
+
+        // Create DefaultSchemaAdapter
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+
+        // map_schema should error due to incompatible types
+        let result = adapter.map_schema(&file_schema);
+        assert!(result.is_err());
+        let error_msg = result.unwrap_err().to_string();
+        assert!(error_msg.contains("Cannot cast file schema field c"));
+    }
+
+    #[test]
+    fn test_map_schema_happy_path() {
+        // Define the table schema
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Decimal128(10, 2), true),
+        ]));
+
+        // Create DefaultSchemaAdapter
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+
+        // Define compatible file schema (missing column c)
+        let compatible_file_schema = Schema::new(vec![
+            Field::new("a", DataType::Int64, true), // Can be cast to Int32
+            Field::new("b", DataType::Float64, true), // Can be cast to Utf8
+        ]);
+
+        // Test successful schema mapping
+        let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap();
+
+        // Verify field_mappings and projection created correctly
+        assert_eq!(projection, vec![0, 1]); // Projecting a and b
+
+        // Verify the SchemaMapping works with actual data
+        let file_batch = RecordBatch::try_new(
+            Arc::new(compatible_file_schema.clone()),
+            vec![
+                Arc::new(arrow::array::Int64Array::from(vec![100, 200])),
+                Arc::new(arrow::array::Float64Array::from(vec![1.5, 2.5])),
+            ],
+        )
+        .unwrap();
+
+        let mapped_batch = mapper.map_batch(file_batch).unwrap();
+
+        // Verify correct schema mapping
+        assert_eq!(*mapped_batch.schema(), *table_schema);
+        assert_eq!(mapped_batch.num_columns(), 3); // a, b, c
+
+        // Column c should be null since it wasn't in the file schema
+        let c_array = mapped_batch.column(2);
+        assert_eq!(c_array.len(), 2);
+        assert_eq!(c_array.null_count(), 2);
+    }
 }
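
As a usage note, the helpers also compose into custom adapters. A hedged
sketch of a hypothetical LenientSchemaAdapter (crate-internal, not part of
this commit) that skips incompatible columns instead of returning a plan
error:

    use std::sync::Arc;
    use arrow::datatypes::{Field, Schema, SchemaRef};
    use datafusion_common::Result;

    struct LenientSchemaAdapter {
        projected_table_schema: SchemaRef,
    }

    impl LenientSchemaAdapter {
        fn map_schema(
            &self,
            file_schema: &Schema,
        ) -> Result<(SchemaMapping, Vec<usize>)> {
            // Treat a failed cast check as "skip this column" rather than
            // propagating the plan error produced by can_cast_field.
            let lenient = |file: &Field, table: &Field| {
                Ok(can_cast_field(file, table).unwrap_or(false))
            };

            let (field_mappings, projection) = create_field_mapping(
                file_schema,
                &self.projected_table_schema,
                lenient,
            )?;

            Ok((
                SchemaMapping::new(
                    Arc::clone(&self.projected_table_schema),
                    field_mappings,
                ),
                projection,
            ))
        }
    }

The closure parameter is the point of the refactor: DefaultSchemaAdapter
passes can_cast_field, while a variant like the one above plugs in a
different policy without duplicating the mapping loop.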

