This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 05ba2d38 feat(reader): position-based column projection for Parquet
files without field IDs (migrated tables) (#1777)
05ba2d38 is described below
commit 05ba2d38ccd93b7536addbc97fda89dcaccd49f8
Author: Matt Butrovich <[email protected]>
AuthorDate: Mon Nov 3 05:09:35 2025 -0500
feat(reader): position-based column projection for Parquet files without
field IDs (migrated tables) (#1777)
## What issue does this PR close?
Partially addresses #1749.
## Rationale for this change
**Background**: This issue was discovered when running Iceberg Java's
test suite against our [experimental DataFusion Comet branch that uses
iceberg-rust](https://github.com/apache/datafusion-comet/pull/2528).
Many failures occurred in `TestMigrateTableAction.java`, which tests
reading Parquet files from migrated tables (_e.g.,_ from Hive or Spark)
that lack embedded field ID metadata.
**Problem**: The Rust ArrowReader was unable to read these files, while
Iceberg Java handles them using a position-based fallback where
top-level field ID N maps to top-level Parquet column position N-1, and
entire columns (including nested content) are projected.
## What changes are included in this PR?
This PR implements position-based column projection for Parquet files
without field IDs, enabling iceberg-rust to read migrated tables.
**Solution**: Implemented fallback projection in
`ArrowReader::get_arrow_projection_mask_fallback()` that matches Java's
`ParquetSchemaUtil.pruneColumnsFallback()` behavior:
- Detects Parquet files without field IDs by checking Arrow schema
metadata
- Maps top-level field IDs to top-level column positions (field IDs are
1-indexed, positions are 0-indexed)
- Uses `ProjectionMask::roots()` to project entire columns including
nested content (structs, lists, maps)
- Adds field ID metadata to the projected schema for
`RecordBatchTransformer`
- Supports schema evolution by allowing missing columns (filled with
default values by `RecordBatchTransformer`)
This implementation now matches Iceberg Java's behavior for reading
migrated tables, enabling interoperability with Java-based tooling and
workflows.
## Are these changes tested?
Yes, comprehensive unit tests were added to verify the fallback path
works correctly:
- `test_read_parquet_file_without_field_ids` - Basic projection with
primitive columns using position-based mapping
- `test_read_parquet_without_field_ids_partial_projection` - Project
subset of columns
- `test_read_parquet_without_field_ids_schema_evolution` - Handle
missing columns with NULL values
- `test_read_parquet_without_field_ids_multiple_row_groups` - Verify
behavior across row group boundaries
- `test_read_parquet_without_field_ids_with_struct` - Project structs
with nested fields (entire top-level column)
- `test_read_parquet_without_field_ids_filter_eliminates_all_rows` -
Comet saw a panic when all row groups were filtered out; this reproduces
that scenario
-
`test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle`
- Schema evolution with a column in the middle caused a panic at one
point
All tests verify that behavior matches Iceberg Java's
`pruneColumnsFallback()` implementation in
`/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java`.
---------
Co-authored-by: Renjie Liu <[email protected]>
---
crates/iceberg/src/arrow/delete_file_loader.rs | 1 +
crates/iceberg/src/arrow/reader.rs | 1007 +++++++++++++++++++++---
2 files changed, 915 insertions(+), 93 deletions(-)
diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs
b/crates/iceberg/src/arrow/delete_file_loader.rs
index 54fc4a58..f2c63cc5 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -63,6 +63,7 @@ impl BasicDeleteFileLoader {
data_file_path,
self.file_io.clone(),
false,
+ None,
)
.await?
.build()?
diff --git a/crates/iceberg/src/arrow/reader.rs
b/crates/iceberg/src/arrow/reader.rs
index e0894ad6..fed8f19c 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -183,22 +183,54 @@ impl ArrowReader {
let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes,
task.schema.clone());
- let mut record_batch_stream_builder =
Self::create_parquet_record_batch_stream_builder(
+ // Migrated tables lack field IDs, requiring us to inspect the schema
to choose
+ // between field-ID-based or position-based projection
+ let initial_stream_builder =
Self::create_parquet_record_batch_stream_builder(
&task.data_file_path,
file_io.clone(),
should_load_page_index,
+ None,
)
.await?;
- // Create a projection mask for the batch stream to select which
columns in the
- // Parquet file that we want in the response
+ // Parquet files from Hive/Spark migrations lack field IDs in their
metadata
+ let missing_field_ids = initial_stream_builder
+ .schema()
+ .fields()
+ .iter()
+ .next()
+ .is_some_and(|f|
f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
+
+ // Adding position-based fallback IDs at schema level (not per-batch)
enables projection
+ // on files that lack embedded field IDs. We recreate the builder to
apply the modified schema.
+ let mut record_batch_stream_builder = if missing_field_ids {
+ let arrow_schema =
+
add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema());
+ let options = ArrowReaderOptions::new().with_schema(arrow_schema);
+
+ Self::create_parquet_record_batch_stream_builder(
+ &task.data_file_path,
+ file_io.clone(),
+ should_load_page_index,
+ Some(options),
+ )
+ .await?
+ } else {
+ initial_stream_builder
+ };
+
+ // Fallback IDs don't match Parquet's embedded field IDs (since they
don't exist),
+ // so we must use position-based projection instead of field-ID
matching
let projection_mask = Self::get_arrow_projection_mask(
&task.project_field_ids,
&task.schema,
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
+ missing_field_ids, // Whether to use position-based (true) or
field-ID-based (false) projection
)?;
- record_batch_stream_builder =
record_batch_stream_builder.with_projection(projection_mask);
+
+ record_batch_stream_builder =
+
record_batch_stream_builder.with_projection(projection_mask.clone());
// RecordBatchTransformer performs any transformations required on the
RecordBatches
// that come back from the file, such as type promotion, default
column insertion
@@ -353,6 +385,7 @@ impl ArrowReader {
data_file_path: &str,
file_io: FileIO,
should_load_page_index: bool,
+ arrow_reader_options: Option<ArrowReaderOptions>,
) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead
+ Sized>>> {
// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
@@ -365,11 +398,9 @@ impl ArrowReader {
.with_preload_page_index(should_load_page_index);
// Create the record batch stream builder, which wraps the parquet
file reader
- let record_batch_stream_builder =
ParquetRecordBatchStreamBuilder::new_with_options(
- parquet_file_reader,
- ArrowReaderOptions::new(),
- )
- .await?;
+ let options = arrow_reader_options.unwrap_or_default();
+ let record_batch_stream_builder =
+
ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader,
options).await?;
Ok(record_batch_stream_builder)
}
@@ -503,13 +534,18 @@ impl ArrowReader {
visit(&mut collector, predicate)?;
let iceberg_field_ids = collector.field_ids();
- let field_id_map = build_field_id_map(parquet_schema)?;
+
+ // Without embedded field IDs, we fall back to position-based mapping
for compatibility
+ let field_id_map = match build_field_id_map(parquet_schema)? {
+ Some(map) => map,
+ None => build_fallback_field_id_map(parquet_schema),
+ };
Ok((iceberg_field_ids, field_id_map))
}
- /// Insert the leaf field id into the field_ids using for projection.
- /// For nested type, it will recursively insert the leaf field id.
+ /// Recursively extract leaf field IDs because Parquet projection works at
the leaf column level.
+ /// Nested types (struct/list/map) are flattened in Parquet's columnar
format.
fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
match field.field_type.as_ref() {
Type::Primitive(_) => {
@@ -535,6 +571,7 @@ impl ArrowReader {
iceberg_schema_of_task: &Schema,
parquet_schema: &SchemaDescriptor,
arrow_schema: &ArrowSchemaRef,
+ use_fallback: bool, // Whether file lacks embedded field IDs (e.g.,
migrated from Hive/Spark)
) -> Result<ProjectionMask> {
fn type_promotion_is_valid(
file_type: Option<&PrimitiveType>,
@@ -560,83 +597,132 @@ impl ArrowReader {
}
}
- let mut leaf_field_ids = vec![];
- for field_id in field_ids {
- let field = iceberg_schema_of_task.field_by_id(*field_id);
- if let Some(field) = field {
- Self::include_leaf_field_id(field, &mut leaf_field_ids);
- }
+ if field_ids.is_empty() {
+ return Ok(ProjectionMask::all());
}
- if leaf_field_ids.is_empty() {
- Ok(ProjectionMask::all())
+ if use_fallback {
+ // Position-based projection necessary because file lacks embedded
field IDs
+ Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
} else {
- // Build the map between field id and column index in Parquet
schema.
- let mut column_map = HashMap::new();
-
- let fields = arrow_schema.fields();
- // Pre-project only the fields that have been selected, possibly
avoiding converting
- // some Arrow types that are not yet supported.
- let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
- let projected_arrow_schema = ArrowSchema::new_with_metadata(
- fields.filter_leaves(|_, f| {
- f.metadata()
- .get(PARQUET_FIELD_ID_META_KEY)
- .and_then(|field_id| i32::from_str(field_id).ok())
- .is_some_and(|field_id| {
- projected_fields.insert((*f).clone(), field_id);
- leaf_field_ids.contains(&field_id)
- })
- }),
- arrow_schema.metadata().clone(),
- );
- let iceberg_schema =
arrow_schema_to_schema(&projected_arrow_schema)?;
-
- fields.filter_leaves(|idx, field| {
- let Some(field_id) = projected_fields.get(field).cloned() else
{
- return false;
- };
+ // Field-ID-based projection using embedded field IDs from Parquet
metadata
+
+ // Parquet's columnar format requires leaf-level (not top-level
struct/list/map) projection
+ let mut leaf_field_ids = vec![];
+ for field_id in field_ids {
+ let field = iceberg_schema_of_task.field_by_id(*field_id);
+ if let Some(field) = field {
+ Self::include_leaf_field_id(field, &mut leaf_field_ids);
+ }
+ }
+
+ Self::get_arrow_projection_mask_with_field_ids(
+ &leaf_field_ids,
+ iceberg_schema_of_task,
+ parquet_schema,
+ arrow_schema,
+ type_promotion_is_valid,
+ )
+ }
+ }
+
+ /// Standard projection using embedded field IDs from Parquet metadata.
+ /// For iceberg-java compatibility with ParquetSchemaUtil.pruneColumns().
+ fn get_arrow_projection_mask_with_field_ids(
+ leaf_field_ids: &[i32],
+ iceberg_schema_of_task: &Schema,
+ parquet_schema: &SchemaDescriptor,
+ arrow_schema: &ArrowSchemaRef,
+ type_promotion_is_valid: fn(Option<&PrimitiveType>,
Option<&PrimitiveType>) -> bool,
+ ) -> Result<ProjectionMask> {
+ let mut column_map = HashMap::new();
+ let fields = arrow_schema.fields();
+
+ // Pre-project only the fields that have been selected, possibly
avoiding converting
+ // some Arrow types that are not yet supported.
+ let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
+ let projected_arrow_schema = ArrowSchema::new_with_metadata(
+ fields.filter_leaves(|_, f| {
+ f.metadata()
+ .get(PARQUET_FIELD_ID_META_KEY)
+ .and_then(|field_id| i32::from_str(field_id).ok())
+ .is_some_and(|field_id| {
+ projected_fields.insert((*f).clone(), field_id);
+ leaf_field_ids.contains(&field_id)
+ })
+ }),
+ arrow_schema.metadata().clone(),
+ );
+ let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
- let iceberg_field =
iceberg_schema_of_task.field_by_id(field_id);
- let parquet_iceberg_field =
iceberg_schema.field_by_id(field_id);
+ fields.filter_leaves(|idx, field| {
+ let Some(field_id) = projected_fields.get(field).cloned() else {
+ return false;
+ };
- if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
- return false;
- }
+ let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
+ let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
- if !type_promotion_is_valid(
- parquet_iceberg_field
- .unwrap()
- .field_type
- .as_primitive_type(),
- iceberg_field.unwrap().field_type.as_primitive_type(),
- ) {
- return false;
- }
+ if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
+ return false;
+ }
- column_map.insert(field_id, idx);
- true
- });
-
- // Only project columns that exist in the Parquet file.
- // Missing columns will be added by RecordBatchTransformer with
default/NULL values.
- // This supports schema evolution where new columns are added to
the table schema
- // but old Parquet files don't have them yet.
- let mut indices = vec![];
- for field_id in leaf_field_ids {
- if let Some(col_idx) = column_map.get(&field_id) {
- indices.push(*col_idx);
- }
- // Skip fields that don't exist in the Parquet file - they
will be added later
+ if !type_promotion_is_valid(
+ parquet_iceberg_field
+ .unwrap()
+ .field_type
+ .as_primitive_type(),
+ iceberg_field.unwrap().field_type.as_primitive_type(),
+ ) {
+ return false;
+ }
+
+ column_map.insert(field_id, idx);
+ true
+ });
+
+ // Schema evolution: New columns may not exist in old Parquet files.
+ // We only project existing columns; RecordBatchTransformer adds
default/NULL values.
+ let mut indices = vec![];
+ for field_id in leaf_field_ids {
+ if let Some(col_idx) = column_map.get(field_id) {
+ indices.push(*col_idx);
}
+ }
+
+ if indices.is_empty() {
+ // Edge case: All requested columns are new (don't exist in file).
+ // Project all columns so RecordBatchTransformer has a batch to
transform.
+ Ok(ProjectionMask::all())
+ } else {
+ Ok(ProjectionMask::leaves(parquet_schema, indices))
+ }
+ }
+
+ /// Fallback projection for Parquet files without field IDs.
+ /// Uses position-based matching: field ID N → column position N-1.
+ /// Projects entire top-level columns (including nested content) for
iceberg-java compatibility.
+ fn get_arrow_projection_mask_fallback(
+ field_ids: &[i32],
+ parquet_schema: &SchemaDescriptor,
+ ) -> Result<ProjectionMask> {
+ // Position-based: field_id N → column N-1 (field IDs are 1-indexed)
+ let parquet_root_fields = parquet_schema.root_schema().get_fields();
+ let mut root_indices = vec![];
+
+ for field_id in field_ids.iter() {
+ let parquet_pos = (*field_id - 1) as usize;
- if indices.is_empty() {
- // If no columns from the projection exist in the file,
project all columns
- // This can happen if all requested columns are new and need
to be added by the transformer
- Ok(ProjectionMask::all())
- } else {
- Ok(ProjectionMask::leaves(parquet_schema, indices))
+ if parquet_pos < parquet_root_fields.len() {
+ root_indices.push(parquet_pos);
}
+ // RecordBatchTransformer adds missing columns with NULL values
+ }
+
+ if root_indices.is_empty() {
+ Ok(ProjectionMask::all())
+ } else {
+ Ok(ProjectionMask::roots(parquet_schema, root_indices))
}
}
@@ -713,6 +799,13 @@ impl ArrowReader {
));
};
+ // If all row groups were filtered out, return an empty RowSelection
(select no rows)
+ if let Some(selected_row_groups) = selected_row_groups {
+ if selected_row_groups.is_empty() {
+ return Ok(RowSelection::from(Vec::new()));
+ }
+ }
+
let mut selected_row_groups_idx = 0;
let page_index = column_index
@@ -785,22 +878,16 @@ impl ArrowReader {
}
/// Build the map of parquet field id to Parquet column index in the schema.
-fn build_field_id_map(parquet_schema: &SchemaDescriptor) ->
Result<HashMap<i32, usize>> {
+/// Returns None if the Parquet file doesn't have field IDs embedded (e.g.,
migrated tables).
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) ->
Result<Option<HashMap<i32, usize>>> {
let mut column_map = HashMap::new();
+
for (idx, field) in parquet_schema.columns().iter().enumerate() {
let field_type = field.self_type();
match field_type {
ParquetType::PrimitiveType { basic_info, .. } => {
if !basic_info.has_id() {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!(
- "Leave column idx: {}, name: {}, type {:?} in
schema doesn't have field id",
- idx,
- basic_info.name(),
- field_type
- ),
- ));
+ return Ok(None);
}
column_map.insert(basic_info.id(), idx);
}
@@ -815,7 +902,59 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor)
-> Result<HashMap<i32,
};
}
- Ok(column_map)
+ Ok(Some(column_map))
+}
+
+/// Build a fallback field ID map for Parquet files without embedded field IDs.
+/// Position-based (1, 2, 3, ...) for compatibility with iceberg-java
migrations.
+fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) ->
HashMap<i32, usize> {
+ let mut column_map = HashMap::new();
+
+ // 1-indexed to match iceberg-java's convention
+ for (idx, _field) in parquet_schema.columns().iter().enumerate() {
+ let field_id = (idx + 1) as i32;
+ column_map.insert(field_id, idx);
+ }
+
+ column_map
+}
+
+/// Add position-based fallback field IDs to Arrow schema for Parquet files
lacking them.
+/// Enables projection on migrated files (e.g., from Hive/Spark).
+///
+/// Why at schema level (not per-batch): Efficiency - avoids repeated schema
modification.
+/// Why only top-level: Nested projection uses leaf column indices, not parent
struct IDs.
+/// Why 1-indexed: Compatibility with iceberg-java's
ParquetSchemaUtil.addFallbackIds().
+fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) ->
Arc<ArrowSchema> {
+ debug_assert!(
+ arrow_schema
+ .fields()
+ .iter()
+ .next()
+ .is_none_or(|f|
f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
+ "Schema already has field IDs"
+ );
+
+ use arrow_schema::Field;
+
+ let fields_with_fallback_ids: Vec<_> = arrow_schema
+ .fields()
+ .iter()
+ .enumerate()
+ .map(|(pos, field)| {
+ let mut metadata = field.metadata().clone();
+ let field_id = (pos + 1) as i32; // 1-indexed for Java
compatibility
+ metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(),
field_id.to_string());
+
+ Field::new(field.name(), field.data_type().clone(),
field.is_nullable())
+ .with_metadata(metadata)
+ })
+ .collect();
+
+ Arc::new(ArrowSchema::new_with_metadata(
+ fields_with_fallback_ids,
+ arrow_schema.metadata().clone(),
+ ))
}
/// A visitor to collect field ids from bound predicates.
@@ -1637,6 +1776,7 @@ message schema {
&schema,
&parquet_schema,
&arrow_schema,
+ false,
)
.unwrap_err();
@@ -1652,6 +1792,7 @@ message schema {
&schema,
&parquet_schema,
&arrow_schema,
+ false,
)
.unwrap_err();
@@ -1662,9 +1803,14 @@ message schema {
);
// Finally avoid selecting fields with unsupported data types
- let mask =
- ArrowReader::get_arrow_projection_mask(&[1], &schema,
&parquet_schema, &arrow_schema)
- .expect("Some ProjectionMask");
+ let mask = ArrowReader::get_arrow_projection_mask(
+ &[1],
+ &schema,
+ &parquet_schema,
+ &arrow_schema,
+ false,
+ )
+ .expect("Some ProjectionMask");
assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
}
@@ -2887,4 +3033,679 @@ message schema {
"Should have ids 101-200 (all of row group 1)"
);
}
+
+ /// Test reading Parquet files without field ID metadata (e.g., migrated
tables).
+ /// This exercises the position-based fallback path.
+ ///
+ /// Corresponds to Java's ParquetSchemaUtil.addFallbackIds() +
pruneColumnsFallback()
+ /// in
/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
+ #[tokio::test]
+ async fn test_read_parquet_file_without_field_ids() {
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(2, "age",
Type::Primitive(PrimitiveType::Int)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ // Parquet file from a migrated table - no field ID metadata
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("name", DataType::Utf8, false),
+ Field::new("age", DataType::Int32, false),
+ ]));
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ let file_io =
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+ let name_data = vec!["Alice", "Bob", "Charlie"];
+ let age_data = vec![30, 25, 35];
+
+ use arrow_array::Int32Array;
+ let name_col = Arc::new(StringArray::from(name_data.clone())) as
ArrayRef;
+ let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
+
+ let to_write = RecordBatch::try_new(arrow_schema.clone(),
vec![name_col, age_col]).unwrap();
+
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ let file = File::create(format!("{}/1.parquet",
&table_location)).unwrap();
+ let mut writer = ArrowWriter::try_new(file, to_write.schema(),
Some(props)).unwrap();
+
+ writer.write(&to_write).expect("Writing batch");
+ writer.close().unwrap();
+
+ let reader = ArrowReaderBuilder::new(file_io).build();
+
+ let tasks = Box::pin(futures::stream::iter(
+ vec![Ok(FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: format!("{}/1.parquet", table_location),
+ data_file_format: DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![1, 2],
+ predicate: None,
+ deletes: vec![],
+ })]
+ .into_iter(),
+ )) as FileScanTaskStream;
+
+ let result = reader
+ .read(tasks)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ assert_eq!(result.len(), 1);
+ let batch = &result[0];
+ assert_eq!(batch.num_rows(), 3);
+ assert_eq!(batch.num_columns(), 2);
+
+ // Verify position-based mapping: field_id 1 → position 0, field_id 2
→ position 1
+ let name_array = batch.column(0).as_string::<i32>();
+ assert_eq!(name_array.value(0), "Alice");
+ assert_eq!(name_array.value(1), "Bob");
+ assert_eq!(name_array.value(2), "Charlie");
+
+ let age_array = batch
+ .column(1)
+ .as_primitive::<arrow_array::types::Int32Type>();
+ assert_eq!(age_array.value(0), 30);
+ assert_eq!(age_array.value(1), 25);
+ assert_eq!(age_array.value(2), 35);
+ }
+
+ /// Test reading Parquet files without field IDs with partial projection.
+ /// Only a subset of columns are requested, verifying position-based
fallback
+ /// handles column selection correctly.
+ #[tokio::test]
+ async fn test_read_parquet_without_field_ids_partial_projection() {
+ use arrow_array::Int32Array;
+
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "col1",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(2, "col2",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(3, "col3",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(4, "col4",
Type::Primitive(PrimitiveType::Int)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("col1", DataType::Utf8, false),
+ Field::new("col2", DataType::Int32, false),
+ Field::new("col3", DataType::Utf8, false),
+ Field::new("col4", DataType::Int32, false),
+ ]));
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ let file_io =
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+ let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as
ArrayRef;
+ let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
+ let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as
ArrayRef;
+ let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
+
+ let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
+ col1_data, col2_data, col3_data, col4_data,
+ ])
+ .unwrap();
+
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ let file = File::create(format!("{}/1.parquet",
&table_location)).unwrap();
+ let mut writer = ArrowWriter::try_new(file, to_write.schema(),
Some(props)).unwrap();
+
+ writer.write(&to_write).expect("Writing batch");
+ writer.close().unwrap();
+
+ let reader = ArrowReaderBuilder::new(file_io).build();
+
+ let tasks = Box::pin(futures::stream::iter(
+ vec![Ok(FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: format!("{}/1.parquet", table_location),
+ data_file_format: DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![1, 3],
+ predicate: None,
+ deletes: vec![],
+ })]
+ .into_iter(),
+ )) as FileScanTaskStream;
+
+ let result = reader
+ .read(tasks)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ assert_eq!(result.len(), 1);
+ let batch = &result[0];
+ assert_eq!(batch.num_rows(), 2);
+ assert_eq!(batch.num_columns(), 2);
+
+ let col1_array = batch.column(0).as_string::<i32>();
+ assert_eq!(col1_array.value(0), "a");
+ assert_eq!(col1_array.value(1), "b");
+
+ let col3_array = batch.column(1).as_string::<i32>();
+ assert_eq!(col3_array.value(0), "c");
+ assert_eq!(col3_array.value(1), "d");
+ }
+
+ /// Test reading Parquet files without field IDs with schema evolution.
+ /// The Iceberg schema has more fields than the Parquet file, testing that
+ /// missing columns are filled with NULLs.
+ #[tokio::test]
+ async fn test_read_parquet_without_field_ids_schema_evolution() {
+ use arrow_array::{Array, Int32Array};
+
+ // Schema with field 3 added after the file was written
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(2, "age",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(3, "city",
Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("name", DataType::Utf8, false),
+ Field::new("age", DataType::Int32, false),
+ ]));
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ let file_io =
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+ let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as
ArrayRef;
+ let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
+
+ let to_write =
+ RecordBatch::try_new(arrow_schema.clone(), vec![name_data,
age_data]).unwrap();
+
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ let file = File::create(format!("{}/1.parquet",
&table_location)).unwrap();
+ let mut writer = ArrowWriter::try_new(file, to_write.schema(),
Some(props)).unwrap();
+
+ writer.write(&to_write).expect("Writing batch");
+ writer.close().unwrap();
+
+ let reader = ArrowReaderBuilder::new(file_io).build();
+
+ let tasks = Box::pin(futures::stream::iter(
+ vec![Ok(FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: format!("{}/1.parquet", table_location),
+ data_file_format: DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![1, 2, 3],
+ predicate: None,
+ deletes: vec![],
+ })]
+ .into_iter(),
+ )) as FileScanTaskStream;
+
+ let result = reader
+ .read(tasks)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ assert_eq!(result.len(), 1);
+ let batch = &result[0];
+ assert_eq!(batch.num_rows(), 2);
+ assert_eq!(batch.num_columns(), 3);
+
+ let name_array = batch.column(0).as_string::<i32>();
+ assert_eq!(name_array.value(0), "Alice");
+ assert_eq!(name_array.value(1), "Bob");
+
+ let age_array = batch
+ .column(1)
+ .as_primitive::<arrow_array::types::Int32Type>();
+ assert_eq!(age_array.value(0), 30);
+ assert_eq!(age_array.value(1), 25);
+
+ // Verify missing column filled with NULLs
+ let city_array = batch.column(2).as_string::<i32>();
+ assert_eq!(city_array.null_count(), 2);
+ assert!(city_array.is_null(0));
+ assert!(city_array.is_null(1));
+ }
+
+ /// Test reading Parquet files without field IDs that have multiple row
groups.
+ /// This ensures the position-based fallback works correctly across row
group boundaries.
+ #[tokio::test]
+ async fn test_read_parquet_without_field_ids_multiple_row_groups() {
+ use arrow_array::Int32Array;
+
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(2, "value",
Type::Primitive(PrimitiveType::Int)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("name", DataType::Utf8, false),
+ Field::new("value", DataType::Int32, false),
+ ]));
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ let file_io =
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+ // Small row group size to create multiple row groups
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .set_write_batch_size(2)
+ .set_max_row_group_size(2)
+ .build();
+
+ let file = File::create(format!("{}/1.parquet",
&table_location)).unwrap();
+ let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(),
Some(props)).unwrap();
+
+ // Write 6 rows in 3 batches (will create 3 row groups)
+ for batch_num in 0..3 {
+ let name_data = Arc::new(StringArray::from(vec![
+ format!("name_{}", batch_num * 2),
+ format!("name_{}", batch_num * 2 + 1),
+ ])) as ArrayRef;
+ let value_data =
+ Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 +
1])) as ArrayRef;
+
+ let batch =
+ RecordBatch::try_new(arrow_schema.clone(), vec![name_data,
value_data]).unwrap();
+ writer.write(&batch).expect("Writing batch");
+ }
+ writer.close().unwrap();
+
+ let reader = ArrowReaderBuilder::new(file_io).build();
+
+ let tasks = Box::pin(futures::stream::iter(
+ vec![Ok(FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: format!("{}/1.parquet", table_location),
+ data_file_format: DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![1, 2],
+ predicate: None,
+ deletes: vec![],
+ })]
+ .into_iter(),
+ )) as FileScanTaskStream;
+
+ let result = reader
+ .read(tasks)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ assert!(!result.is_empty());
+
+ let mut all_names = Vec::new();
+ let mut all_values = Vec::new();
+
+ for batch in &result {
+ let name_array = batch.column(0).as_string::<i32>();
+ let value_array = batch
+ .column(1)
+ .as_primitive::<arrow_array::types::Int32Type>();
+
+ for i in 0..batch.num_rows() {
+ all_names.push(name_array.value(i).to_string());
+ all_values.push(value_array.value(i));
+ }
+ }
+
+ assert_eq!(all_names.len(), 6);
+ assert_eq!(all_values.len(), 6);
+
+ for i in 0..6 {
+ assert_eq!(all_names[i], format!("name_{}", i));
+ assert_eq!(all_values[i], i as i32);
+ }
+ }
+
+ /// Test reading Parquet files without field IDs with nested types
(struct).
+ /// Java's pruneColumnsFallback() projects entire top-level columns
including nested content.
+ /// This test verifies that a top-level struct field is projected
correctly with all its nested fields.
+ #[tokio::test]
+ async fn test_read_parquet_without_field_ids_with_struct() {
+ use arrow_array::{Int32Array, StructArray};
+ use arrow_schema::Fields;
+
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(
+ 2,
+ "person",
+ Type::Struct(crate::spec::StructType::new(vec![
+ NestedField::required(
+ 3,
+ "name",
+ Type::Primitive(PrimitiveType::String),
+ )
+ .into(),
+ NestedField::required(4, "age",
Type::Primitive(PrimitiveType::Int))
+ .into(),
+ ])),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new(
+ "person",
+ DataType::Struct(Fields::from(vec![
+ Field::new("name", DataType::Utf8, false),
+ Field::new("age", DataType::Int32, false),
+ ])),
+ false,
+ ),
+ ]));
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ let file_io =
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+ let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
+ let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as
ArrayRef;
+ let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
+ let person_data = Arc::new(StructArray::from(vec![
+ (
+ Arc::new(Field::new("name", DataType::Utf8, false)),
+ name_data,
+ ),
+ (
+ Arc::new(Field::new("age", DataType::Int32, false)),
+ age_data,
+ ),
+ ])) as ArrayRef;
+
+ let to_write =
+ RecordBatch::try_new(arrow_schema.clone(), vec![id_data,
person_data]).unwrap();
+
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ let file = File::create(format!("{}/1.parquet",
&table_location)).unwrap();
+ let mut writer = ArrowWriter::try_new(file, to_write.schema(),
Some(props)).unwrap();
+
+ writer.write(&to_write).expect("Writing batch");
+ writer.close().unwrap();
+
+ let reader = ArrowReaderBuilder::new(file_io).build();
+
+ let tasks = Box::pin(futures::stream::iter(
+ vec![Ok(FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: format!("{}/1.parquet", table_location),
+ data_file_format: DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![1, 2],
+ predicate: None,
+ deletes: vec![],
+ })]
+ .into_iter(),
+ )) as FileScanTaskStream;
+
+ let result = reader
+ .read(tasks)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ assert_eq!(result.len(), 1);
+ let batch = &result[0];
+ assert_eq!(batch.num_rows(), 2);
+ assert_eq!(batch.num_columns(), 2);
+
+ let id_array = batch
+ .column(0)
+ .as_primitive::<arrow_array::types::Int32Type>();
+ assert_eq!(id_array.value(0), 1);
+ assert_eq!(id_array.value(1), 2);
+
+ let person_array = batch.column(1).as_struct();
+ assert_eq!(person_array.num_columns(), 2);
+
+ let name_array = person_array.column(0).as_string::<i32>();
+ assert_eq!(name_array.value(0), "Alice");
+ assert_eq!(name_array.value(1), "Bob");
+
+ let age_array = person_array
+ .column(1)
+ .as_primitive::<arrow_array::types::Int32Type>();
+ assert_eq!(age_array.value(0), 30);
+ assert_eq!(age_array.value(1), 25);
+ }
+
+ /// Test reading Parquet files without field IDs with schema evolution -
column added in the middle.
+ /// When a new column is inserted between existing columns in the schema
order,
+ /// the fallback projection must correctly map field IDs to output
positions.
+ #[tokio::test]
+ async fn
test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
+ use arrow_array::{Array, Int32Array};
+
+ let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
+ Field::new("col0", DataType::Int32, true),
+ Field::new("col1", DataType::Int32, true),
+ ]));
+
+ // New column added between existing columns: col0 (id=1), newCol
(id=5), col1 (id=2)
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::optional(1, "col0",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(5, "newCol",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(2, "col1",
Type::Primitive(PrimitiveType::Int)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ let file_io =
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+ let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
+ let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
+
+ let to_write =
+ RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data,
col1_data]).unwrap();
+
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ let file = File::create(format!("{}/1.parquet",
&table_location)).unwrap();
+ let mut writer = ArrowWriter::try_new(file, to_write.schema(),
Some(props)).unwrap();
+ writer.write(&to_write).expect("Writing batch");
+ writer.close().unwrap();
+
+ let reader = ArrowReaderBuilder::new(file_io).build();
+
+ let tasks = Box::pin(futures::stream::iter(
+ vec![Ok(FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: format!("{}/1.parquet", table_location),
+ data_file_format: DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![1, 5, 2],
+ predicate: None,
+ deletes: vec![],
+ })]
+ .into_iter(),
+ )) as FileScanTaskStream;
+
+ let result = reader
+ .read(tasks)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ assert_eq!(result.len(), 1);
+ let batch = &result[0];
+ assert_eq!(batch.num_rows(), 2);
+ assert_eq!(batch.num_columns(), 3);
+
+ let result_col0 = batch
+ .column(0)
+ .as_primitive::<arrow_array::types::Int32Type>();
+ assert_eq!(result_col0.value(0), 1);
+ assert_eq!(result_col0.value(1), 2);
+
+ // New column should be NULL (doesn't exist in old file)
+ let result_newcol = batch
+ .column(1)
+ .as_primitive::<arrow_array::types::Int32Type>();
+ assert_eq!(result_newcol.null_count(), 2);
+ assert!(result_newcol.is_null(0));
+ assert!(result_newcol.is_null(1));
+
+ let result_col1 = batch
+ .column(2)
+ .as_primitive::<arrow_array::types::Int32Type>();
+ assert_eq!(result_col1.value(0), 10);
+ assert_eq!(result_col1.value(1), 20);
+ }
+
+ /// Test reading Parquet files without field IDs with a filter that
eliminates all row groups.
+ /// During development of field ID mapping, we saw a panic when
row_selection_enabled=true and
+ /// all row groups are filtered out.
+ #[tokio::test]
+ async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
+ use arrow_array::{Float64Array, Int32Array};
+
+ // Schema with fields that will use fallback IDs 1, 2, 3
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(3, "value",
Type::Primitive(PrimitiveType::Double))
+ .into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("value", DataType::Float64, false),
+ ]));
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ let file_io =
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+ // Write data where all ids are >= 10
+ let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
+ let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as
ArrayRef;
+ let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0,
300.0])) as ArrayRef;
+
+ let to_write =
+ RecordBatch::try_new(arrow_schema.clone(), vec![id_data,
name_data, value_data])
+ .unwrap();
+
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ let file = File::create(format!("{}/1.parquet",
&table_location)).unwrap();
+ let mut writer = ArrowWriter::try_new(file, to_write.schema(),
Some(props)).unwrap();
+ writer.write(&to_write).expect("Writing batch");
+ writer.close().unwrap();
+
+ // Filter that eliminates all row groups: id < 5
+ let predicate = Reference::new("id").less_than(Datum::int(5));
+
+ // Enable both row_group_filtering and row_selection - triggered the
panic
+ let reader = ArrowReaderBuilder::new(file_io)
+ .with_row_group_filtering_enabled(true)
+ .with_row_selection_enabled(true)
+ .build();
+
+ let tasks = Box::pin(futures::stream::iter(
+ vec![Ok(FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: format!("{}/1.parquet", table_location),
+ data_file_format: DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![1, 2, 3],
+ predicate: Some(predicate.bind(schema, true).unwrap()),
+ deletes: vec![],
+ })]
+ .into_iter(),
+ )) as FileScanTaskStream;
+
+ // Should no longer panic
+ let result = reader
+ .read(tasks)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ // Should return empty results
+ assert!(result.is_empty() || result.iter().all(|batch|
batch.num_rows() == 0));
+ }
}