This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 05ba2d38 feat(reader): position-based column projection for Parquet files without field IDs (migrated tables) (#1777)
05ba2d38 is described below

commit 05ba2d38ccd93b7536addbc97fda89dcaccd49f8
Author: Matt Butrovich <[email protected]>
AuthorDate: Mon Nov 3 05:09:35 2025 -0500

    feat(reader): position-based column projection for Parquet files without field IDs (migrated tables) (#1777)
    
    ## What issue does this PR close?
    
    Partially addresses #1749.
    
    ## Rationale for this change
    
    **Background**: This issue was discovered when running Iceberg Java's
    test suite against our [experimental DataFusion Comet branch that uses
    iceberg-rust](https://github.com/apache/datafusion-comet/pull/2528).
    Many failures occurred in `TestMigrateTableAction.java`, which tests
    reading Parquet files from migrated tables (_e.g.,_ from Hive or Spark)
    that lack embedded field ID metadata.
    
    **Problem**: The Rust ArrowReader could not read these files, whereas
    Iceberg Java handles them with a position-based fallback: top-level
    field ID N maps to top-level Parquet column position N-1, and entire
    columns (including nested content) are projected.
    
    
    ## What changes are included in this PR?
    
    This PR implements position-based column projection for Parquet files
    without field IDs, enabling iceberg-rust to read migrated tables.
    
    **Solution**: Implemented fallback projection in
    `ArrowReader::get_arrow_projection_mask_fallback()` that matches Java's
    `ParquetSchemaUtil.pruneColumnsFallback()` behavior:
    - Detects Parquet files without field IDs by checking Arrow schema
    metadata
    - Maps top-level field IDs to top-level column positions (field IDs are
    1-indexed, positions are 0-indexed)
    - Uses `ProjectionMask::roots()` to project entire columns including
    nested content (structs, lists, maps)
    - Adds field ID metadata to the projected schema for
    `RecordBatchTransformer`
    - Supports schema evolution by allowing missing columns (filled with
    default values by `RecordBatchTransformer`)
    
    This implementation now matches Iceberg Java's behavior for reading
    migrated tables, enabling interoperability with Java-based tooling and
    workflows.
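    
    For reference, here is a minimal, standalone sketch of the
    position-based mapping described above. It is a simplified
    approximation of the new fallback logic, not the actual reader code
    (which builds a `ProjectionMask` over a parquet `SchemaDescriptor`);
    the function name and plain-`Vec` return type are illustrative only.
    
    ```rust
    /// Map requested Iceberg field IDs to top-level Parquet column positions.
    /// Field IDs are 1-indexed and positions are 0-indexed, matching Java's
    /// `ParquetSchemaUtil.pruneColumnsFallback()`. IDs that point past the end
    /// of the file are skipped; `RecordBatchTransformer` later fills those
    /// columns with default/NULL values.
    fn fallback_root_positions(field_ids: &[i32], num_root_columns: usize) -> Vec<usize> {
        field_ids
            .iter()
            .filter_map(|id| {
                let pos = (*id - 1) as usize; // field ID N -> column position N-1
                (pos < num_root_columns).then_some(pos)
            })
            .collect()
    }
    
    fn main() {
        // A migrated file with three top-level columns and no embedded field IDs:
        // field IDs 1 and 3 map to columns 0 and 2; field ID 4 has no column yet
        // (schema evolution) and is skipped here.
        let expected: Vec<usize> = vec![0, 2];
        assert_eq!(fallback_root_positions(&[1, 3, 4], 3), expected);
    }
    ```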
    
    ## Are these changes tested?
    
    Yes, comprehensive unit tests were added to verify the fallback path
    works correctly:
    - `test_read_parquet_file_without_field_ids` - Basic projection with
    primitive columns using position-based mapping
    - `test_read_parquet_without_field_ids_partial_projection` - Project
    subset of columns
    - `test_read_parquet_without_field_ids_schema_evolution` - Handle
    missing columns with NULL values
    - `test_read_parquet_without_field_ids_multiple_row_groups` - Verify
    behavior across row group boundaries
    - `test_read_parquet_without_field_ids_with_struct` - Project structs
    with nested fields (entire top-level column)
    - `test_read_parquet_without_field_ids_filter_eliminates_all_rows` -
    Reproduces a scenario where Comet hit a panic because all row groups
    were filtered out
    - `test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle` -
    Schema evolution that adds a column in the middle of the schema, which
    previously caused a panic
    
    All tests verify that behavior matches Iceberg Java's
    `pruneColumnsFallback()` implementation in
    `/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java`.
    
    ---------
    
    Co-authored-by: Renjie Liu <[email protected]>
---
 crates/iceberg/src/arrow/delete_file_loader.rs |    1 +
 crates/iceberg/src/arrow/reader.rs             | 1007 +++++++++++++++++++++---
 2 files changed, 915 insertions(+), 93 deletions(-)

diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs
index 54fc4a58..f2c63cc5 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -63,6 +63,7 @@ impl BasicDeleteFileLoader {
             data_file_path,
             self.file_io.clone(),
             false,
+            None,
         )
         .await?
         .build()?
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index e0894ad6..fed8f19c 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -183,22 +183,54 @@ impl ArrowReader {
 
         let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, 
task.schema.clone());
 
-        let mut record_batch_stream_builder = 
Self::create_parquet_record_batch_stream_builder(
+        // Migrated tables lack field IDs, requiring us to inspect the schema 
to choose
+        // between field-ID-based or position-based projection
+        let initial_stream_builder = 
Self::create_parquet_record_batch_stream_builder(
             &task.data_file_path,
             file_io.clone(),
             should_load_page_index,
+            None,
         )
         .await?;
 
-        // Create a projection mask for the batch stream to select which 
columns in the
-        // Parquet file that we want in the response
+        // Parquet files from Hive/Spark migrations lack field IDs in their 
metadata
+        let missing_field_ids = initial_stream_builder
+            .schema()
+            .fields()
+            .iter()
+            .next()
+            .is_some_and(|f| 
f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
+
+        // Adding position-based fallback IDs at schema level (not per-batch) 
enables projection
+        // on files that lack embedded field IDs. We recreate the builder to 
apply the modified schema.
+        let mut record_batch_stream_builder = if missing_field_ids {
+            let arrow_schema =
+                
add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema());
+            let options = ArrowReaderOptions::new().with_schema(arrow_schema);
+
+            Self::create_parquet_record_batch_stream_builder(
+                &task.data_file_path,
+                file_io.clone(),
+                should_load_page_index,
+                Some(options),
+            )
+            .await?
+        } else {
+            initial_stream_builder
+        };
+
+        // Fallback IDs don't match Parquet's embedded field IDs (since they 
don't exist),
+        // so we must use position-based projection instead of field-ID 
matching
         let projection_mask = Self::get_arrow_projection_mask(
             &task.project_field_ids,
             &task.schema,
             record_batch_stream_builder.parquet_schema(),
             record_batch_stream_builder.schema(),
+            missing_field_ids, // Whether to use position-based (true) or 
field-ID-based (false) projection
         )?;
-        record_batch_stream_builder = 
record_batch_stream_builder.with_projection(projection_mask);
+
+        record_batch_stream_builder =
+            
record_batch_stream_builder.with_projection(projection_mask.clone());
 
         // RecordBatchTransformer performs any transformations required on the 
RecordBatches
         // that come back from the file, such as type promotion, default 
column insertion
@@ -353,6 +385,7 @@ impl ArrowReader {
         data_file_path: &str,
         file_io: FileIO,
         should_load_page_index: bool,
+        arrow_reader_options: Option<ArrowReaderOptions>,
     ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead 
+ Sized>>> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
@@ -365,11 +398,9 @@ impl ArrowReader {
             .with_preload_page_index(should_load_page_index);
 
         // Create the record batch stream builder, which wraps the parquet 
file reader
-        let record_batch_stream_builder = 
ParquetRecordBatchStreamBuilder::new_with_options(
-            parquet_file_reader,
-            ArrowReaderOptions::new(),
-        )
-        .await?;
+        let options = arrow_reader_options.unwrap_or_default();
+        let record_batch_stream_builder =
+            
ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, 
options).await?;
         Ok(record_batch_stream_builder)
     }
 
@@ -503,13 +534,18 @@ impl ArrowReader {
         visit(&mut collector, predicate)?;
 
         let iceberg_field_ids = collector.field_ids();
-        let field_id_map = build_field_id_map(parquet_schema)?;
+
+        // Without embedded field IDs, we fall back to position-based mapping 
for compatibility
+        let field_id_map = match build_field_id_map(parquet_schema)? {
+            Some(map) => map,
+            None => build_fallback_field_id_map(parquet_schema),
+        };
 
         Ok((iceberg_field_ids, field_id_map))
     }
 
-    /// Insert the leaf field id into the field_ids using for projection.
-    /// For nested type, it will recursively insert the leaf field id.
+    /// Recursively extract leaf field IDs because Parquet projection works at 
the leaf column level.
+    /// Nested types (struct/list/map) are flattened in Parquet's columnar 
format.
     fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
         match field.field_type.as_ref() {
             Type::Primitive(_) => {
@@ -535,6 +571,7 @@ impl ArrowReader {
         iceberg_schema_of_task: &Schema,
         parquet_schema: &SchemaDescriptor,
         arrow_schema: &ArrowSchemaRef,
+        use_fallback: bool, // Whether file lacks embedded field IDs (e.g., 
migrated from Hive/Spark)
     ) -> Result<ProjectionMask> {
         fn type_promotion_is_valid(
             file_type: Option<&PrimitiveType>,
@@ -560,83 +597,132 @@ impl ArrowReader {
             }
         }
 
-        let mut leaf_field_ids = vec![];
-        for field_id in field_ids {
-            let field = iceberg_schema_of_task.field_by_id(*field_id);
-            if let Some(field) = field {
-                Self::include_leaf_field_id(field, &mut leaf_field_ids);
-            }
+        if field_ids.is_empty() {
+            return Ok(ProjectionMask::all());
         }
 
-        if leaf_field_ids.is_empty() {
-            Ok(ProjectionMask::all())
+        if use_fallback {
+            // Position-based projection necessary because file lacks embedded 
field IDs
+            Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
         } else {
-            // Build the map between field id and column index in Parquet 
schema.
-            let mut column_map = HashMap::new();
-
-            let fields = arrow_schema.fields();
-            // Pre-project only the fields that have been selected, possibly 
avoiding converting
-            // some Arrow types that are not yet supported.
-            let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
-            let projected_arrow_schema = ArrowSchema::new_with_metadata(
-                fields.filter_leaves(|_, f| {
-                    f.metadata()
-                        .get(PARQUET_FIELD_ID_META_KEY)
-                        .and_then(|field_id| i32::from_str(field_id).ok())
-                        .is_some_and(|field_id| {
-                            projected_fields.insert((*f).clone(), field_id);
-                            leaf_field_ids.contains(&field_id)
-                        })
-                }),
-                arrow_schema.metadata().clone(),
-            );
-            let iceberg_schema = 
arrow_schema_to_schema(&projected_arrow_schema)?;
-
-            fields.filter_leaves(|idx, field| {
-                let Some(field_id) = projected_fields.get(field).cloned() else 
{
-                    return false;
-                };
+            // Field-ID-based projection using embedded field IDs from Parquet 
metadata
+
+            // Parquet's columnar format requires leaf-level (not top-level 
struct/list/map) projection
+            let mut leaf_field_ids = vec![];
+            for field_id in field_ids {
+                let field = iceberg_schema_of_task.field_by_id(*field_id);
+                if let Some(field) = field {
+                    Self::include_leaf_field_id(field, &mut leaf_field_ids);
+                }
+            }
+
+            Self::get_arrow_projection_mask_with_field_ids(
+                &leaf_field_ids,
+                iceberg_schema_of_task,
+                parquet_schema,
+                arrow_schema,
+                type_promotion_is_valid,
+            )
+        }
+    }
+
+    /// Standard projection using embedded field IDs from Parquet metadata.
+    /// For iceberg-java compatibility with ParquetSchemaUtil.pruneColumns().
+    fn get_arrow_projection_mask_with_field_ids(
+        leaf_field_ids: &[i32],
+        iceberg_schema_of_task: &Schema,
+        parquet_schema: &SchemaDescriptor,
+        arrow_schema: &ArrowSchemaRef,
+        type_promotion_is_valid: fn(Option<&PrimitiveType>, 
Option<&PrimitiveType>) -> bool,
+    ) -> Result<ProjectionMask> {
+        let mut column_map = HashMap::new();
+        let fields = arrow_schema.fields();
+
+        // Pre-project only the fields that have been selected, possibly 
avoiding converting
+        // some Arrow types that are not yet supported.
+        let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
+        let projected_arrow_schema = ArrowSchema::new_with_metadata(
+            fields.filter_leaves(|_, f| {
+                f.metadata()
+                    .get(PARQUET_FIELD_ID_META_KEY)
+                    .and_then(|field_id| i32::from_str(field_id).ok())
+                    .is_some_and(|field_id| {
+                        projected_fields.insert((*f).clone(), field_id);
+                        leaf_field_ids.contains(&field_id)
+                    })
+            }),
+            arrow_schema.metadata().clone(),
+        );
+        let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
 
-                let iceberg_field = 
iceberg_schema_of_task.field_by_id(field_id);
-                let parquet_iceberg_field = 
iceberg_schema.field_by_id(field_id);
+        fields.filter_leaves(|idx, field| {
+            let Some(field_id) = projected_fields.get(field).cloned() else {
+                return false;
+            };
 
-                if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
-                    return false;
-                }
+            let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
+            let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
 
-                if !type_promotion_is_valid(
-                    parquet_iceberg_field
-                        .unwrap()
-                        .field_type
-                        .as_primitive_type(),
-                    iceberg_field.unwrap().field_type.as_primitive_type(),
-                ) {
-                    return false;
-                }
+            if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
+                return false;
+            }
 
-                column_map.insert(field_id, idx);
-                true
-            });
-
-            // Only project columns that exist in the Parquet file.
-            // Missing columns will be added by RecordBatchTransformer with 
default/NULL values.
-            // This supports schema evolution where new columns are added to 
the table schema
-            // but old Parquet files don't have them yet.
-            let mut indices = vec![];
-            for field_id in leaf_field_ids {
-                if let Some(col_idx) = column_map.get(&field_id) {
-                    indices.push(*col_idx);
-                }
-                // Skip fields that don't exist in the Parquet file - they 
will be added later
+            if !type_promotion_is_valid(
+                parquet_iceberg_field
+                    .unwrap()
+                    .field_type
+                    .as_primitive_type(),
+                iceberg_field.unwrap().field_type.as_primitive_type(),
+            ) {
+                return false;
+            }
+
+            column_map.insert(field_id, idx);
+            true
+        });
+
+        // Schema evolution: New columns may not exist in old Parquet files.
+        // We only project existing columns; RecordBatchTransformer adds 
default/NULL values.
+        let mut indices = vec![];
+        for field_id in leaf_field_ids {
+            if let Some(col_idx) = column_map.get(field_id) {
+                indices.push(*col_idx);
             }
+        }
+
+        if indices.is_empty() {
+            // Edge case: All requested columns are new (don't exist in file).
+            // Project all columns so RecordBatchTransformer has a batch to 
transform.
+            Ok(ProjectionMask::all())
+        } else {
+            Ok(ProjectionMask::leaves(parquet_schema, indices))
+        }
+    }
+
+    /// Fallback projection for Parquet files without field IDs.
+    /// Uses position-based matching: field ID N → column position N-1.
+    /// Projects entire top-level columns (including nested content) for 
iceberg-java compatibility.
+    fn get_arrow_projection_mask_fallback(
+        field_ids: &[i32],
+        parquet_schema: &SchemaDescriptor,
+    ) -> Result<ProjectionMask> {
+        // Position-based: field_id N → column N-1 (field IDs are 1-indexed)
+        let parquet_root_fields = parquet_schema.root_schema().get_fields();
+        let mut root_indices = vec![];
+
+        for field_id in field_ids.iter() {
+            let parquet_pos = (*field_id - 1) as usize;
 
-            if indices.is_empty() {
-                // If no columns from the projection exist in the file, 
project all columns
-                // This can happen if all requested columns are new and need 
to be added by the transformer
-                Ok(ProjectionMask::all())
-            } else {
-                Ok(ProjectionMask::leaves(parquet_schema, indices))
+            if parquet_pos < parquet_root_fields.len() {
+                root_indices.push(parquet_pos);
             }
+            // RecordBatchTransformer adds missing columns with NULL values
+        }
+
+        if root_indices.is_empty() {
+            Ok(ProjectionMask::all())
+        } else {
+            Ok(ProjectionMask::roots(parquet_schema, root_indices))
         }
     }
 
@@ -713,6 +799,13 @@ impl ArrowReader {
             ));
         };
 
+        // If all row groups were filtered out, return an empty RowSelection 
(select no rows)
+        if let Some(selected_row_groups) = selected_row_groups {
+            if selected_row_groups.is_empty() {
+                return Ok(RowSelection::from(Vec::new()));
+            }
+        }
+
         let mut selected_row_groups_idx = 0;
 
         let page_index = column_index
@@ -785,22 +878,16 @@ impl ArrowReader {
 }
 
 /// Build the map of parquet field id to Parquet column index in the schema.
-fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+/// Returns None if the Parquet file doesn't have field IDs embedded (e.g., 
migrated tables).
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<Option<HashMap<i32, usize>>> {
     let mut column_map = HashMap::new();
+
     for (idx, field) in parquet_schema.columns().iter().enumerate() {
         let field_type = field.self_type();
         match field_type {
             ParquetType::PrimitiveType { basic_info, .. } => {
                 if !basic_info.has_id() {
-                    return Err(Error::new(
-                        ErrorKind::DataInvalid,
-                        format!(
-                            "Leave column idx: {}, name: {}, type {:?} in 
schema doesn't have field id",
-                            idx,
-                            basic_info.name(),
-                            field_type
-                        ),
-                    ));
+                    return Ok(None);
                 }
                 column_map.insert(basic_info.id(), idx);
             }
@@ -815,7 +902,59 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) 
-> Result<HashMap<i32,
         };
     }
 
-    Ok(column_map)
+    Ok(Some(column_map))
+}
+
+/// Build a fallback field ID map for Parquet files without embedded field IDs.
+/// Position-based (1, 2, 3, ...) for compatibility with iceberg-java 
migrations.
+fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> 
HashMap<i32, usize> {
+    let mut column_map = HashMap::new();
+
+    // 1-indexed to match iceberg-java's convention
+    for (idx, _field) in parquet_schema.columns().iter().enumerate() {
+        let field_id = (idx + 1) as i32;
+        column_map.insert(field_id, idx);
+    }
+
+    column_map
+}
+
+/// Add position-based fallback field IDs to Arrow schema for Parquet files 
lacking them.
+/// Enables projection on migrated files (e.g., from Hive/Spark).
+///
+/// Why at schema level (not per-batch): Efficiency - avoids repeated schema 
modification.
+/// Why only top-level: Nested projection uses leaf column indices, not parent 
struct IDs.
+/// Why 1-indexed: Compatibility with iceberg-java's 
ParquetSchemaUtil.addFallbackIds().
+fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> 
Arc<ArrowSchema> {
+    debug_assert!(
+        arrow_schema
+            .fields()
+            .iter()
+            .next()
+            .is_none_or(|f| 
f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
+        "Schema already has field IDs"
+    );
+
+    use arrow_schema::Field;
+
+    let fields_with_fallback_ids: Vec<_> = arrow_schema
+        .fields()
+        .iter()
+        .enumerate()
+        .map(|(pos, field)| {
+            let mut metadata = field.metadata().clone();
+            let field_id = (pos + 1) as i32; // 1-indexed for Java 
compatibility
+            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), 
field_id.to_string());
+
+            Field::new(field.name(), field.data_type().clone(), 
field.is_nullable())
+                .with_metadata(metadata)
+        })
+        .collect();
+
+    Arc::new(ArrowSchema::new_with_metadata(
+        fields_with_fallback_ids,
+        arrow_schema.metadata().clone(),
+    ))
 }
 
 /// A visitor to collect field ids from bound predicates.
@@ -1637,6 +1776,7 @@ message schema {
             &schema,
             &parquet_schema,
             &arrow_schema,
+            false,
         )
         .unwrap_err();
 
@@ -1652,6 +1792,7 @@ message schema {
             &schema,
             &parquet_schema,
             &arrow_schema,
+            false,
         )
         .unwrap_err();
 
@@ -1662,9 +1803,14 @@ message schema {
         );
 
         // Finally avoid selecting fields with unsupported data types
-        let mask =
-            ArrowReader::get_arrow_projection_mask(&[1], &schema, 
&parquet_schema, &arrow_schema)
-                .expect("Some ProjectionMask");
+        let mask = ArrowReader::get_arrow_projection_mask(
+            &[1],
+            &schema,
+            &parquet_schema,
+            &arrow_schema,
+            false,
+        )
+        .expect("Some ProjectionMask");
         assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
     }
 
@@ -2887,4 +3033,679 @@ message schema {
             "Should have ids 101-200 (all of row group 1)"
         );
     }
+
+    /// Test reading Parquet files without field ID metadata (e.g., migrated 
tables).
+    /// This exercises the position-based fallback path.
+    ///
+    /// Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + 
pruneColumnsFallback()
+    /// in 
/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
+    #[tokio::test]
+    async fn test_read_parquet_file_without_field_ids() {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::required(2, "age", 
Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Parquet file from a migrated table - no field ID metadata
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("name", DataType::Utf8, false),
+            Field::new("age", DataType::Int32, false),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        let name_data = vec!["Alice", "Bob", "Charlie"];
+        let age_data = vec![30, 25, 35];
+
+        use arrow_array::Int32Array;
+        let name_col = Arc::new(StringArray::from(name_data.clone())) as 
ArrayRef;
+        let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
+
+        let to_write = RecordBatch::try_new(arrow_schema.clone(), 
vec![name_col, age_col]).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/1.parquet", table_location),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        assert_eq!(result.len(), 1);
+        let batch = &result[0];
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(batch.num_columns(), 2);
+
+        // Verify position-based mapping: field_id 1 → position 0, field_id 2 
→ position 1
+        let name_array = batch.column(0).as_string::<i32>();
+        assert_eq!(name_array.value(0), "Alice");
+        assert_eq!(name_array.value(1), "Bob");
+        assert_eq!(name_array.value(2), "Charlie");
+
+        let age_array = batch
+            .column(1)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(age_array.value(0), 30);
+        assert_eq!(age_array.value(1), 25);
+        assert_eq!(age_array.value(2), 35);
+    }
+
+    /// Test reading Parquet files without field IDs with partial projection.
+    /// Only a subset of columns are requested, verifying position-based 
fallback
+    /// handles column selection correctly.
+    #[tokio::test]
+    async fn test_read_parquet_without_field_ids_partial_projection() {
+        use arrow_array::Int32Array;
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "col1", 
Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::required(2, "col2", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(3, "col3", 
Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::required(4, "col4", 
Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("col1", DataType::Utf8, false),
+            Field::new("col2", DataType::Int32, false),
+            Field::new("col3", DataType::Utf8, false),
+            Field::new("col4", DataType::Int32, false),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as 
ArrayRef;
+        let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
+        let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as 
ArrayRef;
+        let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
+
+        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
+            col1_data, col2_data, col3_data, col4_data,
+        ])
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/1.parquet", table_location),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 3],
+                predicate: None,
+                deletes: vec![],
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        assert_eq!(result.len(), 1);
+        let batch = &result[0];
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 2);
+
+        let col1_array = batch.column(0).as_string::<i32>();
+        assert_eq!(col1_array.value(0), "a");
+        assert_eq!(col1_array.value(1), "b");
+
+        let col3_array = batch.column(1).as_string::<i32>();
+        assert_eq!(col3_array.value(0), "c");
+        assert_eq!(col3_array.value(1), "d");
+    }
+
+    /// Test reading Parquet files without field IDs with schema evolution.
+    /// The Iceberg schema has more fields than the Parquet file, testing that
+    /// missing columns are filled with NULLs.
+    #[tokio::test]
+    async fn test_read_parquet_without_field_ids_schema_evolution() {
+        use arrow_array::{Array, Int32Array};
+
+        // Schema with field 3 added after the file was written
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::required(2, "age", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(3, "city", 
Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("name", DataType::Utf8, false),
+            Field::new("age", DataType::Int32, false),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as 
ArrayRef;
+        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
+
+        let to_write =
+            RecordBatch::try_new(arrow_schema.clone(), vec![name_data, 
age_data]).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/1.parquet", table_location),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2, 3],
+                predicate: None,
+                deletes: vec![],
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        assert_eq!(result.len(), 1);
+        let batch = &result[0];
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 3);
+
+        let name_array = batch.column(0).as_string::<i32>();
+        assert_eq!(name_array.value(0), "Alice");
+        assert_eq!(name_array.value(1), "Bob");
+
+        let age_array = batch
+            .column(1)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(age_array.value(0), 30);
+        assert_eq!(age_array.value(1), 25);
+
+        // Verify missing column filled with NULLs
+        let city_array = batch.column(2).as_string::<i32>();
+        assert_eq!(city_array.null_count(), 2);
+        assert!(city_array.is_null(0));
+        assert!(city_array.is_null(1));
+    }
+
+    /// Test reading Parquet files without field IDs that have multiple row 
groups.
+    /// This ensures the position-based fallback works correctly across row 
group boundaries.
+    #[tokio::test]
+    async fn test_read_parquet_without_field_ids_multiple_row_groups() {
+        use arrow_array::Int32Array;
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::required(2, "value", 
Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("name", DataType::Utf8, false),
+            Field::new("value", DataType::Int32, false),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        // Small row group size to create multiple row groups
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_write_batch_size(2)
+            .set_max_row_group_size(2)
+            .build();
+
+        let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), 
Some(props)).unwrap();
+
+        // Write 6 rows in 3 batches (will create 3 row groups)
+        for batch_num in 0..3 {
+            let name_data = Arc::new(StringArray::from(vec![
+                format!("name_{}", batch_num * 2),
+                format!("name_{}", batch_num * 2 + 1),
+            ])) as ArrayRef;
+            let value_data =
+                Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 
1])) as ArrayRef;
+
+            let batch =
+                RecordBatch::try_new(arrow_schema.clone(), vec![name_data, 
value_data]).unwrap();
+            writer.write(&batch).expect("Writing batch");
+        }
+        writer.close().unwrap();
+
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/1.parquet", table_location),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        assert!(!result.is_empty());
+
+        let mut all_names = Vec::new();
+        let mut all_values = Vec::new();
+
+        for batch in &result {
+            let name_array = batch.column(0).as_string::<i32>();
+            let value_array = batch
+                .column(1)
+                .as_primitive::<arrow_array::types::Int32Type>();
+
+            for i in 0..batch.num_rows() {
+                all_names.push(name_array.value(i).to_string());
+                all_values.push(value_array.value(i));
+            }
+        }
+
+        assert_eq!(all_names.len(), 6);
+        assert_eq!(all_values.len(), 6);
+
+        for i in 0..6 {
+            assert_eq!(all_names[i], format!("name_{}", i));
+            assert_eq!(all_values[i], i as i32);
+        }
+    }
+
+    /// Test reading Parquet files without field IDs with nested types 
(struct).
+    /// Java's pruneColumnsFallback() projects entire top-level columns 
including nested content.
+    /// This test verifies that a top-level struct field is projected 
correctly with all its nested fields.
+    #[tokio::test]
+    async fn test_read_parquet_without_field_ids_with_struct() {
+        use arrow_array::{Int32Array, StructArray};
+        use arrow_schema::Fields;
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(
+                        2,
+                        "person",
+                        Type::Struct(crate::spec::StructType::new(vec![
+                            NestedField::required(
+                                3,
+                                "name",
+                                Type::Primitive(PrimitiveType::String),
+                            )
+                            .into(),
+                            NestedField::required(4, "age", 
Type::Primitive(PrimitiveType::Int))
+                                .into(),
+                        ])),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new(
+                "person",
+                DataType::Struct(Fields::from(vec![
+                    Field::new("name", DataType::Utf8, false),
+                    Field::new("age", DataType::Int32, false),
+                ])),
+                false,
+            ),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
+        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as 
ArrayRef;
+        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
+        let person_data = Arc::new(StructArray::from(vec![
+            (
+                Arc::new(Field::new("name", DataType::Utf8, false)),
+                name_data,
+            ),
+            (
+                Arc::new(Field::new("age", DataType::Int32, false)),
+                age_data,
+            ),
+        ])) as ArrayRef;
+
+        let to_write =
+            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, 
person_data]).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/1.parquet", table_location),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        assert_eq!(result.len(), 1);
+        let batch = &result[0];
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 2);
+
+        let id_array = batch
+            .column(0)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(id_array.value(0), 1);
+        assert_eq!(id_array.value(1), 2);
+
+        let person_array = batch.column(1).as_struct();
+        assert_eq!(person_array.num_columns(), 2);
+
+        let name_array = person_array.column(0).as_string::<i32>();
+        assert_eq!(name_array.value(0), "Alice");
+        assert_eq!(name_array.value(1), "Bob");
+
+        let age_array = person_array
+            .column(1)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(age_array.value(0), 30);
+        assert_eq!(age_array.value(1), 25);
+    }
+
+    /// Test reading Parquet files without field IDs with schema evolution - 
column added in the middle.
+    /// When a new column is inserted between existing columns in the schema 
order,
+    /// the fallback projection must correctly map field IDs to output 
positions.
+    #[tokio::test]
+    async fn 
test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
+        use arrow_array::{Array, Int32Array};
+
+        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
+            Field::new("col0", DataType::Int32, true),
+            Field::new("col1", DataType::Int32, true),
+        ]));
+
+        // New column added between existing columns: col0 (id=1), newCol 
(id=5), col1 (id=2)
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::optional(1, "col0", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(5, "newCol", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(2, "col1", 
Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
+        let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
+
+        let to_write =
+            RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, 
col1_data]).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/1.parquet", table_location),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 5, 2],
+                predicate: None,
+                deletes: vec![],
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        assert_eq!(result.len(), 1);
+        let batch = &result[0];
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 3);
+
+        let result_col0 = batch
+            .column(0)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(result_col0.value(0), 1);
+        assert_eq!(result_col0.value(1), 2);
+
+        // New column should be NULL (doesn't exist in old file)
+        let result_newcol = batch
+            .column(1)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(result_newcol.null_count(), 2);
+        assert!(result_newcol.is_null(0));
+        assert!(result_newcol.is_null(1));
+
+        let result_col1 = batch
+            .column(2)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(result_col1.value(0), 10);
+        assert_eq!(result_col1.value(1), 20);
+    }
+
+    /// Test reading Parquet files without field IDs with a filter that 
eliminates all row groups.
+    /// During development of field ID mapping, we saw a panic when 
row_selection_enabled=true and
+    /// all row groups are filtered out.
+    #[tokio::test]
+    async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
+        use arrow_array::{Float64Array, Int32Array};
+
+        // Schema with fields that will use fallback IDs 1, 2, 3
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::required(3, "value", 
Type::Primitive(PrimitiveType::Double))
+                        .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+            Field::new("value", DataType::Float64, false),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        // Write data where all ids are >= 10
+        let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
+        let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as 
ArrayRef;
+        let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 
300.0])) as ArrayRef;
+
+        let to_write =
+            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, 
name_data, value_data])
+                .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        // Filter that eliminates all row groups: id < 5
+        let predicate = Reference::new("id").less_than(Datum::int(5));
+
+        // Enable both row_group_filtering and row_selection - triggered the 
panic
+        let reader = ArrowReaderBuilder::new(file_io)
+            .with_row_group_filtering_enabled(true)
+            .with_row_selection_enabled(true)
+            .build();
+
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/1.parquet", table_location),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2, 3],
+                predicate: Some(predicate.bind(schema, true).unwrap()),
+                deletes: vec![],
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        // Should no longer panic
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        // Should return empty results
+        assert!(result.is_empty() || result.iter().all(|batch| 
batch.num_rows() == 0));
+    }
 }
