This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new cd6005724 fix(reader): filter row groups when FileScanTask contains
byte ranges (#1779)
cd6005724 is described below
commit cd6005724ff511b8a2844cf78bd54c9bb5a5493c
Author: Matt Butrovich <[email protected]>
AuthorDate: Mon Oct 27 07:30:59 2025 -0400
fix(reader): filter row groups when FileScanTask contains byte ranges
(#1779)
---
crates/iceberg/src/arrow/reader.rs | 254 ++++++++++++++++++++++++++++++++++++-
1 file changed, 248 insertions(+), 6 deletions(-)
diff --git a/crates/iceberg/src/arrow/reader.rs
b/crates/iceberg/src/arrow/reader.rs
index ff4cff0a6..720b9363b 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -224,15 +224,16 @@ impl ArrowReader {
}
};
- // There are two possible sources both for potential lists of selected
RowGroup indices,
- // and for `RowSelection`s.
- // Selected RowGroup index lists can come from two sources:
+ // There are three possible sources for potential lists of selected
RowGroup indices,
+ // and two for `RowSelection`s.
+ // Selected RowGroup index lists can come from three sources:
+ // * When task.start and task.length specify a byte range (file
splitting);
// * When there are equality delete files that are applicable;
// * When there is a scan predicate and row_group_filtering_enabled
= true.
// `RowSelection`s can be created in either or both of the following
cases:
// * When there are positional delete files that are applicable;
// * When there is a scan predicate and row_selection_enabled = true
- // Note that, in the former case we only perform row group filtering
when
+ // Note that row group filtering from predicates only happens when
// there is a scan predicate AND row_group_filtering_enabled = true,
// but we perform row selection filtering if there are applicable
// equality delete files OR (there is a scan predicate AND
row_selection_enabled),
@@ -241,6 +242,17 @@ impl ArrowReader {
let mut selected_row_group_indices = None;
let mut row_selection = None;
+ // Filter row groups based on byte range from task.start and
task.length.
+ // If both start and length are 0, read the entire file (backwards
compatibility).
+ if task.start != 0 || task.length != 0 {
+ let byte_range_filtered_row_groups =
Self::filter_row_groups_by_byte_range(
+ record_batch_stream_builder.metadata(),
+ task.start,
+ task.length,
+ )?;
+ selected_row_group_indices = Some(byte_range_filtered_row_groups);
+ }
+
if let Some(predicate) = final_predicate {
let (iceberg_field_ids, field_id_map) =
Self::build_field_id_set_and_map(
record_batch_stream_builder.parquet_schema(),
@@ -256,14 +268,26 @@ impl ArrowReader {
record_batch_stream_builder =
record_batch_stream_builder.with_row_filter(row_filter);
if row_group_filtering_enabled {
- let result = Self::get_selected_row_group_indices(
+ let predicate_filtered_row_groups =
Self::get_selected_row_group_indices(
&predicate,
record_batch_stream_builder.metadata(),
&field_id_map,
&task.schema,
)?;
- selected_row_group_indices = Some(result);
+ // Merge predicate-based filtering with byte range filtering
(if present)
+ // by taking the intersection of both filters
+ selected_row_group_indices = match selected_row_group_indices {
+ Some(byte_range_filtered) => {
+ // Keep only row groups that are in both filters
+ let intersection: Vec<usize> = byte_range_filtered
+ .into_iter()
+ .filter(|idx|
predicate_filtered_row_groups.contains(idx))
+ .collect();
+ Some(intersection)
+ }
+ None => Some(predicate_filtered_row_groups),
+ };
}
if row_selection_enabled {
@@ -717,6 +741,36 @@ impl ArrowReader {
Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
}
+
+ /// Filters row groups by byte range to support Iceberg's file splitting.
+ ///
+ /// Iceberg splits large files at row group boundaries, so we only read
row groups
+ /// whose byte ranges overlap with [start, start+length).
+ fn filter_row_groups_by_byte_range(
+ parquet_metadata: &Arc<ParquetMetaData>,
+ start: u64,
+ length: u64,
+ ) -> Result<Vec<usize>> {
+ let row_groups = parquet_metadata.row_groups();
+ let mut selected = Vec::new();
+ let end = start + length;
+
+ // Row groups are stored sequentially after the 4-byte magic header.
+ let mut current_byte_offset = 4u64;
+
+ for (idx, row_group) in row_groups.iter().enumerate() {
+ let row_group_size = row_group.compressed_size() as u64;
+ let row_group_end = current_byte_offset + row_group_size;
+
+ if current_byte_offset < end && start < row_group_end {
+ selected.push(idx);
+ }
+
+ current_byte_offset = row_group_end;
+ }
+
+ Ok(selected)
+ }
}
/// Build the map of parquet field id to Parquet column index in the schema.
@@ -1949,6 +2003,194 @@ message schema {
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
}
+ /// Verifies that file splits respect byte ranges and only read specific
row groups.
+ ///
+ /// Writes a three-row-group Parquet file, then issues two `FileScanTask`s
+ /// whose byte ranges cover row group 0 and row groups 1..=2 respectively,
+ /// and asserts both the returned row counts and the actual id values.
+ ///
+ /// NOTE(review): the split boundaries below are derived from the same
+ /// sequential-layout assumption as `filter_row_groups_by_byte_range`
+ /// (row groups contiguous after the 4-byte "PAR1" magic) — confirm the
+ /// writer configuration used here never pads between row groups.
+ #[tokio::test]
+ async fn test_file_splits_respect_byte_ranges() {
+ use arrow_array::Int32Array;
+ use parquet::file::reader::{FileReader, SerializedFileReader};
+
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ ]));
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ let file_path = format!("{}/multi_row_group.parquet", &table_location);
+
+ // Force each batch into its own row group for testing byte range
filtering.
+ let batch1 = RecordBatch::try_new(arrow_schema.clone(),
vec![Arc::new(Int32Array::from(
+ (0..100).collect::<Vec<i32>>(),
+ ))])
+ .unwrap();
+ let batch2 = RecordBatch::try_new(arrow_schema.clone(),
vec![Arc::new(Int32Array::from(
+ (100..200).collect::<Vec<i32>>(),
+ ))])
+ .unwrap();
+ let batch3 = RecordBatch::try_new(arrow_schema.clone(),
vec![Arc::new(Int32Array::from(
+ (200..300).collect::<Vec<i32>>(),
+ ))])
+ .unwrap();
+
+ // max_row_group_size == batch size (100) guarantees one row group per batch.
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .set_max_row_group_size(100)
+ .build();
+
+ let file = File::create(&file_path).unwrap();
+ let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(),
Some(props)).unwrap();
+ writer.write(&batch1).expect("Writing batch 1");
+ writer.write(&batch2).expect("Writing batch 2");
+ writer.write(&batch3).expect("Writing batch 3");
+ writer.close().unwrap();
+
+ // Read the file metadata to get row group byte positions
+ let file = File::open(&file_path).unwrap();
+ let reader = SerializedFileReader::new(file).unwrap();
+ let metadata = reader.metadata();
+
+ println!("File has {} row groups", metadata.num_row_groups());
+ assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
+
+ // Get byte positions for each row group
+ let row_group_0 = metadata.row_group(0);
+ let row_group_1 = metadata.row_group(1);
+ let row_group_2 = metadata.row_group(2);
+
+ // Each start offset is the previous offset plus the previous row group's
+ // compressed size — i.e. the contiguous-layout assumption stated above.
+ let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
+ let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
+ let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
+ let file_end = rg2_start + row_group_2.compressed_size() as u64;
+
+ println!(
+ "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
+ row_group_0.num_rows(),
+ rg0_start,
+ row_group_0.compressed_size()
+ );
+ println!(
+ "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
+ row_group_1.num_rows(),
+ rg1_start,
+ row_group_1.compressed_size()
+ );
+ println!(
+ "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
+ row_group_2.num_rows(),
+ rg2_start,
+ row_group_2.compressed_size()
+ );
+
+ let file_io =
FileIO::from_path(&table_location).unwrap().build().unwrap();
+ let reader = ArrowReaderBuilder::new(file_io).build();
+
+ // Task 1: read only the first row group
+ let task1 = FileScanTask {
+ start: rg0_start,
+ length: row_group_0.compressed_size() as u64,
+ record_count: Some(100),
+ data_file_path: file_path.clone(),
+ data_file_format: DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![1],
+ predicate: None,
+ deletes: vec![],
+ };
+
+ // Task 2: read the second and third row groups
+ let task2 = FileScanTask {
+ start: rg1_start,
+ length: file_end - rg1_start,
+ record_count: Some(200),
+ data_file_path: file_path.clone(),
+ data_file_format: DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![1],
+ predicate: None,
+ deletes: vec![],
+ };
+
+ let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as
FileScanTaskStream;
+ let result1 = reader
+ .clone()
+ .read(tasks1)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ let total_rows_task1: usize = result1.iter().map(|b|
b.num_rows()).sum();
+ println!(
+ "Task 1 (bytes {}-{}) returned {} rows",
+ rg0_start,
+ rg0_start + row_group_0.compressed_size() as u64,
+ total_rows_task1
+ );
+
+ let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as
FileScanTaskStream;
+ let result2 = reader
+ .read(tasks2)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ let total_rows_task2: usize = result2.iter().map(|b|
b.num_rows()).sum();
+ println!(
+ "Task 2 (bytes {}-{}) returned {} rows",
+ rg1_start, file_end, total_rows_task2
+ );
+
+ assert_eq!(
+ total_rows_task1, 100,
+ "Task 1 should read only the first row group (100 rows), but got
{} rows",
+ total_rows_task1
+ );
+
+ assert_eq!(
+ total_rows_task2, 200,
+ "Task 2 should read only the second+third row groups (200 rows),
but got {} rows",
+ total_rows_task2
+ );
+
+ // Verify the actual data values are correct (not just the row count)
+ if total_rows_task1 > 0 {
+ let first_batch = &result1[0];
+ let id_col = first_batch
+ .column(0)
+ .as_primitive::<arrow_array::types::Int32Type>();
+ let first_val = id_col.value(0);
+ let last_val = id_col.value(id_col.len() - 1);
+ println!("Task 1 data range: {} to {}", first_val, last_val);
+
+ assert_eq!(first_val, 0, "Task 1 should start with id=0");
+ assert_eq!(last_val, 99, "Task 1 should end with id=99");
+ }
+
+ if total_rows_task2 > 0 {
+ let first_batch = &result2[0];
+ let id_col = first_batch
+ .column(0)
+ .as_primitive::<arrow_array::types::Int32Type>();
+ let first_val = id_col.value(0);
+ println!("Task 2 first value: {}", first_val);
+
+ assert_eq!(first_val, 100, "Task 2 should start with id=100, not
id=0");
+ }
+ }
+
/// Test schema evolution: reading old Parquet file (with only column 'a')
/// using a newer table schema (with columns 'a' and 'b').
/// This tests that: