This is an automated email from the ASF dual-hosted git repository.

blaginin pushed a commit to branch annarose/dict-coercion
in repository https://gitbox.apache.org/repos/asf/datafusion-sandbox.git

commit 639971a3f836025051d7b4168625ad3603674278
Author: kosiew <[email protected]>
AuthorDate: Thu Feb 5 22:37:13 2026 +0800

    Refactor schema rewriter: remove lifetimes, extract column/cast helpers, 
add mismatch coverage (#20166)
    
    ## Which issue does this PR close?
    
    * Closes #20161.
    
    ## Rationale for this change
    
    This change is a focused refactor of the `PhysicalExprAdapter` schema
    rewriter to improve readability and maintainability while preserving
    behavior.
    
    Key motivations:
    
    * Reduce complexity from explicit lifetimes by storing schema references
    as `SchemaRef`.
    * Make column/index/type handling easier to follow by extracting helper
    functions.
    * Strengthen the test suite to ensure refactors do not alter adapter
    output.
    
    ## What changes are included in this PR?
    
    * Refactored `DefaultPhysicalExprAdapterRewriter` to own `SchemaRef`
    values instead of borrowing `&Schema`.
    
      * Simplifies construction and avoids lifetime plumbing.
    * Simplified column rewrite logic by:
    
      * Early-exiting when both the physical index and data type already
      match.
      * Extracting `resolve_column` to handle physical index/name resolution.
      * Extracting `create_cast_column_expr` to validate cast compatibility
      (including nested structs) and build `CastColumnExpr`.
    * Minor cleanups in struct compatibility validation and field selection
    to ensure the cast checks are performed against the *actual* physical
    field resolved by the final column index.
    * Test updates and additions:
    
      * Simplified construction of expected struct `Field`s in tests for
      clarity.
      * Added `test_rewrite_column_index_and_type_mismatch` to validate the
      combined case where the logical column index differs from the physical
      schema *and* the data type requires casting.
    
    ## Are these changes tested?
    
    Yes.
    
    * Existing unit tests continue to pass.
    * Added a new unit test to cover the index-and-type mismatch scenario
    for column rewriting, asserting:
    
      * The inner `Column` points to the correct physical index.
      * The resulting expression is a `CastColumnExpr` producing the expected
      logical type.
    
    ## Are there any user-facing changes?
    
    No.
    
    * This is a refactor/cleanup intended to preserve existing behavior.
    * No public API changes, no behavioral changes expected in query
    results.
    
    ## LLM-generated code disclosure
    
    This PR includes LLM-generated code and comments. All LLM-generated
    content has been manually reviewed and tested.
---
 .../physical-expr-adapter/src/schema_rewriter.rs   | 177 +++++++++++++++------
 1 file changed, 127 insertions(+), 50 deletions(-)

diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs 
b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
index 7b94ed263..5a9ee8502 100644
--- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs
+++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
 
 use arrow::array::RecordBatch;
 use arrow::compute::can_cast_types;
-use arrow::datatypes::{DataType, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, SchemaRef};
 use datafusion_common::{
     Result, ScalarValue, exec_err,
     nested_struct::validate_struct_compatibility,
@@ -260,20 +260,20 @@ impl DefaultPhysicalExprAdapter {
 impl PhysicalExprAdapter for DefaultPhysicalExprAdapter {
     fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn 
PhysicalExpr>> {
         let rewriter = DefaultPhysicalExprAdapterRewriter {
-            logical_file_schema: &self.logical_file_schema,
-            physical_file_schema: &self.physical_file_schema,
+            logical_file_schema: Arc::clone(&self.logical_file_schema),
+            physical_file_schema: Arc::clone(&self.physical_file_schema),
         };
         expr.transform(|expr| rewriter.rewrite_expr(Arc::clone(&expr)))
             .data()
     }
 }
 
-struct DefaultPhysicalExprAdapterRewriter<'a> {
-    logical_file_schema: &'a Schema,
-    physical_file_schema: &'a Schema,
+struct DefaultPhysicalExprAdapterRewriter {
+    logical_file_schema: SchemaRef,
+    physical_file_schema: SchemaRef,
 }
 
-impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
+impl DefaultPhysicalExprAdapterRewriter {
     fn rewrite_expr(
         &self,
         expr: Arc<dyn PhysicalExpr>,
@@ -421,18 +421,13 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
         };
         let physical_field = 
self.physical_file_schema.field(physical_column_index);
 
-        let column = match (
-            column.index() == physical_column_index,
-            logical_field.data_type() == physical_field.data_type(),
-        ) {
-            // If the column index matches and the data types match, we can 
use the column as is
-            (true, true) => return Ok(Transformed::no(expr)),
-            // If the indexes or data types do not match, we need to create a 
new column expression
-            (true, _) => column.clone(),
-            (false, _) => {
-                Column::new_with_schema(logical_field.name(), 
self.physical_file_schema)?
-            }
-        };
+        if column.index() == physical_column_index
+            && logical_field.data_type() == physical_field.data_type()
+        {
+            return Ok(Transformed::no(expr));
+        }
+
+        let column = self.resolve_column(column, physical_column_index)?;
 
         if logical_field.data_type() == physical_field.data_type() {
             // If the data types match, we can use the column as is
@@ -443,24 +438,60 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
         // TODO: add optimization to move the cast from the column to literal 
expressions in the case of `col = 123`
         // since that's much cheaper to evalaute.
         // See 
https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928
-        //
+        self.create_cast_column_expr(column, logical_field)
+    }
+
+    /// Resolves a column expression, handling index and type mismatches.
+    ///
+    /// Returns the appropriate Column expression when the column's index or 
data type
+    /// don't match the physical schema. Assumes that the early-exit case 
(both index
+    /// and type match) has already been checked by the caller.
+    fn resolve_column(
+        &self,
+        column: &Column,
+        physical_column_index: usize,
+    ) -> Result<Column> {
+        if column.index() == physical_column_index {
+            Ok(column.clone())
+        } else {
+            Column::new_with_schema(column.name(), 
self.physical_file_schema.as_ref())
+        }
+    }
+
+    /// Validates type compatibility and creates a CastColumnExpr if needed.
+    ///
+    /// Checks whether the physical field can be cast to the logical field 
type,
+    /// handling both struct and scalar types. Returns a CastColumnExpr with 
the
+    /// appropriate configuration.
+    fn create_cast_column_expr(
+        &self,
+        column: Column,
+        logical_field: &Field,
+    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+        let actual_physical_field = 
self.physical_file_schema.field(column.index());
+
         // For struct types, use validate_struct_compatibility which handles:
         // - Missing fields in source (filled with nulls)
         // - Extra fields in source (ignored)
         // - Recursive validation of nested structs
         // For non-struct types, use Arrow's can_cast_types
-        match (physical_field.data_type(), logical_field.data_type()) {
+        match (actual_physical_field.data_type(), logical_field.data_type()) {
             (DataType::Struct(physical_fields), 
DataType::Struct(logical_fields)) => {
-                validate_struct_compatibility(physical_fields, 
logical_fields)?;
+                validate_struct_compatibility(
+                    physical_fields.as_ref(),
+                    logical_fields.as_ref(),
+                )?;
             }
             _ => {
-                let is_compatible =
-                    can_cast_types(physical_field.data_type(), 
logical_field.data_type());
+                let is_compatible = can_cast_types(
+                    actual_physical_field.data_type(),
+                    logical_field.data_type(),
+                );
                 if !is_compatible {
                     return exec_err!(
                         "Cannot cast column '{}' from '{}' (physical data 
type) to '{}' (logical data type)",
                         column.name(),
-                        physical_field.data_type(),
+                        actual_physical_field.data_type(),
                         logical_field.data_type()
                     );
                 }
@@ -469,7 +500,7 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
 
         let cast_expr = Arc::new(CastColumnExpr::new(
             Arc::new(column),
-            Arc::new(physical_field.clone()),
+            Arc::new(actual_physical_field.clone()),
             Arc::new(logical_field.clone()),
             None,
         ));
@@ -777,30 +808,32 @@ mod tests {
 
         let result = adapter.rewrite(column_expr).unwrap();
 
+        let physical_struct_fields: Fields = vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ]
+        .into();
+        let physical_field = Arc::new(Field::new(
+            "data",
+            DataType::Struct(physical_struct_fields),
+            false,
+        ));
+
+        let logical_struct_fields: Fields = vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8View, true),
+        ]
+        .into();
+        let logical_field = Arc::new(Field::new(
+            "data",
+            DataType::Struct(logical_struct_fields),
+            false,
+        ));
+
         let expected = Arc::new(CastColumnExpr::new(
             Arc::new(Column::new("data", 0)),
-            Arc::new(Field::new(
-                "data",
-                DataType::Struct(
-                    vec![
-                        Field::new("id", DataType::Int32, false),
-                        Field::new("name", DataType::Utf8, true),
-                    ]
-                    .into(),
-                ),
-                false,
-            )),
-            Arc::new(Field::new(
-                "data",
-                DataType::Struct(
-                    vec![
-                        Field::new("id", DataType::Int64, false),
-                        Field::new("name", DataType::Utf8View, true),
-                    ]
-                    .into(),
-                ),
-                false,
-            )),
+            physical_field,
+            logical_field,
             None,
         )) as Arc<dyn PhysicalExpr>;
 
@@ -1193,8 +1226,8 @@ mod tests {
         )]);
 
         let rewriter = DefaultPhysicalExprAdapterRewriter {
-            logical_file_schema: &logical_schema,
-            physical_file_schema: &physical_schema,
+            logical_file_schema: Arc::new(logical_schema),
+            physical_file_schema: Arc::new(physical_schema),
         };
 
         // Test that when a field exists in physical schema, it returns None
@@ -1415,4 +1448,48 @@ mod tests {
         assert!(format!("{:?}", adapter1).contains("BatchAdapter"));
         assert!(format!("{:?}", adapter2).contains("BatchAdapter"));
     }
+
+    #[test]
+    fn test_rewrite_column_index_and_type_mismatch() {
+        let physical_schema = Schema::new(vec![
+            Field::new("b", DataType::Utf8, true),
+            Field::new("a", DataType::Int32, false), // Index 1
+        ]);
+
+        let logical_schema = Schema::new(vec![
+            Field::new("a", DataType::Int64, false), // Index 0, Different Type
+            Field::new("b", DataType::Utf8, true),
+        ]);
+
+        let factory = DefaultPhysicalExprAdapterFactory;
+        let adapter = factory
+            .create(Arc::new(logical_schema), Arc::new(physical_schema))
+            .unwrap();
+
+        // Logical column "a" is at index 0
+        let column_expr = Arc::new(Column::new("a", 0));
+
+        let result = adapter.rewrite(column_expr).unwrap();
+
+        // Should be a CastColumnExpr
+        let cast_expr = result
+            .as_any()
+            .downcast_ref::<CastColumnExpr>()
+            .expect("Expected CastColumnExpr");
+
+        // Verify the inner column points to the correct physical index (1)
+        let inner_col = cast_expr
+            .expr()
+            .as_any()
+            .downcast_ref::<Column>()
+            .expect("Expected inner Column");
+        assert_eq!(inner_col.name(), "a");
+        assert_eq!(inner_col.index(), 1); // Physical index is 1
+
+        // Verify cast types
+        assert_eq!(
+            cast_expr.data_type(&Schema::empty()).unwrap(),
+            DataType::Int64
+        );
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to