This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch df52
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/df52 by this push:
     new 43c843b6e fix: [df52] schema pruning crash on complex nested types (#3500)
43c843b6e is described below

commit 43c843b6e30e7013736cb59bcc69e25ccf94369e
Author: Andy Grove <[email protected]>
AuthorDate: Thu Feb 12 13:02:40 2026 -0700

    fix: [df52] schema pruning crash on complex nested types (#3500)
    
    * fix: [df52] schema pruning crash on complex nested types
    
    When `data_schema` is provided but `projection_vector` is None (the
    NativeBatchReader / native_iceberg_compat path), the base schema was
    incorrectly set to the pruned `required_schema`. This caused DataFusion
    to think the table had only the pruned columns, leading to column index
    misalignment in PhysicalExprAdapter. For example, reading "friends" at
    logical index 0 would map to physical index 0 ("id") instead of the
    correct index 4.
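
    As an illustration (hypothetical five-column schema, consistent with
    the example above), the positional lookup resolved the filter column
    against the wrong schema:

        // Hypothetical physical file schema: [id, name, age, city, friends].
        // With the base schema wrongly set to the pruned [friends], the
        // "friends" column carries logical index 0, so a positional lookup
        // returns "id" instead of "friends":
        let physical_field = physical_file_schema.fields().get(0); // "id"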
    
    Fix: when `data_schema` is provided without a `projection_vector`,
    compute the projection by mapping required field names to their indices
    in the full data schema. Also harden `wrap_all_type_mismatches` to use a
    name-based lookup for physical fields instead of a positional index.
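
    A minimal sketch of the projection computation (hypothetical schemas;
    case-insensitive matching and the field ID fallback described below
    are elided):

        // Map each required field to its index in the full data schema by
        // name. E.g. data_schema [id, name, age, city, friends] with
        // required_schema [friends] yields projection [4] rather than [0].
        let projection: Vec<usize> = required_schema
            .fields()
            .iter()
            .filter_map(|req_field| {
                data_schema
                    .fields()
                    .iter()
                    .position(|data_field| data_field.name() == req_field.name())
            })
            .collect();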
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * fix: handle field ID mapping in projection computation
    
    When computing a name-based projection from required_schema to
    data_schema, fall back to using required_schema directly when not
    all fields can be matched by name. This handles Parquet field ID
    mapping where column names differ between the read schema and file
    schema.
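
    A sketch of the fallback condition, under the same assumptions as the
    sketch above:

        // If any required field has no name match in the data schema (as
        // with Parquet field ID mapping), projection is shorter than
        // required_schema, so keep the pruned schema with no projection.
        if projection.len() == required_schema.fields().len() {
            (Arc::clone(&data_schema), Some(projection))
        } else {
            (Arc::clone(&required_schema), None)
        }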
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 native/core/src/parquet/parquet_exec.rs   | 52 +++++++++++++++++++++----------
 native/core/src/parquet/schema_adapter.rs | 15 ++++++++-
 2 files changed, 49 insertions(+), 18 deletions(-)

diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index 79c7e06c6..f4cc7bf9f 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -80,18 +80,40 @@ pub(crate) fn init_datasource_exec(
         encryption_enabled,
     );
 
-    // dbg!(&required_schema, &data_schema);
-
-    // Determine the schema to use for ParquetSource
-    // // Use data_schema only if both data_schema and data_filters are set
-    let base_schema = match (&data_schema, &projection_vector) {
-        (Some(schema), Some(_)) => Arc::clone(schema),
-        _ => Arc::clone(&required_schema),
+    // Determine the schema and projection to use for ParquetSource.
+    // When data_schema is provided, use it as the base schema so DataFusion knows the full
+    // file schema. Compute a projection vector to select only the required columns.
+    let (base_schema, projection) = match (&data_schema, &projection_vector) {
+        (Some(schema), Some(proj)) => (Arc::clone(schema), Some(proj.clone())),
+        (Some(schema), None) => {
+            // Compute projection: map required_schema field names to data_schema indices.
+            // This is needed for schema pruning when the data_schema has more columns than
+            // the required_schema.
+            let projection: Vec<usize> = required_schema
+                .fields()
+                .iter()
+                .filter_map(|req_field| {
+                    schema.fields().iter().position(|data_field| {
+                        if case_sensitive {
+                            data_field.name() == req_field.name()
+                        } else {
+                            data_field.name().to_lowercase() == req_field.name().to_lowercase()
+                        }
+                    })
+                })
+                .collect();
+            // Only use data_schema + projection when all required fields were found by name.
+            // When some fields can't be matched (e.g., Parquet field ID mapping where names
+            // differ between required and data schemas), fall back to using required_schema
+            // directly with no projection.
+            if projection.len() == required_schema.fields().len() {
+                (Arc::clone(schema), Some(projection))
+            } else {
+                (Arc::clone(&required_schema), None)
+            }
+        }
+        _ => (Arc::clone(&required_schema), None),
     };
-    //let base_schema = required_schema;
-    // dbg!(&base_schema);
-    // dbg!(&data_schema);
-    // dbg!(&data_filters);
     let partition_fields: Vec<_> = partition_schema
         .iter()
         .flat_map(|s| s.fields().iter())
@@ -100,13 +122,9 @@ pub(crate) fn init_datasource_exec(
     let table_schema =
         TableSchema::from_file_schema(base_schema).with_table_partition_cols(partition_fields);
 
-    // dbg!(&table_schema);
-
     let mut parquet_source =
         ParquetSource::new(table_schema).with_table_parquet_options(table_parquet_options);
 
-    // dbg!(&parquet_source);
-
     // Create a conjunctive form of the vector because ParquetExecBuilder takes
     // a single expression
     if let Some(data_filters) = data_filters {
@@ -146,9 +164,9 @@ pub(crate) fn init_datasource_exec(
         .with_file_groups(file_groups)
         .with_expr_adapter(Some(expr_adapter_factory));
 
-    if let Some(projection_vector) = projection_vector {
+    if let Some(projection) = projection {
         file_scan_config_builder =
-            file_scan_config_builder.with_projection_indices(Some(projection_vector))?;
+            file_scan_config_builder.with_projection_indices(Some(projection))?;
     }
 
     let file_scan_config = file_scan_config_builder.build();
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index 491f0a8e8..2874b6cbf 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -262,9 +262,22 @@ impl SparkPhysicalExprAdapter {
         expr.transform(|e| {
             if let Some(column) = e.as_any().downcast_ref::<Column>() {
                 let col_idx = column.index();
+                let col_name = column.name();
 
                let logical_field = self.logical_file_schema.fields().get(col_idx);
-                let physical_field = self.physical_file_schema.fields().get(col_idx);
+                // Look up physical field by name instead of index for correctness
+                // when logical and physical schemas have different column orderings
+                let physical_field = if self.parquet_options.case_sensitive {
+                    self.physical_file_schema
+                        .fields()
+                        .iter()
+                        .find(|f| f.name() == col_name)
+                } else {
+                    self.physical_file_schema
+                        .fields()
+                        .iter()
+                        .find(|f| f.name().to_lowercase() == col_name.to_lowercase())
+                };
 
                if let (Some(logical_field), Some(physical_field)) = (logical_field, physical_field)
                 {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
