This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch df52
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/df52 by this push:
new 43c843b6e fix: [df52] schema pruning crash on complex nested types (#3500)
43c843b6e is described below
commit 43c843b6e30e7013736cb59bcc69e25ccf94369e
Author: Andy Grove <[email protected]>
AuthorDate: Thu Feb 12 13:02:40 2026 -0700
fix: [df52] schema pruning crash on complex nested types (#3500)
* fix: [df52] schema pruning crash on complex nested types
When `data_schema` is provided but `projection_vector` is None (the
NativeBatchReader / native_iceberg_compat path), the base schema was
incorrectly set to the pruned `required_schema`. This caused DataFusion
to think the table had only the pruned columns, leading to column index
misalignment in PhysicalExprAdapter. For example, reading "friends" at
logical index 0 would map to physical index 0 ("id") instead of the
correct index 4.
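To make the misalignment concrete, here is a minimal standalone sketch using a made-up flat schema (the real case involves complex nested types, and every column name other than "friends" is illustrative only):

    use arrow::datatypes::{DataType, Field, Schema};

    fn main() {
        // Hypothetical full file schema standing in for the nested test case.
        let file_schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int32, true),
            Field::new("email", DataType::Utf8, true),
            Field::new("friends", DataType::Utf8, true),
        ]);
        // After pruning, only "friends" remains, at logical index 0.
        let pruned = Schema::new(vec![file_schema.field(4).clone()]);
        assert_eq!(pruned.field(0).name(), "friends");
        // Reusing that logical index against the full file schema hits "id".
        assert_eq!(file_schema.field(0).name(), "id");
    }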
Fix: when `data_schema` is provided without a `projection_vector`,
compute the projection by mapping required field names to their indices
in the full data schema. Also harden `wrap_all_type_mismatches` to use
name-based lookup for physical fields instead of positional index.
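A minimal sketch of that name-based projection, written against the arrow Schema API with the same made-up columns as above (case-insensitive matching and error handling omitted for brevity):

    use arrow::datatypes::{DataType, Field, Schema};

    fn main() {
        let data_schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int32, true),
            Field::new("email", DataType::Utf8, true),
            Field::new("friends", DataType::Utf8, true),
        ]);
        let required_schema =
            Schema::new(vec![Field::new("friends", DataType::Utf8, true)]);

        // Map each required field name to its index in the full data schema.
        let projection: Vec<usize> = required_schema
            .fields()
            .iter()
            .filter_map(|req| {
                data_schema
                    .fields()
                    .iter()
                    .position(|f| f.name() == req.name())
            })
            .collect();

        assert_eq!(projection, vec![4]); // "friends" reads physical column 4
    }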
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* fix: handle field ID mapping in projection computation
When computing a name-based projection from required_schema to
data_schema, fall back to using required_schema directly when not
all fields can be matched by name. This handles Parquet field ID
mapping where column names differ between the read schema and file
schema.
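An illustrative sketch of the fallback condition (the "_col_0" name below is hypothetical, chosen only to represent a file whose physical names do not match the read schema):

    use arrow::datatypes::{DataType, Field, Schema};

    fn main() {
        // File schema with field-ID-style names; read schema uses logical names.
        let data_schema =
            Schema::new(vec![Field::new("_col_0", DataType::Int64, false)]);
        let required_schema =
            Schema::new(vec![Field::new("id", DataType::Int64, false)]);

        let projection: Vec<usize> = required_schema
            .fields()
            .iter()
            .filter_map(|req| {
                data_schema.fields().iter().position(|f| f.name() == req.name())
            })
            .collect();

        // Not every required field was matched by name, so the projection is
        // unusable and the reader keeps required_schema with no projection.
        assert_ne!(projection.len(), required_schema.fields().len());
    }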
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
native/core/src/parquet/parquet_exec.rs | 52 +++++++++++++++++++++----------
native/core/src/parquet/schema_adapter.rs | 15 ++++++++-
2 files changed, 49 insertions(+), 18 deletions(-)
diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index 79c7e06c6..f4cc7bf9f 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -80,18 +80,40 @@ pub(crate) fn init_datasource_exec(
encryption_enabled,
);
- // dbg!(&required_schema, &data_schema);
-
- // Determine the schema to use for ParquetSource
- // // Use data_schema only if both data_schema and data_filters are set
- let base_schema = match (&data_schema, &projection_vector) {
- (Some(schema), Some(_)) => Arc::clone(schema),
- _ => Arc::clone(&required_schema),
+ // Determine the schema and projection to use for ParquetSource.
+ // When data_schema is provided, use it as the base schema so DataFusion knows the full
+ // file schema. Compute a projection vector to select only the required columns.
+ let (base_schema, projection) = match (&data_schema, &projection_vector) {
+ (Some(schema), Some(proj)) => (Arc::clone(schema), Some(proj.clone())),
+ (Some(schema), None) => {
+ // Compute projection: map required_schema field names to data_schema indices.
+ // This is needed for schema pruning when the data_schema has more columns than
+ // the required_schema.
+ let projection: Vec<usize> = required_schema
+ .fields()
+ .iter()
+ .filter_map(|req_field| {
+ schema.fields().iter().position(|data_field| {
+ if case_sensitive {
+ data_field.name() == req_field.name()
+ } else {
+ data_field.name().to_lowercase() == req_field.name().to_lowercase()
+ }
+ })
+ })
+ .collect();
+ // Only use data_schema + projection when all required fields were found by name.
+ // When some fields can't be matched (e.g., Parquet field ID mapping where names
+ // differ between required and data schemas), fall back to using required_schema
+ // directly with no projection.
+ if projection.len() == required_schema.fields().len() {
+ (Arc::clone(schema), Some(projection))
+ } else {
+ (Arc::clone(&required_schema), None)
+ }
+ }
+ _ => (Arc::clone(&required_schema), None),
};
- //let base_schema = required_schema;
- // dbg!(&base_schema);
- // dbg!(&data_schema);
- // dbg!(&data_filters);
let partition_fields: Vec<_> = partition_schema
.iter()
.flat_map(|s| s.fields().iter())
@@ -100,13 +122,9 @@ pub(crate) fn init_datasource_exec(
let table_schema =
TableSchema::from_file_schema(base_schema).with_table_partition_cols(partition_fields);
- // dbg!(&table_schema);
-
let mut parquet_source =
ParquetSource::new(table_schema).with_table_parquet_options(table_parquet_options);
- // dbg!(&parquet_source);
-
// Create a conjunctive form of the vector because ParquetExecBuilder takes
// a single expression
if let Some(data_filters) = data_filters {
@@ -146,9 +164,9 @@ pub(crate) fn init_datasource_exec(
.with_file_groups(file_groups)
.with_expr_adapter(Some(expr_adapter_factory));
- if let Some(projection_vector) = projection_vector {
+ if let Some(projection) = projection {
file_scan_config_builder =
- file_scan_config_builder.with_projection_indices(Some(projection_vector))?;
+ file_scan_config_builder.with_projection_indices(Some(projection))?;
}
let file_scan_config = file_scan_config_builder.build();
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index 491f0a8e8..2874b6cbf 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -262,9 +262,22 @@ impl SparkPhysicalExprAdapter {
expr.transform(|e| {
if let Some(column) = e.as_any().downcast_ref::<Column>() {
let col_idx = column.index();
+ let col_name = column.name();
let logical_field = self.logical_file_schema.fields().get(col_idx);
- let physical_field = self.physical_file_schema.fields().get(col_idx);
+ // Look up physical field by name instead of index for correctness
+ // when logical and physical schemas have different column orderings
+ let physical_field = if self.parquet_options.case_sensitive {
+ self.physical_file_schema
+ .fields()
+ .iter()
+ .find(|f| f.name() == col_name)
+ } else {
+ self.physical_file_schema
+ .fields()
+ .iter()
+ .find(|f| f.name().to_lowercase() == col_name.to_lowercase())
+ };
if let (Some(logical_field), Some(physical_field)) =
(logical_field, physical_field)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]