This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 45c82dfa fix(reader): fix position delete bugs with row group skipping 
(#1806)
45c82dfa is described below

commit 45c82dfa9632228f183aab0d011955c6a356513c
Author: Matt Butrovich <[email protected]>
AuthorDate: Fri Oct 31 04:39:33 2025 -0400

    fix(reader): fix position delete bugs with row group skipping (#1806)
    
    ## Which issue does this PR close?
    
    Partially address #1749.
    
    ## What changes are included in this PR?
    
    This PR fixes two related correctness bugs in
    `ArrowReader::build_deletes_row_selection()` where position deletes
    targeting rows in skipped or skipped-to row groups were not being
    applied correctly.
    
    ### Background: How These Bugs Were Discovered
    
    While running Apache Spark + Apache Iceberg integration tests through
    DataFusion Comet, we discovered that the following tests were failing or
    hanging:
    - org.apache.iceberg.spark.extensions.TestMergeOnReadMerge
    - org.apache.iceberg.spark.extensions.TestMergeOnReadDelete
    - org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate
    
    Investigation revealed that recent work to support Iceberg's file
    splitting feature (via `filter_row_groups_by_byte_range()`) exposed
    latent bugs in the position delete logic. While the byte range filtering
    code itself is correct, it exercises code paths that were previously
    untested, revealing these pre-existing issues.
    
    #### Bug 1: Missing base index increment when skipping row groups
    
    **The Issue:**
    
    When processing a Parquet file with multiple row groups, if a position
    delete targets a row in a later row group, the function would skip row
    groups without deletes but fail to increment
    `current_row_group_base_idx`. This caused the row index tracking to
    become desynchronized.
    
    **Example scenario:**
    - File with 2 row groups: rows 0-99 (group 0) and rows 100-199 (group 1)
    - Position delete targets row 199 (last row in group 1)
    - When processing group 0: delete (199) is beyond the group's range, so
    code hits `continue` at lines 469-471
    - BUG: `current_row_group_base_idx` is NOT incremented, stays at 0
    - When processing group 1: code thinks rows start at 0 instead of 100
    - Delete at position 199 is never applied (thinks file only has rows
    0-99)
    
    **The Fix:**
    
    Add `current_row_group_base_idx += row_group_num_rows` before the two
    `continue` statements at lines ~470 and ~481. This ensures row index
    tracking stays synchronized when skipping row groups.
    
    #### Bug 2: Stale cached delete index when skipping unselected row
    groups
    
    **The Issue:**
    
    When row group selection is active (e.g., via byte range filtering for
    file splits) and an unselected row group is skipped, the cached
    `next_deleted_row_idx_opt` variable can become stale, leading to either
    lost deletes or infinite loops depending on the scenario.
    
    The function maintains a cached value (`next_deleted_row_idx_opt`)
    containing the next delete to apply. When skipping unselected row
    groups, it calls
    `delete_vector_iter.advance_to(next_row_group_base_idx)` to position the
    iterator, but this doesn't automatically update the cached variable.
    
    **Two problematic scenarios:**
    
    1. Stale cache causes infinite loop (the bug we hit):
    - File with 2 row groups: rows 0-99 (group 0) and rows 100-199 (group 1)
      - Position delete at row 0 (in group 0)
      - Row group selection: read ONLY group 1
      - Initial state: `next_deleted_row_idx_opt = Some(0)` (cached)
      - Skip group 0: `advance_to(100)` positions iterator past delete at 0
      - BUG: cached value still `Some(0)` - STALE!
    - Process group 1: loop condition `0 < 200` is `true`, but `current_idx
    (100) != next_deleted_row_idx (0)`, so neither branch executes could
    result in infinite loop
    2. Unconditionally calling `next()` loses deletes:
    - File with 2 row groups: rows 0-99 (group 0) and rows 100-199 (group 1)
      - Position delete at row 199 (in group 1)
      - Row group selection: read ONLY group 1
    - Initial state: `next_deleted_row_idx_opt = Some(199)` (cached, already
    correct!)
    - Skip group 0: `advance_to(100)` - iterator already positioned
    correctly
      - If we call `next()`: BUG - consumes delete at 199, advancing past it
      - Process group 1: iterator exhausted, delete is lost
    
    **The Fix:**
    
    - If `cached value < next_row_group_base_idx` (stale), update it, thus
    avoiding infinite loop
    - If `cached value >= next_row_group_base_idx` (still valid), keep it,
    thus preserving delete
    
    ## Are these changes tested?
    
    Yes. This PR adds two comprehensive unit tests in reader.rs:
    
    1. `test_position_delete_across_multiple_row_groups` - Tests bug 1
    (missing base index increment)
    2. `test_position_delete_with_row_group_selection` - Tests bug 2
    scenario where delete is in selected group
    3. `test_position_delete_in_skipped_row_group` - Tests bug 2 scenario
    where delete is in skipped group (would hang without fix)
    
    Additionally, these fixes resolve failures in Iceberg Java's
    spark-extension tests when running with DataFusion Comet’s PR
    https://github.com/apache/datafusion-comet/pull/2528:
    - org.apache.iceberg.spark.extensions.TestMergeOnReadMerge
    - org.apache.iceberg.spark.extensions.TestMergeOnReadDelete
    - org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate
---
 crates/iceberg/src/arrow/reader.rs | 617 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 612 insertions(+), 5 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs 
b/crates/iceberg/src/arrow/reader.rs
index 3c18c033..e0894ad6 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -406,10 +406,17 @@ impl ArrowReader {
                     // selected row group
                     selected_row_groups_idx += 1;
                 } else {
-                    // remove any positional deletes from the skipped page so 
that
-                    // `positional.deletes.min()` can be used
+                    // Advance iterator past all deletes in the skipped row 
group.
+                    // advance_to() positions the iterator to the first delete 
>= next_row_group_base_idx.
+                    // However, if our cached next_deleted_row_idx_opt is in 
the skipped range,
+                    // we need to call next() to update the cache with the 
newly positioned value.
                     delete_vector_iter.advance_to(next_row_group_base_idx);
-                    next_deleted_row_idx_opt = delete_vector_iter.next();
+                    // Only update the cache if the cached value is stale (in 
the skipped range)
+                    if let Some(cached_idx) = next_deleted_row_idx_opt {
+                        if cached_idx < next_row_group_base_idx {
+                            next_deleted_row_idx_opt = 
delete_vector_iter.next();
+                        }
+                    }
 
                     // still increment the current page base index but then 
skip to the next row group
                     // in the file
@@ -424,6 +431,7 @@ impl ArrowReader {
                     // the remainder of this row group and skip to the next 
row group
                     if next_deleted_row_idx >= next_row_group_base_idx {
                         results.push(RowSelector::select(row_group_num_rows as 
usize));
+                        current_row_group_base_idx += row_group_num_rows;
                         continue;
                     }
 
@@ -433,6 +441,7 @@ impl ArrowReader {
                 // If there are no more pos deletes, add a selector for the 
entirety of this row group.
                 _ => {
                     results.push(RowSelector::select(row_group_num_rows as 
usize));
+                    current_row_group_base_idx += row_group_num_rows;
                     continue;
                 }
             };
@@ -1496,8 +1505,10 @@ mod tests {
     use crate::expr::visitors::bound_predicate_visitor::visit;
     use crate::expr::{Bind, Predicate, Reference};
     use crate::io::FileIO;
-    use crate::scan::{FileScanTask, FileScanTaskStream};
-    use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, 
Schema, SchemaRef, Type};
+    use crate::scan::{FileScanTask, FileScanTaskDeleteFile, 
FileScanTaskStream};
+    use crate::spec::{
+        DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, 
Schema, SchemaRef, Type,
+    };
 
     fn table_schema_simple() -> SchemaRef {
         Arc::new(
@@ -2280,4 +2291,600 @@ message schema {
         assert!(col_b.is_null(1));
         assert!(col_b.is_null(2));
     }
+
+    /// Test for bug where position deletes in later row groups are not 
applied correctly.
+    ///
+    /// When a file has multiple row groups and a position delete targets a 
row in a later
+    /// row group, the `build_deletes_row_selection` function had a bug where 
it would
+    /// fail to increment `current_row_group_base_idx` when skipping row 
groups.
+    ///
+    /// This test creates:
+    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
+    /// - A position delete file that deletes row 199 (last row in second row 
group)
+    ///
+    /// Expected behavior: Should return 199 rows (with id=200 deleted)
+    /// Bug behavior: Returns 200 rows (delete is not applied)
+    ///
+    /// This bug was discovered while running Apache Spark + Apache Iceberg 
integration tests
+    /// through DataFusion Comet. The following Iceberg Java tests failed due 
to this bug:
+    /// - 
`org.apache.iceberg.spark.extensions.TestMergeOnReadDelete::testDeleteWithMultipleRowGroupsParquet`
+    /// - 
`org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate::testUpdateWithMultipleRowGroupsParquet`
+    #[tokio::test]
+    async fn test_position_delete_across_multiple_row_groups() {
+        use arrow_array::{Int32Array, Int64Array};
+        use parquet::file::reader::{FileReader, SerializedFileReader};
+
+        // Field IDs for positional delete schema
+        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
+        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+
+        // Create table schema with a single 'id' column
+        let table_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        // Step 1: Create data file with 200 rows in 2 row groups
+        // Row group 0: rows 0-99 (ids 1-100)
+        // Row group 1: rows 100-199 (ids 101-200)
+        let data_file_path = format!("{}/data.parquet", &table_location);
+
+        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+            Int32Array::from_iter_values(1..=100),
+        )])
+        .unwrap();
+
+        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+            Int32Array::from_iter_values(101..=200),
+        )])
+        .unwrap();
+
+        // Force each batch into its own row group
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_max_row_group_size(100)
+            .build();
+
+        let file = File::create(&data_file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), 
Some(props)).unwrap();
+        writer.write(&batch1).expect("Writing batch 1");
+        writer.write(&batch2).expect("Writing batch 2");
+        writer.close().unwrap();
+
+        // Verify we created 2 row groups
+        let verify_file = File::open(&data_file_path).unwrap();
+        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
+        assert_eq!(
+            verify_reader.metadata().num_row_groups(),
+            2,
+            "Should have 2 row groups"
+        );
+
+        // Step 2: Create position delete file that deletes row 199 (id=200, 
last row in row group 1)
+        let delete_file_path = format!("{}/deletes.parquet", &table_location);
+
+        let delete_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("file_path", DataType::Utf8, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
+            )])),
+            Field::new("pos", DataType::Int64, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
+            )])),
+        ]));
+
+        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
+        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
+            
Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
+            Arc::new(Int64Array::from_iter_values(vec![199i64])),
+        ])
+        .unwrap();
+
+        let delete_props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let delete_file = File::create(&delete_file_path).unwrap();
+        let mut delete_writer =
+            ArrowWriter::try_new(delete_file, delete_schema, 
Some(delete_props)).unwrap();
+        delete_writer.write(&delete_batch).unwrap();
+        delete_writer.close().unwrap();
+
+        // Step 3: Read the data file with the delete applied
+        let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        let task = FileScanTask {
+            start: 0,
+            length: 0,
+            record_count: Some(200),
+            data_file_path: data_file_path.clone(),
+            data_file_format: DataFileFormat::Parquet,
+            schema: table_schema.clone(),
+            project_field_ids: vec![1],
+            predicate: None,
+            deletes: vec![FileScanTaskDeleteFile {
+                file_path: delete_file_path,
+                file_type: DataContentType::PositionDeletes,
+                partition_spec_id: 0,
+                equality_ids: None,
+            }],
+        };
+
+        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as 
FileScanTaskStream;
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        // Step 4: Verify we got 199 rows (not 200)
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+
+        println!("Total rows read: {}", total_rows);
+        println!("Expected: 199 rows (deleted row 199 which had id=200)");
+
+        // This assertion will FAIL before the fix and PASS after the fix
+        assert_eq!(
+            total_rows, 199,
+            "Expected 199 rows after deleting row 199, but got {} rows. \
+             The bug causes position deletes in later row groups to be 
ignored.",
+            total_rows
+        );
+
+        // Verify the deleted row (id=200) is not present
+        let all_ids: Vec<i32> = result
+            .iter()
+            .flat_map(|batch| {
+                batch
+                    .column(0)
+                    .as_primitive::<arrow_array::types::Int32Type>()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect();
+
+        assert!(
+            !all_ids.contains(&200),
+            "Row with id=200 should be deleted but was found in results"
+        );
+
+        // Verify we have all other ids (1-199)
+        let expected_ids: Vec<i32> = (1..=199).collect();
+        assert_eq!(
+            all_ids, expected_ids,
+            "Should have ids 1-199 but got different values"
+        );
+    }
+
+    /// Test for bug where position deletes are lost when skipping unselected 
row groups.
+    ///
+    /// This is a variant of `test_position_delete_across_multiple_row_groups` 
that exercises
+    /// the row group selection code path (`selected_row_groups: Some([...])`).
+    ///
+    /// When a file has multiple row groups and only some are selected for 
reading,
+    /// the `build_deletes_row_selection` function must correctly skip over 
deletes in
+    /// unselected row groups WITHOUT consuming deletes that belong to 
selected row groups.
+    ///
+    /// This test creates:
+    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
+    /// - A position delete file that deletes row 199 (last row in second row 
group)
+    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
+    ///
+    /// Expected behavior: Should return 99 rows (with row 199 deleted)
+    /// Bug behavior: Returns 100 rows (delete is lost when skipping row group 
0)
+    ///
+    /// The bug occurs when processing row group 0 (unselected):
+    /// ```rust
+    /// delete_vector_iter.advance_to(next_row_group_base_idx); // Position at 
first delete >= 100
+    /// next_deleted_row_idx_opt = delete_vector_iter.next(); // BUG: Consumes 
delete at 199!
+    /// ```
+    ///
+    /// The fix is to NOT call `next()` after `advance_to()` when skipping 
unselected row groups,
+    /// because `advance_to()` already positions the iterator correctly 
without consuming elements.
+    #[tokio::test]
+    async fn test_position_delete_with_row_group_selection() {
+        use arrow_array::{Int32Array, Int64Array};
+        use parquet::file::reader::{FileReader, SerializedFileReader};
+
+        // Field IDs for positional delete schema
+        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
+        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+
+        // Create table schema with a single 'id' column
+        let table_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        // Step 1: Create data file with 200 rows in 2 row groups
+        // Row group 0: rows 0-99 (ids 1-100)
+        // Row group 1: rows 100-199 (ids 101-200)
+        let data_file_path = format!("{}/data.parquet", &table_location);
+
+        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+            Int32Array::from_iter_values(1..=100),
+        )])
+        .unwrap();
+
+        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+            Int32Array::from_iter_values(101..=200),
+        )])
+        .unwrap();
+
+        // Force each batch into its own row group
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_max_row_group_size(100)
+            .build();
+
+        let file = File::create(&data_file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), 
Some(props)).unwrap();
+        writer.write(&batch1).expect("Writing batch 1");
+        writer.write(&batch2).expect("Writing batch 2");
+        writer.close().unwrap();
+
+        // Verify we created 2 row groups
+        let verify_file = File::open(&data_file_path).unwrap();
+        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
+        assert_eq!(
+            verify_reader.metadata().num_row_groups(),
+            2,
+            "Should have 2 row groups"
+        );
+
+        // Step 2: Create position delete file that deletes row 199 (id=200, 
last row in row group 1)
+        let delete_file_path = format!("{}/deletes.parquet", &table_location);
+
+        let delete_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("file_path", DataType::Utf8, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
+            )])),
+            Field::new("pos", DataType::Int64, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
+            )])),
+        ]));
+
+        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
+        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
+            
Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
+            Arc::new(Int64Array::from_iter_values(vec![199i64])),
+        ])
+        .unwrap();
+
+        let delete_props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let delete_file = File::create(&delete_file_path).unwrap();
+        let mut delete_writer =
+            ArrowWriter::try_new(delete_file, delete_schema, 
Some(delete_props)).unwrap();
+        delete_writer.write(&delete_batch).unwrap();
+        delete_writer.close().unwrap();
+
+        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
+        // This exercises the row group selection code path where row group 0 
is skipped
+        let metadata_file = File::open(&data_file_path).unwrap();
+        let metadata_reader = 
SerializedFileReader::new(metadata_file).unwrap();
+        let metadata = metadata_reader.metadata();
+
+        let row_group_0 = metadata.row_group(0);
+        let row_group_1 = metadata.row_group(1);
+
+        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
+        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
+        let rg1_length = row_group_1.compressed_size() as u64;
+
+        println!(
+            "Row group 0: starts at byte {}, {} bytes compressed",
+            rg0_start,
+            row_group_0.compressed_size()
+        );
+        println!(
+            "Row group 1: starts at byte {}, {} bytes compressed",
+            rg1_start,
+            row_group_1.compressed_size()
+        );
+
+        let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        // Create FileScanTask that reads ONLY row group 1 via byte range 
filtering
+        let task = FileScanTask {
+            start: rg1_start,
+            length: rg1_length,
+            record_count: Some(100), // Row group 1 has 100 rows
+            data_file_path: data_file_path.clone(),
+            data_file_format: DataFileFormat::Parquet,
+            schema: table_schema.clone(),
+            project_field_ids: vec![1],
+            predicate: None,
+            deletes: vec![FileScanTaskDeleteFile {
+                file_path: delete_file_path,
+                file_type: DataContentType::PositionDeletes,
+                partition_spec_id: 0,
+                equality_ids: None,
+            }],
+        };
+
+        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as 
FileScanTaskStream;
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        // Step 4: Verify we got 99 rows (not 100)
+        // Row group 1 has 100 rows (ids 101-200), minus 1 delete (id=200) = 
99 rows
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+
+        println!("Total rows read from row group 1: {}", total_rows);
+        println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at 
position 199)");
+
+        // This assertion will FAIL before the fix and PASS after the fix
+        assert_eq!(
+            total_rows, 99,
+            "Expected 99 rows from row group 1 after deleting position 199, 
but got {} rows. \
+             The bug causes position deletes to be lost when advance_to() is 
followed by next() \
+             when skipping unselected row groups.",
+            total_rows
+        );
+
+        // Verify the deleted row (id=200) is not present
+        let all_ids: Vec<i32> = result
+            .iter()
+            .flat_map(|batch| {
+                batch
+                    .column(0)
+                    .as_primitive::<arrow_array::types::Int32Type>()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect();
+
+        assert!(
+            !all_ids.contains(&200),
+            "Row with id=200 should be deleted but was found in results"
+        );
+
+        // Verify we have ids 101-199 (not 101-200)
+        let expected_ids: Vec<i32> = (101..=199).collect();
+        assert_eq!(
+            all_ids, expected_ids,
+            "Should have ids 101-199 but got different values"
+        );
+    }
+    /// Test for bug where stale cached delete causes infinite loop when 
skipping row groups.
+    ///
+    /// This test exposes the inverse scenario of 
`test_position_delete_with_row_group_selection`:
+    /// - Position delete targets a row in the SKIPPED row group (not the 
selected one)
+    /// - After calling advance_to(), the cached delete index is stale
+    /// - Without updating the cache, the code enters an infinite loop
+    ///
+    /// This test creates:
+    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
+    /// - A position delete file that deletes row 0 (first row in SKIPPED row 
group 0)
+    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
+    ///
+    /// The bug occurs when skipping row group 0:
+    /// ```rust
+    /// let mut next_deleted_row_idx_opt = delete_vector_iter.next(); // 
Some(0)
+    /// // ... skip to row group 1 ...
+    /// delete_vector_iter.advance_to(100); // Iterator advances past delete 
at 0
+    /// // BUG: next_deleted_row_idx_opt is still Some(0) - STALE!
+    /// // When processing row group 1:
+    /// //   current_idx = 100, next_deleted_row_idx = 0, 
next_row_group_base_idx = 200
+    /// //   Loop condition: 0 < 200 (true)
+    /// //   But: current_idx (100) > next_deleted_row_idx (0)
+    /// //   And: current_idx (100) != next_deleted_row_idx (0)
+    /// //   Neither branch executes -> INFINITE LOOP!
+    /// ```
+    ///
+    /// Expected behavior: Should return 100 rows (delete at 0 doesn't affect 
row group 1)
+    /// Bug behavior: Infinite loop in build_deletes_row_selection
+    #[tokio::test]
+    async fn test_position_delete_in_skipped_row_group() {
+        use arrow_array::{Int32Array, Int64Array};
+        use parquet::file::reader::{FileReader, SerializedFileReader};
+
+        // Field IDs for positional delete schema
+        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
+        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+
+        // Create table schema with a single 'id' column
+        let table_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        // Step 1: Create data file with 200 rows in 2 row groups
+        // Row group 0: rows 0-99 (ids 1-100)
+        // Row group 1: rows 100-199 (ids 101-200)
+        let data_file_path = format!("{}/data.parquet", &table_location);
+
+        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+            Int32Array::from_iter_values(1..=100),
+        )])
+        .unwrap();
+
+        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+            Int32Array::from_iter_values(101..=200),
+        )])
+        .unwrap();
+
+        // Force each batch into its own row group
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_max_row_group_size(100)
+            .build();
+
+        let file = File::create(&data_file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), 
Some(props)).unwrap();
+        writer.write(&batch1).expect("Writing batch 1");
+        writer.write(&batch2).expect("Writing batch 2");
+        writer.close().unwrap();
+
+        // Verify we created 2 row groups
+        let verify_file = File::open(&data_file_path).unwrap();
+        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
+        assert_eq!(
+            verify_reader.metadata().num_row_groups(),
+            2,
+            "Should have 2 row groups"
+        );
+
+        // Step 2: Create position delete file that deletes row 0 (id=1, first 
row in row group 0)
+        let delete_file_path = format!("{}/deletes.parquet", &table_location);
+
+        let delete_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("file_path", DataType::Utf8, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
+            )])),
+            Field::new("pos", DataType::Int64, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
+            )])),
+        ]));
+
+        // Delete row at position 0 (0-indexed, so it's the first row: id=1)
+        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
+            
Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
+            Arc::new(Int64Array::from_iter_values(vec![0i64])),
+        ])
+        .unwrap();
+
+        let delete_props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let delete_file = File::create(&delete_file_path).unwrap();
+        let mut delete_writer =
+            ArrowWriter::try_new(delete_file, delete_schema, 
Some(delete_props)).unwrap();
+        delete_writer.write(&delete_batch).unwrap();
+        delete_writer.close().unwrap();
+
+        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
+        // This exercises the row group selection code path where row group 0 
is skipped
+        let metadata_file = File::open(&data_file_path).unwrap();
+        let metadata_reader = 
SerializedFileReader::new(metadata_file).unwrap();
+        let metadata = metadata_reader.metadata();
+
+        let row_group_0 = metadata.row_group(0);
+        let row_group_1 = metadata.row_group(1);
+
+        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
+        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
+        let rg1_length = row_group_1.compressed_size() as u64;
+
+        let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        // Create FileScanTask that reads ONLY row group 1 via byte range 
filtering
+        let task = FileScanTask {
+            start: rg1_start,
+            length: rg1_length,
+            record_count: Some(100), // Row group 1 has 100 rows
+            data_file_path: data_file_path.clone(),
+            data_file_format: DataFileFormat::Parquet,
+            schema: table_schema.clone(),
+            project_field_ids: vec![1],
+            predicate: None,
+            deletes: vec![FileScanTaskDeleteFile {
+                file_path: delete_file_path,
+                file_type: DataContentType::PositionDeletes,
+                partition_spec_id: 0,
+                equality_ids: None,
+            }],
+        };
+
+        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as 
FileScanTaskStream;
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        // Step 4: Verify we got 100 rows (all of row group 1)
+        // The delete at position 0 is in row group 0, which is skipped, so it 
doesn't affect us
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+
+        assert_eq!(
+            total_rows, 100,
+            "Expected 100 rows from row group 1 (delete at position 0 is in 
skipped row group 0). \
+             If this hangs or fails, it indicates the cached delete index was 
not updated after advance_to()."
+        );
+
+        // Verify we have all ids from row group 1 (101-200)
+        let all_ids: Vec<i32> = result
+            .iter()
+            .flat_map(|batch| {
+                batch
+                    .column(0)
+                    .as_primitive::<arrow_array::types::Int32Type>()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect();
+
+        let expected_ids: Vec<i32> = (101..=200).collect();
+        assert_eq!(
+            all_ids, expected_ids,
+            "Should have ids 101-200 (all of row group 1)"
+        );
+    }
 }

Reply via email to