This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch xuanwo/resolve-merge-conflicts in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
commit c9dcf54c581050aec60b55721aa98a160271b458 Merge: 719d62f21 9fa3776cb Author: Xuanwo <[email protected]> AuthorDate: Fri Dec 5 18:24:22 2025 +0800 Merge remote-tracking branch 'origin/main' into viktor/origin_limit_push_down Signed-off-by: Xuanwo <[email protected]> .asf.yaml | 9 +- .github/workflows/audit.yml | 2 +- .github/workflows/bindings_python_ci.yml | 37 +- .github/workflows/ci.yml | 41 +- .github/workflows/ci_typos.yml | 4 +- .github/workflows/publish.yml | 2 +- .github/workflows/release_python.yml | 14 +- .github/workflows/release_python_nightly.yml | 40 +- .github/workflows/stale.yml | 2 +- .github/workflows/website.yml | 2 +- .licenserc.yaml | 19 +- CONTRIBUTING.md | 4 +- Cargo.lock | 2240 +++++------ Cargo.toml | 61 +- Makefile | 5 +- README.md | 1 + bindings/python/Cargo.lock | 1361 +++---- bindings/python/Cargo.toml | 13 +- .licenserc.yaml => bindings/python/Makefile | 26 +- bindings/python/README.md | 14 +- bindings/python/pyproject.toml | 18 +- bindings/python/src/datafusion_table_provider.rs | 6 +- bindings/python/uv.lock | 767 ++++ crates/catalog/glue/src/catalog.rs | 130 +- crates/catalog/glue/src/error.rs | 4 +- crates/catalog/glue/src/schema.rs | 6 +- crates/catalog/glue/src/utils.rs | 5 +- crates/catalog/glue/tests/glue_catalog_test.rs | 4 +- crates/catalog/hms/src/catalog.rs | 4 +- crates/catalog/hms/src/error.rs | 4 +- crates/catalog/hms/src/schema.rs | 6 +- crates/catalog/hms/src/utils.rs | 14 +- crates/catalog/hms/testdata/hms_catalog/Dockerfile | 22 +- crates/catalog/hms/tests/hms_catalog_test.rs | 2 +- crates/catalog/loader/Cargo.toml | 5 + crates/catalog/loader/src/lib.rs | 34 + crates/catalog/rest/src/catalog.rs | 6 +- crates/catalog/rest/tests/rest_catalog_test.rs | 2 +- crates/catalog/s3tables/Cargo.toml | 1 + crates/catalog/s3tables/src/catalog.rs | 213 +- crates/catalog/sql/Cargo.toml | 5 +- crates/catalog/sql/src/catalog.rs | 511 ++- crates/catalog/sql/src/error.rs | 6 +- crates/catalog/sql/src/lib.rs | 37 + crates/examples/src/rest_catalog_namespace.rs | 9 +- crates/examples/src/rest_catalog_table.rs | 2 +- crates/iceberg/Cargo.toml | 5 +- .../src/arrow/caching_delete_file_loader.rs | 253 +- crates/iceberg/src/arrow/delete_file_loader.rs | 29 +- crates/iceberg/src/arrow/delete_filter.rs | 10 +- crates/iceberg/src/arrow/mod.rs | 10 +- .../src/arrow/partition_value_calculator.rs | 254 ++ crates/iceberg/src/arrow/reader.rs | 2331 +++++++++++- .../src/arrow/record_batch_partition_splitter.rs | 390 +- crates/iceberg/src/arrow/record_batch_projector.rs | 71 +- .../iceberg/src/arrow/record_batch_transformer.rs | 1064 +++++- crates/iceberg/src/arrow/schema.rs | 28 +- crates/iceberg/src/arrow/value.rs | 6 +- crates/iceberg/src/catalog/memory/catalog.rs | 78 +- .../iceberg/src/catalog/memory/namespace_state.rs | 14 +- crates/iceberg/src/catalog/metadata_location.rs | 11 +- crates/iceberg/src/catalog/mod.rs | 229 +- crates/iceberg/src/delete_file_index.rs | 292 +- crates/iceberg/src/expr/predicate.rs | 16 +- .../src/expr/visitors/manifest_evaluator.rs | 1 + .../src/expr/visitors/page_index_evaluator.rs | 9 +- .../src/expr/visitors/strict_metrics_evaluator.rs | 45 +- crates/iceberg/src/io/file_io.rs | 12 - crates/iceberg/src/io/object_cache.rs | 26 +- crates/iceberg/src/io/storage.rs | 2 +- crates/iceberg/src/io/storage_s3.rs | 5 +- crates/iceberg/src/puffin/metadata.rs | 13 +- crates/iceberg/src/scan/cache.rs | 2 +- crates/iceberg/src/scan/context.rs | 7 + crates/iceberg/src/scan/mod.rs | 65 +- crates/iceberg/src/scan/task.rs | 54 +- crates/iceberg/src/spec/datatypes.rs | 65 +- crates/iceberg/src/spec/encrypted_key.rs | 16 +- crates/iceberg/src/spec/manifest/data_file.rs | 14 +- crates/iceberg/src/spec/manifest/entry.rs | 40 +- crates/iceberg/src/spec/manifest/mod.rs | 3 +- crates/iceberg/src/spec/manifest/writer.rs | 76 +- crates/iceberg/src/spec/manifest_list.rs | 676 +++- crates/iceberg/src/spec/mod.rs | 2 + crates/iceberg/src/spec/partition.rs | 51 +- crates/iceberg/src/spec/schema/id_reassigner.rs | 4 +- crates/iceberg/src/spec/schema/mod.rs | 11 +- crates/iceberg/src/spec/schema/utils.rs | 5 +- crates/iceberg/src/spec/snapshot.rs | 140 +- crates/iceberg/src/spec/snapshot_summary.rs | 8 +- crates/iceberg/src/spec/table_metadata.rs | 694 +++- crates/iceberg/src/spec/table_metadata_builder.rs | 557 ++- crates/iceberg/src/spec/table_properties.rs | 284 ++ crates/iceberg/src/spec/transform.rs | 26 +- crates/iceberg/src/spec/values.rs | 3957 -------------------- crates/iceberg/src/spec/values/datum.rs | 1225 ++++++ crates/iceberg/src/spec/values/literal.rs | 747 ++++ crates/iceberg/src/spec/values/map.rs | 145 + .../src/spec/values}/mod.rs | 25 +- crates/iceberg/src/spec/values/primitive.rs | 59 + crates/iceberg/src/spec/values/serde.rs | 719 ++++ crates/iceberg/src/spec/values/struct_value.rs | 79 + crates/iceberg/src/spec/values/temporal.rs | 105 + crates/iceberg/src/spec/values/tests.rs | 1334 +++++++ crates/iceberg/src/spec/view_metadata.rs | 4 +- crates/iceberg/src/spec/view_metadata_builder.rs | 8 +- crates/iceberg/src/table.rs | 11 +- crates/iceberg/src/transaction/append.rs | 6 +- crates/iceberg/src/transaction/mod.rs | 191 +- crates/iceberg/src/transaction/snapshot.rs | 124 +- .../iceberg/src/transaction/update_properties.rs | 5 +- crates/iceberg/src/transform/mod.rs | 14 +- crates/iceberg/src/transform/temporal.rs | 10 +- crates/iceberg/src/transform/truncate.rs | 4 +- .../src/writer/base_writer/data_file_writer.rs | 163 +- .../writer/base_writer/equality_delete_writer.rs | 162 +- .../src/writer/file_writer/location_generator.rs | 10 +- crates/iceberg/src/writer/file_writer/mod.rs | 6 +- .../src/writer/file_writer/parquet_writer.rs | 258 +- .../src/writer/file_writer/rolling_writer.rs | 218 +- crates/iceberg/src/writer/mod.rs | 192 +- .../src/writer/partitioning/clustered_writer.rs | 517 +++ .../src/writer/partitioning/fanout_writer.rs | 384 ++ crates/iceberg/src/writer/partitioning/mod.rs | 56 + .../writer/partitioning/unpartitioned_writer.rs | 198 + .../TableMetadataV3ValidMinimal.json | 74 + crates/iceberg/tests/file_io_s3_test.rs | 6 +- crates/integration_tests/src/lib.rs | 2 +- crates/integration_tests/testdata/spark/Dockerfile | 2 +- .../tests/shared_tests/append_data_file_test.rs | 9 +- .../append_partition_data_file_test.rs | 82 +- .../tests/shared_tests/conflict_commit_test.rs | 9 +- .../tests/shared_tests/datafusion.rs | 4 +- .../tests/shared_tests/read_positional_deletes.rs | 4 +- .../tests/shared_tests/scan_all_type.rs | 9 +- crates/integrations/datafusion/src/error.rs | 2 +- crates/integrations/datafusion/src/lib.rs | 2 + .../datafusion/src/physical_plan/commit.rs | 128 +- .../datafusion/src/physical_plan/mod.rs | 4 + .../datafusion/src/physical_plan/project.rs | 380 ++ .../datafusion/src/physical_plan/repartition.rs | 885 +++++ .../datafusion/src/physical_plan/sort.rs | 244 ++ .../datafusion/src/physical_plan/write.rs | 114 +- crates/integrations/datafusion/src/schema.rs | 6 +- crates/integrations/datafusion/src/table/mod.rs | 468 ++- .../datafusion/src/table/table_provider_factory.rs | 10 +- crates/integrations/datafusion/src/task_writer.rs | 517 +++ .../tests/integration_datafusion_test.rs | 156 +- crates/integrations/playground/Cargo.toml | 3 +- crates/integrations/playground/src/main.rs | 2 +- crates/sqllogictest/Cargo.toml | 12 + crates/sqllogictest/src/engine/datafusion.rs | 56 +- crates/sqllogictest/src/engine/mod.rs | 73 +- crates/sqllogictest/src/error.rs | 6 + crates/sqllogictest/src/lib.rs | 7 +- crates/sqllogictest/src/schedule.rs | 24 +- .../sqllogictest/testdata/schedules/df_test.toml | 23 +- .../testdata/slts/df_test/show_tables.slt | 45 +- crates/sqllogictest/tests/sqllogictests.rs | 87 + .../0001_modularize_iceberg_implementations.md | 120 + scripts/release.sh | 2 +- website/src/download.md | 4 +- website/src/reference/orbstack.md | 4 +- website/src/release.md | 14 +- 164 files changed, 20194 insertions(+), 8065 deletions(-) diff --cc crates/iceberg/src/arrow/caching_delete_file_loader.rs index 9cf605680,192ca390a..d3e943200 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@@ -686,4 -701,215 +701,216 @@@ mod tests let result = delete_filter.get_delete_vector(&file_scan_tasks[1]); assert!(result.is_none()); // no pos dels for file 3 } + + /// Verifies that evolve_schema on partial-schema equality deletes works correctly + /// when only equality_ids columns are evolved, not all table columns. + /// + /// Per the [Iceberg spec](https://iceberg.apache.org/spec/#equality-delete-files), + /// equality delete files can contain only a subset of columns. + #[tokio::test] + async fn test_partial_schema_equality_deletes_evolve_succeeds() { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().as_os_str().to_str().unwrap(); + + // Create table schema with REQUIRED fields + let table_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + crate::spec::NestedField::required( + 1, + "id", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into(), + crate::spec::NestedField::required( + 2, + "data", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + // Write equality delete file with PARTIAL schema (only 'data' column) + let delete_file_path = { + let data_vals = vec!["a", "d", "g"]; + let data_col = Arc::new(StringArray::from(data_vals)) as ArrayRef; + + let delete_schema = Arc::new(arrow_schema::Schema::new(vec![simple_field( + "data", + DataType::Utf8, + false, + "2", // field ID + )])); + + let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![data_col]).unwrap(); + + let path = format!("{}/partial-eq-deletes.parquet", &table_location); + let file = File::create(&path).unwrap(); + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + let mut writer = + ArrowWriter::try_new(file, delete_batch.schema(), Some(props)).unwrap(); + writer.write(&delete_batch).expect("Writing batch"); + writer.close().unwrap(); + path + }; + + let file_io = FileIO::from_path(table_location).unwrap().build().unwrap(); + let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); + + let batch_stream = basic_delete_file_loader + .parquet_to_batch_stream(&delete_file_path) + .await + .unwrap(); + + // Only evolve the equality_ids columns (field 2), not all table columns + let equality_ids = vec![2]; + let evolved_stream = + BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema, &equality_ids) + .await + .unwrap(); + + let result = evolved_stream.try_collect::<Vec<_>>().await; + + assert!( + result.is_ok(), + "Expected success when evolving only equality_ids columns, got error: {:?}", + result.err() + ); + + let batches = result.unwrap(); + assert_eq!(batches.len(), 1); + + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 1); // Only 'data' column + + // Verify the actual values are preserved after schema evolution + let data_col = batch.column(0).as_string::<i32>(); + assert_eq!(data_col.value(0), "a"); + assert_eq!(data_col.value(1), "d"); + assert_eq!(data_col.value(2), "g"); + } + + /// Test loading a FileScanTask with BOTH positional and equality deletes. + /// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors. + #[tokio::test] + async fn test_load_deletes_with_mixed_types() { + use crate::scan::FileScanTask; + use crate::spec::{DataFileFormat, Schema}; + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path(); + let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) + .unwrap() + .build() + .unwrap(); + + // Create the data file schema + let data_file_schema = Arc::new( + Schema::builder() + .with_fields(vec![ + crate::spec::NestedField::optional( + 2, + "y", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long), + ) + .into(), + crate::spec::NestedField::optional( + 3, + "z", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + // Write positional delete file + let positional_delete_schema = crate::arrow::delete_filter::tests::create_pos_del_schema(); + let file_path_values = + vec![format!("{}/data-1.parquet", table_location.to_str().unwrap()); 4]; + let file_path_col = Arc::new(StringArray::from_iter_values(&file_path_values)); + let pos_col = Arc::new(Int64Array::from_iter_values(vec![0i64, 1, 2, 3])); + + let positional_deletes_to_write = + RecordBatch::try_new(positional_delete_schema.clone(), vec![ + file_path_col, + pos_col, + ]) + .unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let pos_del_path = format!("{}/pos-del-mixed.parquet", table_location.to_str().unwrap()); + let file = File::create(&pos_del_path).unwrap(); + let mut writer = ArrowWriter::try_new( + file, + positional_deletes_to_write.schema(), + Some(props.clone()), + ) + .unwrap(); + writer.write(&positional_deletes_to_write).unwrap(); + writer.close().unwrap(); + + // Write equality delete file + let eq_delete_path = setup_write_equality_delete_file_1(table_location.to_str().unwrap()); + + // Create FileScanTask with BOTH positional and equality deletes + let pos_del = FileScanTaskDeleteFile { + file_path: pos_del_path, + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: None, + }; + + let eq_del = FileScanTaskDeleteFile { + file_path: eq_delete_path.clone(), + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas + }; + + let file_scan_task = FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/data-1.parquet", table_location.to_str().unwrap()), + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![2, 3], + predicate: None, + deletes: vec![pos_del, eq_del], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + }; + + // Load the deletes - should handle both types without error + let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let delete_filter = delete_file_loader + .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref()) + .await + .unwrap() + .unwrap(); + + // Verify both delete types can be processed together + let result = delete_filter + .build_equality_delete_predicate(&file_scan_task) + .await; + assert!( + result.is_ok(), + "Failed to build equality delete predicate: {:?}", + result.err() + ); + } } diff --cc crates/iceberg/src/arrow/delete_filter.rs index b29f80886,14b5124ee..03be7338b --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@@ -339,7 -341,9 +341,10 @@@ pub(crate) mod tests project_field_ids: vec![], predicate: None, deletes: vec![pos_del_1, pos_del_2.clone()], + partition: None, + partition_spec: None, + name_mapping: None, + limit: None, }, FileScanTask { start: 0, @@@ -351,7 -355,9 +356,10 @@@ project_field_ids: vec![], predicate: None, deletes: vec![pos_del_3], + partition: None, + partition_spec: None, + name_mapping: None, + limit: None, }, ]; diff --cc crates/iceberg/src/arrow/reader.rs index c6f005474,ab5a96f75..81a9fa727 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@@ -176,13 -178,15 +178,16 @@@ impl ArrowReader row_group_filtering_enabled: bool, row_selection_enabled: bool, ) -> Result<ArrowRecordBatchStream> { - let should_load_page_index = - (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); + let should_load_page_index = (row_selection_enabled && task.predicate.is_some()) + || !task.deletes.is_empty() + || task.limit.is_some(); - let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, task.schema.clone()); + let delete_filter_rx = + delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema)); - let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder( + // Migrated tables lack field IDs, requiring us to inspect the schema to choose + // between field-ID-based or position-based projection + let initial_stream_builder = Self::create_parquet_record_batch_stream_builder( &task.data_file_path, file_io.clone(), should_load_page_index, @@@ -344,11 -440,9 +445,11 @@@ .with_preload_page_index(should_load_page_index); // Create the record batch stream builder, which wraps the parquet file reader - let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options( - parquet_file_reader, - ArrowReaderOptions::new().with_page_index(should_load_page_index), - ) - .await?; - let options = arrow_reader_options.unwrap_or_default(); ++ let options = arrow_reader_options ++ .unwrap_or_default() ++ .with_page_index(should_load_page_index); + let record_batch_stream_builder = + ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?; Ok(record_batch_stream_builder) } @@@ -1750,7 -2061,9 +2068,10 @@@ message schema project_field_ids: vec![1], predicate: Some(predicate.bind(schema, true).unwrap()), deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, + limit: None, })] .into_iter(), )) as FileScanTaskStream; @@@ -1964,4 -2277,1748 +2285,1762 @@@ Arc::new(SchemaDescriptor::new(Arc::new(schema))) } + + /// Verifies that file splits respect byte ranges and only read specific row groups. + #[tokio::test] + async fn test_file_splits_respect_byte_ranges() { + use arrow_array::Int32Array; + use parquet::file::reader::{FileReader, SerializedFileReader}; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_path = format!("{}/multi_row_group.parquet", &table_location); + + // Force each batch into its own row group for testing byte range filtering. + let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from( + (0..100).collect::<Vec<i32>>(), + ))]) + .unwrap(); + let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from( + (100..200).collect::<Vec<i32>>(), + ))]) + .unwrap(); + let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from( + (200..300).collect::<Vec<i32>>(), + ))]) + .unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_max_row_group_size(100) + .build(); + + let file = File::create(&file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + writer.write(&batch1).expect("Writing batch 1"); + writer.write(&batch2).expect("Writing batch 2"); + writer.write(&batch3).expect("Writing batch 3"); + writer.close().unwrap(); + + // Read the file metadata to get row group byte positions + let file = File::open(&file_path).unwrap(); + let reader = SerializedFileReader::new(file).unwrap(); + let metadata = reader.metadata(); + + println!("File has {} row groups", metadata.num_row_groups()); + assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups"); + + // Get byte positions for each row group + let row_group_0 = metadata.row_group(0); + let row_group_1 = metadata.row_group(1); + let row_group_2 = metadata.row_group(2); + + let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1" + let rg1_start = rg0_start + row_group_0.compressed_size() as u64; + let rg2_start = rg1_start + row_group_1.compressed_size() as u64; + let file_end = rg2_start + row_group_2.compressed_size() as u64; + + println!( + "Row group 0: {} rows, starts at byte {}, {} bytes compressed", + row_group_0.num_rows(), + rg0_start, + row_group_0.compressed_size() + ); + println!( + "Row group 1: {} rows, starts at byte {}, {} bytes compressed", + row_group_1.num_rows(), + rg1_start, + row_group_1.compressed_size() + ); + println!( + "Row group 2: {} rows, starts at byte {}, {} bytes compressed", + row_group_2.num_rows(), + rg2_start, + row_group_2.compressed_size() + ); + + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + let reader = ArrowReaderBuilder::new(file_io).build(); + + // Task 1: read only the first row group + let task1 = FileScanTask { + start: rg0_start, + length: row_group_0.compressed_size() as u64, + record_count: Some(100), + data_file_path: file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1], + predicate: None, + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + }; + + // Task 2: read the second and third row groups + let task2 = FileScanTask { + start: rg1_start, + length: file_end - rg1_start, + record_count: Some(200), + data_file_path: file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1], + predicate: None, + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + }; + + let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream; + let result1 = reader + .clone() + .read(tasks1) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum(); + println!( + "Task 1 (bytes {}-{}) returned {} rows", + rg0_start, + rg0_start + row_group_0.compressed_size() as u64, + total_rows_task1 + ); + + let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream; + let result2 = reader + .read(tasks2) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum(); + println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows"); + + assert_eq!( + total_rows_task1, 100, + "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows" + ); + + assert_eq!( + total_rows_task2, 200, + "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows" + ); + + // Verify the actual data values are correct (not just the row count) + if total_rows_task1 > 0 { + let first_batch = &result1[0]; + let id_col = first_batch + .column(0) + .as_primitive::<arrow_array::types::Int32Type>(); + let first_val = id_col.value(0); + let last_val = id_col.value(id_col.len() - 1); + println!("Task 1 data range: {first_val} to {last_val}"); + + assert_eq!(first_val, 0, "Task 1 should start with id=0"); + assert_eq!(last_val, 99, "Task 1 should end with id=99"); + } + + if total_rows_task2 > 0 { + let first_batch = &result2[0]; + let id_col = first_batch + .column(0) + .as_primitive::<arrow_array::types::Int32Type>(); + let first_val = id_col.value(0); + println!("Task 2 first value: {first_val}"); + + assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0"); + } + } + + /// Test schema evolution: reading old Parquet file (with only column 'a') + /// using a newer table schema (with columns 'a' and 'b'). + /// This tests that: + /// 1. get_arrow_projection_mask allows missing columns + /// 2. RecordBatchTransformer adds missing column 'b' with NULL values + #[tokio::test] + async fn test_schema_evolution_add_column() { + use arrow_array::{Array, Int32Array}; + + // New table schema: columns 'a' and 'b' (b was added later, file only has 'a') + let new_schema = Arc::new( + Schema::builder() + .with_schema_id(2) + .with_fields(vec![ + NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + // Create Arrow schema for old Parquet file (only has column 'a') + let arrow_schema_old = Arc::new(ArrowSchema::new(vec![ + Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + // Write old Parquet file with only column 'a' + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + let file = File::create(format!("{}/old_file.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + // Read the old Parquet file using the NEW schema (with column 'b') + let reader = ArrowReaderBuilder::new(file_io).build(); + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/old_file.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: new_schema.clone(), + project_field_ids: vec![1, 2], // Request both columns 'a' and 'b' + predicate: None, + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + // Verify we got the correct data + assert_eq!(result.len(), 1); + let batch = &result[0]; + + // Should have 2 columns now + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 3); + + // Column 'a' should have the original data + let col_a = batch + .column(0) + .as_primitive::<arrow_array::types::Int32Type>(); + assert_eq!(col_a.values(), &[1, 2, 3]); + + // Column 'b' should be all NULLs (it didn't exist in the old file) + let col_b = batch + .column(1) + .as_primitive::<arrow_array::types::Int32Type>(); + assert_eq!(col_b.null_count(), 3); + assert!(col_b.is_null(0)); + assert!(col_b.is_null(1)); + assert!(col_b.is_null(2)); + } + + /// Test for bug where position deletes in later row groups are not applied correctly. + /// + /// When a file has multiple row groups and a position delete targets a row in a later + /// row group, the `build_deletes_row_selection` function had a bug where it would + /// fail to increment `current_row_group_base_idx` when skipping row groups. + /// + /// This test creates: + /// - A data file with 200 rows split into 2 row groups (0-99, 100-199) + /// - A position delete file that deletes row 199 (last row in second row group) + /// + /// Expected behavior: Should return 199 rows (with id=200 deleted) + /// Bug behavior: Returns 200 rows (delete is not applied) + /// + /// This bug was discovered while running Apache Spark + Apache Iceberg integration tests + /// through DataFusion Comet. The following Iceberg Java tests failed due to this bug: + /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadDelete::testDeleteWithMultipleRowGroupsParquet` + /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate::testUpdateWithMultipleRowGroupsParquet` + #[tokio::test] + async fn test_position_delete_across_multiple_row_groups() { + use arrow_array::{Int32Array, Int64Array}; + use parquet::file::reader::{FileReader, SerializedFileReader}; + + // Field IDs for positional delete schema + const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546; + const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + + // Create table schema with a single 'id' column + let table_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + // Step 1: Create data file with 200 rows in 2 row groups + // Row group 0: rows 0-99 (ids 1-100) + // Row group 1: rows 100-199 (ids 101-200) + let data_file_path = format!("{}/data.parquet", &table_location); + + let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from_iter_values(1..=100), + )]) + .unwrap(); + + let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from_iter_values(101..=200), + )]) + .unwrap(); + + // Force each batch into its own row group + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_max_row_group_size(100) + .build(); + + let file = File::create(&data_file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + writer.write(&batch1).expect("Writing batch 1"); + writer.write(&batch2).expect("Writing batch 2"); + writer.close().unwrap(); + + // Verify we created 2 row groups + let verify_file = File::open(&data_file_path).unwrap(); + let verify_reader = SerializedFileReader::new(verify_file).unwrap(); + assert_eq!( + verify_reader.metadata().num_row_groups(), + 2, + "Should have 2 row groups" + ); + + // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1) + let delete_file_path = format!("{}/deletes.parquet", &table_location); + + let delete_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + )])), + Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + )])), + ])); + + // Delete row at position 199 (0-indexed, so it's the last row: id=200) + let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![ + Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])), + Arc::new(Int64Array::from_iter_values(vec![199i64])), + ]) + .unwrap(); + + let delete_props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let delete_file = File::create(&delete_file_path).unwrap(); + let mut delete_writer = + ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap(); + delete_writer.write(&delete_batch).unwrap(); + delete_writer.close().unwrap(); + + // Step 3: Read the data file with the delete applied + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + let reader = ArrowReaderBuilder::new(file_io).build(); + + let task = FileScanTask { + start: 0, + length: 0, + record_count: Some(200), + data_file_path: data_file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema: table_schema.clone(), + project_field_ids: vec![1], + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_path: delete_file_path, + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: None, + }], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + }; + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let result = reader + .read(tasks) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + // Step 4: Verify we got 199 rows (not 200) + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + + println!("Total rows read: {}", total_rows); + println!("Expected: 199 rows (deleted row 199 which had id=200)"); + + // This assertion will FAIL before the fix and PASS after the fix + assert_eq!( + total_rows, 199, + "Expected 199 rows after deleting row 199, but got {} rows. \ + The bug causes position deletes in later row groups to be ignored.", + total_rows + ); + + // Verify the deleted row (id=200) is not present + let all_ids: Vec<i32> = result + .iter() + .flat_map(|batch| { + batch + .column(0) + .as_primitive::<arrow_array::types::Int32Type>() + .values() + .iter() + .copied() + }) + .collect(); + + assert!( + !all_ids.contains(&200), + "Row with id=200 should be deleted but was found in results" + ); + + // Verify we have all other ids (1-199) + let expected_ids: Vec<i32> = (1..=199).collect(); + assert_eq!( + all_ids, expected_ids, + "Should have ids 1-199 but got different values" + ); + } + + /// Test for bug where position deletes are lost when skipping unselected row groups. + /// + /// This is a variant of `test_position_delete_across_multiple_row_groups` that exercises + /// the row group selection code path (`selected_row_groups: Some([...])`). + /// + /// When a file has multiple row groups and only some are selected for reading, + /// the `build_deletes_row_selection` function must correctly skip over deletes in + /// unselected row groups WITHOUT consuming deletes that belong to selected row groups. + /// + /// This test creates: + /// - A data file with 200 rows split into 2 row groups (0-99, 100-199) + /// - A position delete file that deletes row 199 (last row in second row group) + /// - Row group selection that reads ONLY row group 1 (rows 100-199) + /// + /// Expected behavior: Should return 99 rows (with row 199 deleted) + /// Bug behavior: Returns 100 rows (delete is lost when skipping row group 0) + /// + /// The bug occurs when processing row group 0 (unselected): + /// ```rust + /// delete_vector_iter.advance_to(next_row_group_base_idx); // Position at first delete >= 100 + /// next_deleted_row_idx_opt = delete_vector_iter.next(); // BUG: Consumes delete at 199! + /// ``` + /// + /// The fix is to NOT call `next()` after `advance_to()` when skipping unselected row groups, + /// because `advance_to()` already positions the iterator correctly without consuming elements. + #[tokio::test] + async fn test_position_delete_with_row_group_selection() { + use arrow_array::{Int32Array, Int64Array}; + use parquet::file::reader::{FileReader, SerializedFileReader}; + + // Field IDs for positional delete schema + const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546; + const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + + // Create table schema with a single 'id' column + let table_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + // Step 1: Create data file with 200 rows in 2 row groups + // Row group 0: rows 0-99 (ids 1-100) + // Row group 1: rows 100-199 (ids 101-200) + let data_file_path = format!("{}/data.parquet", &table_location); + + let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from_iter_values(1..=100), + )]) + .unwrap(); + + let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from_iter_values(101..=200), + )]) + .unwrap(); + + // Force each batch into its own row group + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_max_row_group_size(100) + .build(); + + let file = File::create(&data_file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + writer.write(&batch1).expect("Writing batch 1"); + writer.write(&batch2).expect("Writing batch 2"); + writer.close().unwrap(); + + // Verify we created 2 row groups + let verify_file = File::open(&data_file_path).unwrap(); + let verify_reader = SerializedFileReader::new(verify_file).unwrap(); + assert_eq!( + verify_reader.metadata().num_row_groups(), + 2, + "Should have 2 row groups" + ); + + // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1) + let delete_file_path = format!("{}/deletes.parquet", &table_location); + + let delete_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + )])), + Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + )])), + ])); + + // Delete row at position 199 (0-indexed, so it's the last row: id=200) + let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![ + Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])), + Arc::new(Int64Array::from_iter_values(vec![199i64])), + ]) + .unwrap(); + + let delete_props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let delete_file = File::create(&delete_file_path).unwrap(); + let mut delete_writer = + ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap(); + delete_writer.write(&delete_batch).unwrap(); + delete_writer.close().unwrap(); + + // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199) + // This exercises the row group selection code path where row group 0 is skipped + let metadata_file = File::open(&data_file_path).unwrap(); + let metadata_reader = SerializedFileReader::new(metadata_file).unwrap(); + let metadata = metadata_reader.metadata(); + + let row_group_0 = metadata.row_group(0); + let row_group_1 = metadata.row_group(1); + + let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1" + let rg1_start = rg0_start + row_group_0.compressed_size() as u64; + let rg1_length = row_group_1.compressed_size() as u64; + + println!( + "Row group 0: starts at byte {}, {} bytes compressed", + rg0_start, + row_group_0.compressed_size() + ); + println!( + "Row group 1: starts at byte {}, {} bytes compressed", + rg1_start, + row_group_1.compressed_size() + ); + + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + let reader = ArrowReaderBuilder::new(file_io).build(); + + // Create FileScanTask that reads ONLY row group 1 via byte range filtering + let task = FileScanTask { + start: rg1_start, + length: rg1_length, + record_count: Some(100), // Row group 1 has 100 rows + data_file_path: data_file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema: table_schema.clone(), + project_field_ids: vec![1], + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_path: delete_file_path, + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: None, + }], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + }; + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let result = reader + .read(tasks) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + // Step 4: Verify we got 99 rows (not 100) + // Row group 1 has 100 rows (ids 101-200), minus 1 delete (id=200) = 99 rows + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + + println!("Total rows read from row group 1: {}", total_rows); + println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)"); + + // This assertion will FAIL before the fix and PASS after the fix + assert_eq!( + total_rows, 99, + "Expected 99 rows from row group 1 after deleting position 199, but got {} rows. \ + The bug causes position deletes to be lost when advance_to() is followed by next() \ + when skipping unselected row groups.", + total_rows + ); + + // Verify the deleted row (id=200) is not present + let all_ids: Vec<i32> = result + .iter() + .flat_map(|batch| { + batch + .column(0) + .as_primitive::<arrow_array::types::Int32Type>() + .values() + .iter() + .copied() + }) + .collect(); + + assert!( + !all_ids.contains(&200), + "Row with id=200 should be deleted but was found in results" + ); + + // Verify we have ids 101-199 (not 101-200) + let expected_ids: Vec<i32> = (101..=199).collect(); + assert_eq!( + all_ids, expected_ids, + "Should have ids 101-199 but got different values" + ); + } + /// Test for bug where stale cached delete causes infinite loop when skipping row groups. + /// + /// This test exposes the inverse scenario of `test_position_delete_with_row_group_selection`: + /// - Position delete targets a row in the SKIPPED row group (not the selected one) + /// - After calling advance_to(), the cached delete index is stale + /// - Without updating the cache, the code enters an infinite loop + /// + /// This test creates: + /// - A data file with 200 rows split into 2 row groups (0-99, 100-199) + /// - A position delete file that deletes row 0 (first row in SKIPPED row group 0) + /// - Row group selection that reads ONLY row group 1 (rows 100-199) + /// + /// The bug occurs when skipping row group 0: + /// ```rust + /// let mut next_deleted_row_idx_opt = delete_vector_iter.next(); // Some(0) + /// // ... skip to row group 1 ... + /// delete_vector_iter.advance_to(100); // Iterator advances past delete at 0 + /// // BUG: next_deleted_row_idx_opt is still Some(0) - STALE! + /// // When processing row group 1: + /// // current_idx = 100, next_deleted_row_idx = 0, next_row_group_base_idx = 200 + /// // Loop condition: 0 < 200 (true) + /// // But: current_idx (100) > next_deleted_row_idx (0) + /// // And: current_idx (100) != next_deleted_row_idx (0) + /// // Neither branch executes -> INFINITE LOOP! + /// ``` + /// + /// Expected behavior: Should return 100 rows (delete at 0 doesn't affect row group 1) + /// Bug behavior: Infinite loop in build_deletes_row_selection + #[tokio::test] + async fn test_position_delete_in_skipped_row_group() { + use arrow_array::{Int32Array, Int64Array}; + use parquet::file::reader::{FileReader, SerializedFileReader}; + + // Field IDs for positional delete schema + const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546; + const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + + // Create table schema with a single 'id' column + let table_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + // Step 1: Create data file with 200 rows in 2 row groups + // Row group 0: rows 0-99 (ids 1-100) + // Row group 1: rows 100-199 (ids 101-200) + let data_file_path = format!("{}/data.parquet", &table_location); + + let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from_iter_values(1..=100), + )]) + .unwrap(); + + let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from_iter_values(101..=200), + )]) + .unwrap(); + + // Force each batch into its own row group + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_max_row_group_size(100) + .build(); + + let file = File::create(&data_file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + writer.write(&batch1).expect("Writing batch 1"); + writer.write(&batch2).expect("Writing batch 2"); + writer.close().unwrap(); + + // Verify we created 2 row groups + let verify_file = File::open(&data_file_path).unwrap(); + let verify_reader = SerializedFileReader::new(verify_file).unwrap(); + assert_eq!( + verify_reader.metadata().num_row_groups(), + 2, + "Should have 2 row groups" + ); + + // Step 2: Create position delete file that deletes row 0 (id=1, first row in row group 0) + let delete_file_path = format!("{}/deletes.parquet", &table_location); + + let delete_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + )])), + Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + )])), + ])); + + // Delete row at position 0 (0-indexed, so it's the first row: id=1) + let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![ + Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])), + Arc::new(Int64Array::from_iter_values(vec![0i64])), + ]) + .unwrap(); + + let delete_props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let delete_file = File::create(&delete_file_path).unwrap(); + let mut delete_writer = + ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap(); + delete_writer.write(&delete_batch).unwrap(); + delete_writer.close().unwrap(); + + // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199) + // This exercises the row group selection code path where row group 0 is skipped + let metadata_file = File::open(&data_file_path).unwrap(); + let metadata_reader = SerializedFileReader::new(metadata_file).unwrap(); + let metadata = metadata_reader.metadata(); + + let row_group_0 = metadata.row_group(0); + let row_group_1 = metadata.row_group(1); + + let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1" + let rg1_start = rg0_start + row_group_0.compressed_size() as u64; + let rg1_length = row_group_1.compressed_size() as u64; + + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + let reader = ArrowReaderBuilder::new(file_io).build(); + + // Create FileScanTask that reads ONLY row group 1 via byte range filtering + let task = FileScanTask { + start: rg1_start, + length: rg1_length, + record_count: Some(100), // Row group 1 has 100 rows + data_file_path: data_file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema: table_schema.clone(), + project_field_ids: vec![1], + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_path: delete_file_path, + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: None, + }], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + }; + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let result = reader + .read(tasks) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + // Step 4: Verify we got 100 rows (all of row group 1) + // The delete at position 0 is in row group 0, which is skipped, so it doesn't affect us + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + + assert_eq!( + total_rows, 100, + "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \ + If this hangs or fails, it indicates the cached delete index was not updated after advance_to()." + ); + + // Verify we have all ids from row group 1 (101-200) + let all_ids: Vec<i32> = result + .iter() + .flat_map(|batch| { + batch + .column(0) + .as_primitive::<arrow_array::types::Int32Type>() + .values() + .iter() + .copied() + }) + .collect(); + + let expected_ids: Vec<i32> = (101..=200).collect(); + assert_eq!( + all_ids, expected_ids, + "Should have ids 101-200 (all of row group 1)" + ); + } + + /// Test reading Parquet files without field ID metadata (e.g., migrated tables). + /// This exercises the position-based fallback path. + /// + /// Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + pruneColumnsFallback() + /// in /parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java + #[tokio::test] + async fn test_read_parquet_file_without_field_ids() { + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + // Parquet file from a migrated table - no field ID metadata + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int32, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let name_data = vec!["Alice", "Bob", "Charlie"]; + let age_data = vec![30, 25, 35]; + + use arrow_array::Int32Array; + let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef; + let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef; + + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 2); + + // Verify position-based mapping: field_id 1 → position 0, field_id 2 → position 1 + let name_array = batch.column(0).as_string::<i32>(); + assert_eq!(name_array.value(0), "Alice"); + assert_eq!(name_array.value(1), "Bob"); + assert_eq!(name_array.value(2), "Charlie"); + + let age_array = batch + .column(1) + .as_primitive::<arrow_array::types::Int32Type>(); + assert_eq!(age_array.value(0), 30); + assert_eq!(age_array.value(1), 25); + assert_eq!(age_array.value(2), 35); + } + + /// Test reading Parquet files without field IDs with partial projection. + /// Only a subset of columns are requested, verifying position-based fallback + /// handles column selection correctly. + #[tokio::test] + async fn test_read_parquet_without_field_ids_partial_projection() { + use arrow_array::Int32Array; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("col1", DataType::Utf8, false), + Field::new("col2", DataType::Int32, false), + Field::new("col3", DataType::Utf8, false), + Field::new("col4", DataType::Int32, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef; + let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef; + let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef; + let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef; + + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![ + col1_data, col2_data, col3_data, col4_data, + ]) + .unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 3], + predicate: None, + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 2); + + let col1_array = batch.column(0).as_string::<i32>(); + assert_eq!(col1_array.value(0), "a"); + assert_eq!(col1_array.value(1), "b"); + + let col3_array = batch.column(1).as_string::<i32>(); + assert_eq!(col3_array.value(0), "c"); + assert_eq!(col3_array.value(1), "d"); + } + + /// Test reading Parquet files without field IDs with schema evolution. + /// The Iceberg schema has more fields than the Parquet file, testing that + /// missing columns are filled with NULLs. + #[tokio::test] + async fn test_read_parquet_without_field_ids_schema_evolution() { + use arrow_array::{Array, Int32Array}; + + // Schema with field 3 added after the file was written + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int32, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef; + let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2, 3], + predicate: None, + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 3); + + let name_array = batch.column(0).as_string::<i32>(); + assert_eq!(name_array.value(0), "Alice"); + assert_eq!(name_array.value(1), "Bob"); + + let age_array = batch + .column(1) + .as_primitive::<arrow_array::types::Int32Type>(); + assert_eq!(age_array.value(0), 30); + assert_eq!(age_array.value(1), 25); + + // Verify missing column filled with NULLs + let city_array = batch.column(2).as_string::<i32>(); + assert_eq!(city_array.null_count(), 2); + assert!(city_array.is_null(0)); + assert!(city_array.is_null(1)); + } + + /// Test reading Parquet files without field IDs that have multiple row groups. + /// This ensures the position-based fallback works correctly across row group boundaries. + #[tokio::test] + async fn test_read_parquet_without_field_ids_multiple_row_groups() { + use arrow_array::Int32Array; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("value", DataType::Int32, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + // Small row group size to create multiple row groups + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_write_batch_size(2) + .set_max_row_group_size(2) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + + // Write 6 rows in 3 batches (will create 3 row groups) + for batch_num in 0..3 { + let name_data = Arc::new(StringArray::from(vec![ + format!("name_{}", batch_num * 2), + format!("name_{}", batch_num * 2 + 1), + ])) as ArrayRef; + let value_data = + Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef; + + let batch = + RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap(); + writer.write(&batch).expect("Writing batch"); + } + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + assert!(!result.is_empty()); + + let mut all_names = Vec::new(); + let mut all_values = Vec::new(); + + for batch in &result { + let name_array = batch.column(0).as_string::<i32>(); + let value_array = batch + .column(1) + .as_primitive::<arrow_array::types::Int32Type>(); + + for i in 0..batch.num_rows() { + all_names.push(name_array.value(i).to_string()); + all_values.push(value_array.value(i)); + } + } + + assert_eq!(all_names.len(), 6); + assert_eq!(all_values.len(), 6); + + for i in 0..6 { + assert_eq!(all_names[i], format!("name_{}", i)); + assert_eq!(all_values[i], i as i32); + } + } + + /// Test reading Parquet files without field IDs with nested types (struct). + /// Java's pruneColumnsFallback() projects entire top-level columns including nested content. + /// This test verifies that a top-level struct field is projected correctly with all its nested fields. + #[tokio::test] + async fn test_read_parquet_without_field_ids_with_struct() { + use arrow_array::{Int32Array, StructArray}; + use arrow_schema::Fields; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required( + 2, + "person", + Type::Struct(crate::spec::StructType::new(vec![ + NestedField::required( + 3, + "name", + Type::Primitive(PrimitiveType::String), + ) + .into(), + NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int)) + .into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "person", + DataType::Struct(Fields::from(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int32, false), + ])), + false, + ), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef; + let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef; + let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef; + let person_data = Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("name", DataType::Utf8, false)), + name_data, + ), + ( + Arc::new(Field::new("age", DataType::Int32, false)), + age_data, + ), + ])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 2); + + let id_array = batch + .column(0) + .as_primitive::<arrow_array::types::Int32Type>(); + assert_eq!(id_array.value(0), 1); + assert_eq!(id_array.value(1), 2); + + let person_array = batch.column(1).as_struct(); + assert_eq!(person_array.num_columns(), 2); + + let name_array = person_array.column(0).as_string::<i32>(); + assert_eq!(name_array.value(0), "Alice"); + assert_eq!(name_array.value(1), "Bob"); + + let age_array = person_array + .column(1) + .as_primitive::<arrow_array::types::Int32Type>(); + assert_eq!(age_array.value(0), 30); + assert_eq!(age_array.value(1), 25); + } + + /// Test reading Parquet files without field IDs with schema evolution - column added in the middle. + /// When a new column is inserted between existing columns in the schema order, + /// the fallback projection must correctly map field IDs to output positions. + #[tokio::test] + async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() { + use arrow_array::{Array, Int32Array}; + + let arrow_schema_old = Arc::new(ArrowSchema::new(vec![ + Field::new("col0", DataType::Int32, true), + Field::new("col1", DataType::Int32, true), + ])); + + // New column added between existing columns: col0 (id=1), newCol (id=5), col1 (id=2) + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef; + let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 5, 2], + predicate: None, + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 3); + + let result_col0 = batch + .column(0) + .as_primitive::<arrow_array::types::Int32Type>(); + assert_eq!(result_col0.value(0), 1); + assert_eq!(result_col0.value(1), 2); + + // New column should be NULL (doesn't exist in old file) + let result_newcol = batch + .column(1) + .as_primitive::<arrow_array::types::Int32Type>(); + assert_eq!(result_newcol.null_count(), 2); + assert!(result_newcol.is_null(0)); + assert!(result_newcol.is_null(1)); + + let result_col1 = batch + .column(2) + .as_primitive::<arrow_array::types::Int32Type>(); + assert_eq!(result_col1.value(0), 10); + assert_eq!(result_col1.value(1), 20); + } + + /// Test reading Parquet files without field IDs with a filter that eliminates all row groups. + /// During development of field ID mapping, we saw a panic when row_selection_enabled=true and + /// all row groups are filtered out. + #[tokio::test] + async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() { + use arrow_array::{Float64Array, Int32Array}; + + // Schema with fields that will use fallback IDs 1, 2, 3 + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double)) + .into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("value", DataType::Float64, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + // Write data where all ids are >= 10 + let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef; + let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef; + let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data]) + .unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + // Filter that eliminates all row groups: id < 5 + let predicate = Reference::new("id").less_than(Datum::int(5)); + + // Enable both row_group_filtering and row_selection - triggered the panic + let reader = ArrowReaderBuilder::new(file_io) + .with_row_group_filtering_enabled(true) + .with_row_selection_enabled(true) + .build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2, 3], + predicate: Some(predicate.bind(schema, true).unwrap()), + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, ++ limit: None, + })] + .into_iter(), + )) as FileScanTaskStream; + + // Should no longer panic + let result = reader + .read(tasks) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + // Should return empty results + assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0)); + } + + /// Test bucket partitioning reads source column from data file (not partition metadata). + /// + /// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning. + /// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., testRenamedSourceColumnTable). + /// + /// # Iceberg Spec Requirements + /// + /// Per the Iceberg spec "Column Projection" section: + /// > "Return the value from partition metadata if an **Identity Transform** exists for the field" + /// + /// This means: + /// - Identity transforms (e.g., `identity(dept)`) use constants from partition metadata + /// - Non-identity transforms (e.g., `bucket(4, id)`) must read source columns from data files + /// - Partition metadata for bucket transforms stores bucket numbers (0-3), NOT source values + /// + /// Java's PartitionUtil.constantsMap() implements this via: + /// ```java + /// if (field.transform().isIdentity()) { + /// idToConstant.put(field.sourceId(), converted); + /// } + /// ``` + /// + /// # What This Test Verifies + /// + /// This test ensures the full ArrowReader → RecordBatchTransformer pipeline correctly handles + /// bucket partitioning when FileScanTask provides partition_spec and partition_data: + /// + /// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 13] + /// - FileScanTask specifies partition_spec with bucket(4, id) and partition_data with bucket=1 + /// - RecordBatchTransformer.constants_map() excludes bucket-partitioned field from constants + /// - ArrowReader correctly reads [1, 5, 9, 13] from the data file + /// - Values are NOT replaced with constant 1 from partition metadata + /// + /// # Why This Matters + /// + /// Without correct handling: + /// - Runtime filtering would break (e.g., `WHERE id = 5` would fail) + /// - Query results would be incorrect (all rows would have id=1) + /// - Bucket partitioning would be unusable for query optimization + /// + /// # References + /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms" + /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java + /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java + #[tokio::test] + async fn test_bucket_partitioning_reads_source_column_from_file() { + use arrow_array::Int32Array; + + use crate::spec::{Literal, PartitionSpec, Struct, Transform}; + + // Iceberg schema with id and name columns + let schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + // Partition spec: bucket(4, id) + let partition_spec = Arc::new( + PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .add_partition_field("id", "id_bucket", Transform::Bucket(4)) + .unwrap() + .build() + .unwrap(), + ); + + // Partition data: bucket value is 1 + let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]); + + // Create Arrow schema with field IDs for Parquet file + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ])); + + // Write Parquet file with data + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef; + let name_data = + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + let file = File::create(format!("{}/data.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + // Read the Parquet file with partition spec and data + let reader = ArrowReaderBuilder::new(file_io).build(); + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/data.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + deletes: vec![], + partition: Some(partition_data), + partition_spec: Some(partition_spec), + name_mapping: None, ++ limit: None, + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + // Verify we got the correct data + assert_eq!(result.len(), 1); + let batch = &result[0]; + + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 4); + + // The id column MUST contain actual values from the Parquet file [1, 5, 9, 13], + // NOT the constant partition value 1 + let id_col = batch + .column(0) + .as_primitive::<arrow_array::types::Int32Type>(); + assert_eq!(id_col.value(0), 1); + assert_eq!(id_col.value(1), 5); + assert_eq!(id_col.value(2), 9); + assert_eq!(id_col.value(3), 13); + + let name_col = batch.column(1).as_string::<i32>(); + assert_eq!(name_col.value(0), "Alice"); + assert_eq!(name_col.value(1), "Bob"); + assert_eq!(name_col.value(2), "Charlie"); + assert_eq!(name_col.value(3), "Dave"); + } } diff --cc crates/iceberg/src/scan/context.rs index 0f39e1845,fe3f5c8f7..a3fbcb1f2 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@@ -133,7 -129,12 +133,14 @@@ impl ManifestEntryContext deletes, + // Include partition data and spec from manifest entry + partition: Some(self.manifest_entry.data_file.partition.clone()), + // TODO: Pass actual PartitionSpec through context chain for native flow + partition_spec: None, + // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default" + name_mapping: None, ++ + limit: self.limit, }) } } diff --cc crates/iceberg/src/scan/mod.rs index e9055bdd6,3e319ca06..b2a81249f --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@@ -1914,7 -1777,9 +1911,10 @@@ pub mod tests record_count: Some(100), data_file_format: DataFileFormat::Parquet, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, + limit: None, }; test_fn(task); @@@ -1929,7 -1794,9 +1929,10 @@@ record_count: None, data_file_format: DataFileFormat::Avro, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, + limit: None, }; test_fn(task); } diff --cc crates/iceberg/src/scan/task.rs index 17116ef0b,e1ef241a5..ef2136360 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@@ -55,8 -78,32 +78,35 @@@ pub struct FileScanTask /// The list of delete files that may need to be applied to this data file pub deletes: Vec<FileScanTaskDeleteFile>, + /// Partition data from the manifest entry, used to identify which columns can use + /// constant values from partition metadata vs. reading from the data file. + /// Per the Iceberg spec, only identity-transformed partition fields should use constants. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(serialize_with = "serialize_not_implemented")] + #[serde(deserialize_with = "deserialize_not_implemented")] + pub partition: Option<Struct>, + + /// The partition spec for this file, used to distinguish identity transforms + /// (which use partition metadata constants) from non-identity transforms like + /// bucket/truncate (which must read source columns from the data file). + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(serialize_with = "serialize_not_implemented")] + #[serde(deserialize_with = "deserialize_not_implemented")] + pub partition_spec: Option<Arc<PartitionSpec>>, + + /// Name mapping from table metadata (property: schema.name-mapping.default), + /// used to resolve field IDs from column names when Parquet files lack field IDs + /// or have field ID conflicts. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(serialize_with = "serialize_not_implemented")] + #[serde(deserialize_with = "deserialize_not_implemented")] + pub name_mapping: Option<Arc<NameMapping>>, ++ + /// Maximum number of records to return, None means no limit + pub limit: Option<usize>, } impl FileScanTask { diff --cc crates/integrations/datafusion/src/table/mod.rs index 80f070c01,8527668d6..f4bd40e43 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@@ -84,12 -96,138 +96,139 @@@ impl IcebergTableProvider }) } - /// Asynchronously tries to construct a new [`IcebergTableProvider`] - /// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation. + pub(crate) async fn metadata_table( + &self, + r#type: MetadataTableType, + ) -> Result<IcebergMetadataTableProvider> { + // Load fresh table metadata for metadata table access + let table = self.catalog.load_table(&self.table_ident).await?; + Ok(IcebergMetadataTableProvider { table, r#type }) + } + } + + #[async_trait] + impl TableProvider for IcebergTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> ArrowSchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec<usize>>, + filters: &[Expr], - _limit: Option<usize>, ++ limit: Option<usize>, + ) -> DFResult<Arc<dyn ExecutionPlan>> { + // Load fresh table metadata from catalog + let table = self + .catalog + .load_table(&self.table_ident) + .await + .map_err(to_datafusion_error)?; + + // Create scan with fresh metadata (always use current snapshot) + Ok(Arc::new(IcebergTableScan::new( + table, + None, // Always use current snapshot for catalog-backed provider + self.schema.clone(), + projection, + filters, ++ limit, + ))) + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> DFResult<Vec<TableProviderFilterPushDown>> { + // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + } + + async fn insert_into( + &self, + state: &dyn Session, + input: Arc<dyn ExecutionPlan>, + _insert_op: InsertOp, + ) -> DFResult<Arc<dyn ExecutionPlan>> { + // Load fresh table metadata from catalog + let table = self + .catalog + .load_table(&self.table_ident) + .await + .map_err(to_datafusion_error)?; + + let partition_spec = table.metadata().default_partition_spec(); + + // Step 1: Project partition values for partitioned tables + let plan_with_partition = if !partition_spec.is_unpartitioned() { + project_with_partition(input, &table)? + } else { + input + }; + + // Step 2: Repartition for parallel processing + let target_partitions = + NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| { + DataFusionError::Configuration( + "target_partitions must be greater than 0".to_string(), + ) + })?; + + let repartitioned_plan = + repartition(plan_with_partition, table.metadata_ref(), target_partitions)?; + + let write_plan = Arc::new(IcebergWriteExec::new( + table.clone(), + repartitioned_plan, + self.schema.clone(), + )); + + // Merge the outputs of write_plan into one so we can commit all files together + let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan)); + + Ok(Arc::new(IcebergCommitExec::new( + table, + self.catalog.clone(), + coalesce_partitions, + self.schema.clone(), + ))) + } + } + + /// Static table provider for read-only snapshot access. + /// + /// This provider holds a cached table instance and does not refresh metadata or support + /// write operations. Use this for consistent analytical queries, time-travel scenarios, + /// or when you want to avoid catalog overhead. + /// + /// For catalog-backed tables with write support and automatic refresh, use + /// [`IcebergTableProvider`] instead. + #[derive(Debug, Clone)] + pub struct IcebergStaticTableProvider { + /// The static table instance (never refreshed) + table: Table, + /// Optional snapshot ID for this static view + snapshot_id: Option<i64>, + /// A reference-counted arrow `Schema` + schema: ArrowSchemaRef, + } + + impl IcebergStaticTableProvider { + /// Creates a static provider from a table instance. + /// + /// Uses the table's current snapshot for all queries. Does not support write operations. pub async fn try_new_from_table(table: Table) -> Result<Self> { let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); - Ok(IcebergTableProvider { + Ok(IcebergStaticTableProvider { table, snapshot_id: None, schema, @@@ -149,8 -280,9 +281,9 @@@ impl TableProvider for IcebergStaticTab _state: &dyn Session, projection: Option<&Vec<usize>>, filters: &[Expr], - _limit: Option<usize>, + limit: Option<usize>, ) -> DFResult<Arc<dyn ExecutionPlan>> { + // Use cached table (no refresh) Ok(Arc::new(IcebergTableScan::new( self.table.clone(), self.snapshot_id,
