This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 45c82dfa fix(reader): fix position delete bugs with row group skipping (#1806)
45c82dfa is described below
commit 45c82dfa9632228f183aab0d011955c6a356513c
Author: Matt Butrovich <[email protected]>
AuthorDate: Fri Oct 31 04:39:33 2025 -0400
fix(reader): fix position delete bugs with row group skipping (#1806)
## Which issue does this PR close?
Partially addresses #1749.
## What changes are included in this PR?
This PR fixes two related correctness bugs in
`ArrowReader::build_deletes_row_selection()` where position deletes
targeting rows in skipped or skipped-to row groups were not being
applied correctly.
### Background: How These Bugs Were Discovered
While running Apache Spark + Apache Iceberg integration tests through
DataFusion Comet, we discovered that the following tests were failing or
hanging:
- org.apache.iceberg.spark.extensions.TestMergeOnReadMerge
- org.apache.iceberg.spark.extensions.TestMergeOnReadDelete
- org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate
Investigation revealed that recent work to support Iceberg's file
splitting feature (via `filter_row_groups_by_byte_range()`) exposed
latent bugs in the position delete logic. While the byte range filtering
code itself is correct, it exercises code paths that were previously
untested, revealing these pre-existing issues.
#### Bug 1: Missing base index increment when skipping row groups
**The Issue:**
When processing a Parquet file with multiple row groups, if a position
delete targets a row in a later row group, the function would skip row
groups without deletes but fail to increment
`current_row_group_base_idx`. This caused the row index tracking to
become desynchronized.
**Example scenario:**
- File with 2 row groups: rows 0-99 (group 0) and rows 100-199 (group 1)
- Position delete targets row 199 (last row in group 1)
- When processing group 0: delete (199) is beyond the group's range, so
code hits `continue` at lines 469-471
- BUG: `current_row_group_base_idx` is NOT incremented, stays at 0
- When processing group 1: code thinks rows start at 0 instead of 100
- Delete at position 199 is never applied (the function behaves as if the file only has rows 0-99)
**The Fix:**
Add `current_row_group_base_idx += row_group_num_rows` before the two
`continue` statements at lines ~470 and ~481. This ensures row index
tracking stays synchronized when skipping row groups.
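As an illustration only, here is a minimal standalone sketch of that bookkeeping (a simplified model, not the reader's actual code: `select_rows`, its slice inputs, and the `(start, len)` output ranges are hypothetical):

```rust
/// Simplified model: walk row groups while tracking a running base index and
/// turn sorted delete positions into (start, len) ranges of rows to keep.
/// The key invariant is that *every* path that finishes a row group, including
/// the "no deletes here" continue path, advances the base index.
fn select_rows(row_group_sizes: &[u64], deletes: &[u64]) -> Vec<(u64, u64)> {
    let mut kept: Vec<(u64, u64)> = Vec::new();
    let mut current_row_group_base_idx = 0u64;
    for &row_group_num_rows in row_group_sizes {
        let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
        // Deletes that fall inside this row group.
        let in_group: Vec<u64> = deletes
            .iter()
            .copied()
            .filter(|&d| d >= current_row_group_base_idx && d < next_row_group_base_idx)
            .collect();
        if in_group.is_empty() {
            // No deletes here: keep the whole row group, but still advance the
            // base index before moving on (this is what the fix adds).
            kept.push((current_row_group_base_idx, row_group_num_rows));
            current_row_group_base_idx = next_row_group_base_idx;
            continue;
        }
        // Keep every run of rows between the deleted positions.
        let mut cursor = current_row_group_base_idx;
        for d in in_group {
            if d > cursor {
                kept.push((cursor, d - cursor));
            }
            cursor = d + 1;
        }
        if cursor < next_row_group_base_idx {
            kept.push((cursor, next_row_group_base_idx - cursor));
        }
        current_row_group_base_idx = next_row_group_base_idx;
    }
    kept
}
```

With `row_group_sizes = [100, 100]` and `deletes = [199]` (the scenario above), this returns `[(0, 100), (100, 99)]`, i.e. 199 selected rows, precisely because the base index has advanced to 100 before the second row group is processed.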
#### Bug 2: Stale cached delete index when skipping unselected row groups
**The Issue:**
When row group selection is active (e.g., via byte range filtering for
file splits) and an unselected row group is skipped, the cached
`next_deleted_row_idx_opt` variable can become stale, leading to either
lost deletes or infinite loops depending on the scenario.
The function maintains a cached value (`next_deleted_row_idx_opt`)
containing the next delete to apply. When skipping unselected row
groups, it calls
`delete_vector_iter.advance_to(next_row_group_base_idx)` to position the
iterator, but this doesn't automatically update the cached variable.
**Two problematic scenarios:**
1. Stale cache causes infinite loop (the bug we hit):
- File with 2 row groups: rows 0-99 (group 0) and rows 100-199 (group 1)
- Position delete at row 0 (in group 0)
- Row group selection: read ONLY group 1
- Initial state: `next_deleted_row_idx_opt = Some(0)` (cached)
- Skip group 0: `advance_to(100)` positions iterator past delete at 0
- BUG: cached value still `Some(0)` - STALE!
- Process group 1: the loop condition `0 < 200` is `true`, but `current_idx (100) != next_deleted_row_idx (0)`, so neither branch executes, which can result in an infinite loop
2. Unconditionally calling `next()` loses deletes:
- File with 2 row groups: rows 0-99 (group 0) and rows 100-199 (group 1)
- Position delete at row 199 (in group 1)
- Row group selection: read ONLY group 1
- Initial state: `next_deleted_row_idx_opt = Some(199)` (cached, already
correct!)
- Skip group 0: `advance_to(100)` - iterator already positioned
correctly
- If we call `next()`: BUG - consumes delete at 199, advancing past it
- Process group 1: iterator exhausted, delete is lost
**The Fix:**
- If `cached value < next_row_group_base_idx` (stale), update it, thus avoiding the infinite loop
- If `cached value >= next_row_group_base_idx` (still valid), keep it, thus preserving the delete
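To make the cache handling concrete, here is a rough standalone sketch of the skip logic under the fix (a simplified model, not the patch itself: `skip_unselected_row_group` is a hypothetical helper, and a `Peekable` iterator stands in for the delete-vector iterator):

```rust
use std::iter::Peekable;

/// Hypothetical helper: called when an unselected row group ending at
/// `next_row_group_base_idx` is skipped. `deletes` models the delete-vector
/// iterator and `cached` models `next_deleted_row_idx_opt`.
fn skip_unselected_row_group<I: Iterator<Item = u64>>(
    deletes: &mut Peekable<I>,
    cached: &mut Option<u64>,
    next_row_group_base_idx: u64,
) {
    // Equivalent of advance_to(): drop deletes that fall before the boundary.
    while deletes.peek().is_some_and(|&d| d < next_row_group_base_idx) {
        deletes.next();
    }
    match *cached {
        // Stale: the cached delete was inside the skipped range, so refresh it
        // from the repositioned iterator (this is what breaks the infinite loop).
        Some(idx) if idx < next_row_group_base_idx => *cached = deletes.next(),
        // Still valid (or already exhausted): keep it, so a delete that targets
        // a selected row group is not consumed and lost.
        _ => {}
    }
}
```

In scenario 1 above (`cached = Some(0)`, boundary 100) the stale value is refreshed and the loop over the selected row group can make progress; in scenario 2 (`cached = Some(199)`) the cache is left untouched, so the delete at 199 is still applied.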
## Are these changes tested?
Yes. This PR adds three unit tests in `reader.rs`:
1. `test_position_delete_across_multiple_row_groups` - Tests bug 1
(missing base index increment)
2. `test_position_delete_with_row_group_selection` - Tests bug 2
scenario where delete is in selected group
3. `test_position_delete_in_skipped_row_group` - Tests bug 2 scenario
where delete is in skipped group (would hang without fix)
Additionally, these fixes resolve failures in Iceberg Java's spark-extensions tests
when running with DataFusion Comet's PR https://github.com/apache/datafusion-comet/pull/2528:
- org.apache.iceberg.spark.extensions.TestMergeOnReadMerge
- org.apache.iceberg.spark.extensions.TestMergeOnReadDelete
- org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate
---
crates/iceberg/src/arrow/reader.rs | 617 ++++++++++++++++++++++++++++++++++++-
1 file changed, 612 insertions(+), 5 deletions(-)
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 3c18c033..e0894ad6 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -406,10 +406,17 @@ impl ArrowReader {
// selected row group
selected_row_groups_idx += 1;
} else {
- // remove any positional deletes from the skipped page so that
- // `positional.deletes.min()` can be used
+ // Advance iterator past all deletes in the skipped row group.
+ // advance_to() positions the iterator to the first delete >= next_row_group_base_idx.
+ // However, if our cached next_deleted_row_idx_opt is in the skipped range,
+ // we need to call next() to update the cache with the newly positioned value.
delete_vector_iter.advance_to(next_row_group_base_idx);
- next_deleted_row_idx_opt = delete_vector_iter.next();
+ // Only update the cache if the cached value is stale (in the skipped range)
+ if let Some(cached_idx) = next_deleted_row_idx_opt {
+ if cached_idx < next_row_group_base_idx {
+ next_deleted_row_idx_opt = delete_vector_iter.next();
+ }
+ }
// still increment the current page base index but then skip to the next row group
// in the file
@@ -424,6 +431,7 @@ impl ArrowReader {
// the remainder of this row group and skip to the next row group
if next_deleted_row_idx >= next_row_group_base_idx {
results.push(RowSelector::select(row_group_num_rows as usize));
+ current_row_group_base_idx += row_group_num_rows;
continue;
}
@@ -433,6 +441,7 @@ impl ArrowReader {
// If there are no more pos deletes, add a selector for the entirety of this row group.
_ => {
results.push(RowSelector::select(row_group_num_rows as usize));
+ current_row_group_base_idx += row_group_num_rows;
continue;
}
};
@@ -1496,8 +1505,10 @@ mod tests {
use crate::expr::visitors::bound_predicate_visitor::visit;
use crate::expr::{Bind, Predicate, Reference};
use crate::io::FileIO;
- use crate::scan::{FileScanTask, FileScanTaskStream};
- use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type};
+ use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
+ use crate::spec::{
+ DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
+ };
fn table_schema_simple() -> SchemaRef {
Arc::new(
@@ -2280,4 +2291,600 @@ message schema {
assert!(col_b.is_null(1));
assert!(col_b.is_null(2));
}
+
+ /// Test for bug where position deletes in later row groups are not applied correctly.
+ ///
+ /// When a file has multiple row groups and a position delete targets a row in a later
+ /// row group, the `build_deletes_row_selection` function had a bug where it would
+ /// fail to increment `current_row_group_base_idx` when skipping row groups.
+ ///
+ /// This test creates:
+ /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
+ /// - A position delete file that deletes row 199 (last row in second row group)
+ ///
+ /// Expected behavior: Should return 199 rows (with id=200 deleted)
+ /// Bug behavior: Returns 200 rows (delete is not applied)
+ ///
+ /// This bug was discovered while running Apache Spark + Apache Iceberg integration tests
+ /// through DataFusion Comet. The following Iceberg Java tests failed due to this bug:
+ /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadDelete::testDeleteWithMultipleRowGroupsParquet`
+ /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate::testUpdateWithMultipleRowGroupsParquet`
+ #[tokio::test]
+ async fn test_position_delete_across_multiple_row_groups() {
+ use arrow_array::{Int32Array, Int64Array};
+ use parquet::file::reader::{FileReader, SerializedFileReader};
+
+ // Field IDs for positional delete schema
+ const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
+ const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+
+ // Create table schema with a single 'id' column
+ let table_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ ]));
+
+ // Step 1: Create data file with 200 rows in 2 row groups
+ // Row group 0: rows 0-99 (ids 1-100)
+ // Row group 1: rows 100-199 (ids 101-200)
+ let data_file_path = format!("{}/data.parquet", &table_location);
+
+ let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+ Int32Array::from_iter_values(1..=100),
+ )])
+ .unwrap();
+
+ let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+ Int32Array::from_iter_values(101..=200),
+ )])
+ .unwrap();
+
+ // Force each batch into its own row group
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .set_max_row_group_size(100)
+ .build();
+
+ let file = File::create(&data_file_path).unwrap();
+ let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
+ writer.write(&batch1).expect("Writing batch 1");
+ writer.write(&batch2).expect("Writing batch 2");
+ writer.close().unwrap();
+
+ // Verify we created 2 row groups
+ let verify_file = File::open(&data_file_path).unwrap();
+ let verify_reader = SerializedFileReader::new(verify_file).unwrap();
+ assert_eq!(
+ verify_reader.metadata().num_row_groups(),
+ 2,
+ "Should have 2 row groups"
+ );
+
+ // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
+ let delete_file_path = format!("{}/deletes.parquet", &table_location);
+
+ let delete_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("file_path", DataType::Utf8,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
+ )])),
+ Field::new("pos", DataType::Int64,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
+ )])),
+ ]));
+
+ // Delete row at position 199 (0-indexed, so it's the last row: id=200)
+ let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
+ Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
+ Arc::new(Int64Array::from_iter_values(vec![199i64])),
+ ])
+ .unwrap();
+
+ let delete_props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ let delete_file = File::create(&delete_file_path).unwrap();
+ let mut delete_writer =
+ ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
+ delete_writer.write(&delete_batch).unwrap();
+ delete_writer.close().unwrap();
+
+ // Step 3: Read the data file with the delete applied
+ let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+ let reader = ArrowReaderBuilder::new(file_io).build();
+
+ let task = FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: Some(200),
+ data_file_path: data_file_path.clone(),
+ data_file_format: DataFileFormat::Parquet,
+ schema: table_schema.clone(),
+ project_field_ids: vec![1],
+ predicate: None,
+ deletes: vec![FileScanTaskDeleteFile {
+ file_path: delete_file_path,
+ file_type: DataContentType::PositionDeletes,
+ partition_spec_id: 0,
+ equality_ids: None,
+ }],
+ };
+
+ let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
+ let result = reader
+ .read(tasks)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ // Step 4: Verify we got 199 rows (not 200)
+ let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+
+ println!("Total rows read: {}", total_rows);
+ println!("Expected: 199 rows (deleted row 199 which had id=200)");
+
+ // This assertion will FAIL before the fix and PASS after the fix
+ assert_eq!(
+ total_rows, 199,
+ "Expected 199 rows after deleting row 199, but got {} rows. \
+ The bug causes position deletes in later row groups to be ignored.",
+ total_rows
+ );
+
+ // Verify the deleted row (id=200) is not present
+ let all_ids: Vec<i32> = result
+ .iter()
+ .flat_map(|batch| {
+ batch
+ .column(0)
+ .as_primitive::<arrow_array::types::Int32Type>()
+ .values()
+ .iter()
+ .copied()
+ })
+ .collect();
+
+ assert!(
+ !all_ids.contains(&200),
+ "Row with id=200 should be deleted but was found in results"
+ );
+
+ // Verify we have all other ids (1-199)
+ let expected_ids: Vec<i32> = (1..=199).collect();
+ assert_eq!(
+ all_ids, expected_ids,
+ "Should have ids 1-199 but got different values"
+ );
+ }
+
+ /// Test for bug where position deletes are lost when skipping unselected row groups.
+ ///
+ /// This is a variant of `test_position_delete_across_multiple_row_groups` that exercises
+ /// the row group selection code path (`selected_row_groups: Some([...])`).
+ ///
+ /// When a file has multiple row groups and only some are selected for reading,
+ /// the `build_deletes_row_selection` function must correctly skip over deletes in
+ /// unselected row groups WITHOUT consuming deletes that belong to selected row groups.
+ ///
+ /// This test creates:
+ /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
+ /// - A position delete file that deletes row 199 (last row in second row group)
+ /// - Row group selection that reads ONLY row group 1 (rows 100-199)
+ ///
+ /// Expected behavior: Should return 99 rows (with row 199 deleted)
+ /// Bug behavior: Returns 100 rows (delete is lost when skipping row group 0)
+ ///
+ /// The bug occurs when processing row group 0 (unselected):
+ /// ```rust
+ /// delete_vector_iter.advance_to(next_row_group_base_idx); // Position at first delete >= 100
+ /// next_deleted_row_idx_opt = delete_vector_iter.next(); // BUG: Consumes delete at 199!
+ /// ```
+ ///
+ /// The fix is to NOT call `next()` after `advance_to()` when skipping unselected row groups,
+ /// because `advance_to()` already positions the iterator correctly without consuming elements.
+ #[tokio::test]
+ async fn test_position_delete_with_row_group_selection() {
+ use arrow_array::{Int32Array, Int64Array};
+ use parquet::file::reader::{FileReader, SerializedFileReader};
+
+ // Field IDs for positional delete schema
+ const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
+ const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+
+ // Create table schema with a single 'id' column
+ let table_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ ]));
+
+ // Step 1: Create data file with 200 rows in 2 row groups
+ // Row group 0: rows 0-99 (ids 1-100)
+ // Row group 1: rows 100-199 (ids 101-200)
+ let data_file_path = format!("{}/data.parquet", &table_location);
+
+ let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+ Int32Array::from_iter_values(1..=100),
+ )])
+ .unwrap();
+
+ let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+ Int32Array::from_iter_values(101..=200),
+ )])
+ .unwrap();
+
+ // Force each batch into its own row group
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .set_max_row_group_size(100)
+ .build();
+
+ let file = File::create(&data_file_path).unwrap();
+ let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
+ writer.write(&batch1).expect("Writing batch 1");
+ writer.write(&batch2).expect("Writing batch 2");
+ writer.close().unwrap();
+
+ // Verify we created 2 row groups
+ let verify_file = File::open(&data_file_path).unwrap();
+ let verify_reader = SerializedFileReader::new(verify_file).unwrap();
+ assert_eq!(
+ verify_reader.metadata().num_row_groups(),
+ 2,
+ "Should have 2 row groups"
+ );
+
+ // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
+ let delete_file_path = format!("{}/deletes.parquet", &table_location);
+
+ let delete_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("file_path", DataType::Utf8,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
+ )])),
+ Field::new("pos", DataType::Int64,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
+ )])),
+ ]));
+
+ // Delete row at position 199 (0-indexed, so it's the last row: id=200)
+ let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
+ Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
+ Arc::new(Int64Array::from_iter_values(vec![199i64])),
+ ])
+ .unwrap();
+
+ let delete_props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ let delete_file = File::create(&delete_file_path).unwrap();
+ let mut delete_writer =
+ ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
+ delete_writer.write(&delete_batch).unwrap();
+ delete_writer.close().unwrap();
+
+ // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
+ // This exercises the row group selection code path where row group 0 is skipped
+ let metadata_file = File::open(&data_file_path).unwrap();
+ let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
+ let metadata = metadata_reader.metadata();
+
+ let row_group_0 = metadata.row_group(0);
+ let row_group_1 = metadata.row_group(1);
+
+ let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
+ let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
+ let rg1_length = row_group_1.compressed_size() as u64;
+
+ println!(
+ "Row group 0: starts at byte {}, {} bytes compressed",
+ rg0_start,
+ row_group_0.compressed_size()
+ );
+ println!(
+ "Row group 1: starts at byte {}, {} bytes compressed",
+ rg1_start,
+ row_group_1.compressed_size()
+ );
+
+ let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+ let reader = ArrowReaderBuilder::new(file_io).build();
+
+ // Create FileScanTask that reads ONLY row group 1 via byte range filtering
+ let task = FileScanTask {
+ start: rg1_start,
+ length: rg1_length,
+ record_count: Some(100), // Row group 1 has 100 rows
+ data_file_path: data_file_path.clone(),
+ data_file_format: DataFileFormat::Parquet,
+ schema: table_schema.clone(),
+ project_field_ids: vec![1],
+ predicate: None,
+ deletes: vec![FileScanTaskDeleteFile {
+ file_path: delete_file_path,
+ file_type: DataContentType::PositionDeletes,
+ partition_spec_id: 0,
+ equality_ids: None,
+ }],
+ };
+
+ let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
+ let result = reader
+ .read(tasks)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ // Step 4: Verify we got 99 rows (not 100)
+ // Row group 1 has 100 rows (ids 101-200), minus 1 delete (id=200) = 99 rows
+ let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+
+ println!("Total rows read from row group 1: {}", total_rows);
+ println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at
position 199)");
+
+ // This assertion will FAIL before the fix and PASS after the fix
+ assert_eq!(
+ total_rows, 99,
+ "Expected 99 rows from row group 1 after deleting position 199,
but got {} rows. \
+ The bug causes position deletes to be lost when advance_to() is
followed by next() \
+ when skipping unselected row groups.",
+ total_rows
+ );
+
+ // Verify the deleted row (id=200) is not present
+ let all_ids: Vec<i32> = result
+ .iter()
+ .flat_map(|batch| {
+ batch
+ .column(0)
+ .as_primitive::<arrow_array::types::Int32Type>()
+ .values()
+ .iter()
+ .copied()
+ })
+ .collect();
+
+ assert!(
+ !all_ids.contains(&200),
+ "Row with id=200 should be deleted but was found in results"
+ );
+
+ // Verify we have ids 101-199 (not 101-200)
+ let expected_ids: Vec<i32> = (101..=199).collect();
+ assert_eq!(
+ all_ids, expected_ids,
+ "Should have ids 101-199 but got different values"
+ );
+ }
+ /// Test for bug where stale cached delete causes infinite loop when skipping row groups.
+ ///
+ /// This test exposes the inverse scenario of `test_position_delete_with_row_group_selection`:
+ /// - Position delete targets a row in the SKIPPED row group (not the selected one)
+ /// - After calling advance_to(), the cached delete index is stale
+ /// - Without updating the cache, the code enters an infinite loop
+ ///
+ /// This test creates:
+ /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
+ /// - A position delete file that deletes row 0 (first row in SKIPPED row group 0)
+ /// - Row group selection that reads ONLY row group 1 (rows 100-199)
+ ///
+ /// The bug occurs when skipping row group 0:
+ /// ```rust
+ /// let mut next_deleted_row_idx_opt = delete_vector_iter.next(); // Some(0)
+ /// // ... skip to row group 1 ...
+ /// delete_vector_iter.advance_to(100); // Iterator advances past delete at 0
+ /// // BUG: next_deleted_row_idx_opt is still Some(0) - STALE!
+ /// // When processing row group 1:
+ /// // current_idx = 100, next_deleted_row_idx = 0, next_row_group_base_idx = 200
+ /// // Loop condition: 0 < 200 (true)
+ /// // But: current_idx (100) > next_deleted_row_idx (0)
+ /// // And: current_idx (100) != next_deleted_row_idx (0)
+ /// // Neither branch executes -> INFINITE LOOP!
+ /// ```
+ ///
+ /// Expected behavior: Should return 100 rows (delete at 0 doesn't affect row group 1)
+ /// Bug behavior: Infinite loop in build_deletes_row_selection
+ #[tokio::test]
+ async fn test_position_delete_in_skipped_row_group() {
+ use arrow_array::{Int32Array, Int64Array};
+ use parquet::file::reader::{FileReader, SerializedFileReader};
+
+ // Field IDs for positional delete schema
+ const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
+ const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+
+ // Create table schema with a single 'id' column
+ let table_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ ]));
+
+ // Step 1: Create data file with 200 rows in 2 row groups
+ // Row group 0: rows 0-99 (ids 1-100)
+ // Row group 1: rows 100-199 (ids 101-200)
+ let data_file_path = format!("{}/data.parquet", &table_location);
+
+ let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+ Int32Array::from_iter_values(1..=100),
+ )])
+ .unwrap();
+
+ let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+ Int32Array::from_iter_values(101..=200),
+ )])
+ .unwrap();
+
+ // Force each batch into its own row group
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .set_max_row_group_size(100)
+ .build();
+
+ let file = File::create(&data_file_path).unwrap();
+ let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
+ writer.write(&batch1).expect("Writing batch 1");
+ writer.write(&batch2).expect("Writing batch 2");
+ writer.close().unwrap();
+
+ // Verify we created 2 row groups
+ let verify_file = File::open(&data_file_path).unwrap();
+ let verify_reader = SerializedFileReader::new(verify_file).unwrap();
+ assert_eq!(
+ verify_reader.metadata().num_row_groups(),
+ 2,
+ "Should have 2 row groups"
+ );
+
+ // Step 2: Create position delete file that deletes row 0 (id=1, first row in row group 0)
+ let delete_file_path = format!("{}/deletes.parquet", &table_location);
+
+ let delete_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("file_path", DataType::Utf8,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
+ )])),
+ Field::new("pos", DataType::Int64,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
+ )])),
+ ]));
+
+ // Delete row at position 0 (0-indexed, so it's the first row: id=1)
+ let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
+ Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
+ Arc::new(Int64Array::from_iter_values(vec![0i64])),
+ ])
+ .unwrap();
+
+ let delete_props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ let delete_file = File::create(&delete_file_path).unwrap();
+ let mut delete_writer =
+ ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
+ delete_writer.write(&delete_batch).unwrap();
+ delete_writer.close().unwrap();
+
+ // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
+ // This exercises the row group selection code path where row group 0 is skipped
+ let metadata_file = File::open(&data_file_path).unwrap();
+ let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
+ let metadata = metadata_reader.metadata();
+
+ let row_group_0 = metadata.row_group(0);
+ let row_group_1 = metadata.row_group(1);
+
+ let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
+ let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
+ let rg1_length = row_group_1.compressed_size() as u64;
+
+ let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+ let reader = ArrowReaderBuilder::new(file_io).build();
+
+ // Create FileScanTask that reads ONLY row group 1 via byte range filtering
+ let task = FileScanTask {
+ start: rg1_start,
+ length: rg1_length,
+ record_count: Some(100), // Row group 1 has 100 rows
+ data_file_path: data_file_path.clone(),
+ data_file_format: DataFileFormat::Parquet,
+ schema: table_schema.clone(),
+ project_field_ids: vec![1],
+ predicate: None,
+ deletes: vec![FileScanTaskDeleteFile {
+ file_path: delete_file_path,
+ file_type: DataContentType::PositionDeletes,
+ partition_spec_id: 0,
+ equality_ids: None,
+ }],
+ };
+
+ let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
+ let result = reader
+ .read(tasks)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ // Step 4: Verify we got 100 rows (all of row group 1)
+ // The delete at position 0 is in row group 0, which is skipped, so it doesn't affect us
+ let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+
+ assert_eq!(
+ total_rows, 100,
+ "Expected 100 rows from row group 1 (delete at position 0 is in
skipped row group 0). \
+ If this hangs or fails, it indicates the cached delete index was
not updated after advance_to()."
+ );
+
+ // Verify we have all ids from row group 1 (101-200)
+ let all_ids: Vec<i32> = result
+ .iter()
+ .flat_map(|batch| {
+ batch
+ .column(0)
+ .as_primitive::<arrow_array::types::Int32Type>()
+ .values()
+ .iter()
+ .copied()
+ })
+ .collect();
+
+ let expected_ids: Vec<i32> = (101..=200).collect();
+ assert_eq!(
+ all_ids, expected_ids,
+ "Should have ids 101-200 (all of row group 1)"
+ );
+ }
}