This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new cd6005724 fix(reader): filter row groups when FileScanTask contains 
byte ranges (#1779)
cd6005724 is described below

commit cd6005724ff511b8a2844cf78bd54c9bb5a5493c
Author: Matt Butrovich <[email protected]>
AuthorDate: Mon Oct 27 07:30:59 2025 -0400

    fix(reader): filter row groups when FileScanTask contains byte ranges 
(#1779)
---
 crates/iceberg/src/arrow/reader.rs | 254 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 248 insertions(+), 6 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs 
b/crates/iceberg/src/arrow/reader.rs
index ff4cff0a6..720b9363b 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -224,15 +224,16 @@ impl ArrowReader {
             }
         };
 
-        // There are two possible sources both for potential lists of selected 
RowGroup indices,
-        // and for `RowSelection`s.
-        // Selected RowGroup index lists can come from two sources:
+        // There are three possible sources for potential lists of selected 
RowGroup indices,
+        // and two for `RowSelection`s.
+        // Selected RowGroup index lists can come from three sources:
+        //   * When task.start and task.length specify a byte range (file 
splitting);
         //   * When there are equality delete files that are applicable;
         //   * When there is a scan predicate and row_group_filtering_enabled 
= true.
         // `RowSelection`s can be created in either or both of the following 
cases:
         //   * When there are positional delete files that are applicable;
         //   * When there is a scan predicate and row_selection_enabled = true
-        // Note that, in the former case we only perform row group filtering 
when
+        // Note that row group filtering from predicates only happens when
         // there is a scan predicate AND row_group_filtering_enabled = true,
         // but we perform row selection filtering if there are applicable
         // equality delete files OR (there is a scan predicate AND 
row_selection_enabled),
@@ -241,6 +242,17 @@ impl ArrowReader {
         let mut selected_row_group_indices = None;
         let mut row_selection = None;
 
+        // Filter row groups based on byte range from task.start and 
task.length.
+        // If both start and length are 0, read the entire file (backwards 
compatibility).
+        if task.start != 0 || task.length != 0 {
+            let byte_range_filtered_row_groups = 
Self::filter_row_groups_by_byte_range(
+                record_batch_stream_builder.metadata(),
+                task.start,
+                task.length,
+            )?;
+            selected_row_group_indices = Some(byte_range_filtered_row_groups);
+        }
+
         if let Some(predicate) = final_predicate {
             let (iceberg_field_ids, field_id_map) = 
Self::build_field_id_set_and_map(
                 record_batch_stream_builder.parquet_schema(),
@@ -256,14 +268,26 @@ impl ArrowReader {
             record_batch_stream_builder = 
record_batch_stream_builder.with_row_filter(row_filter);
 
             if row_group_filtering_enabled {
-                let result = Self::get_selected_row_group_indices(
+                let predicate_filtered_row_groups = 
Self::get_selected_row_group_indices(
                     &predicate,
                     record_batch_stream_builder.metadata(),
                     &field_id_map,
                     &task.schema,
                 )?;
 
-                selected_row_group_indices = Some(result);
+                // Merge predicate-based filtering with byte range filtering 
(if present)
+                // by taking the intersection of both filters
+                selected_row_group_indices = match selected_row_group_indices {
+                    Some(byte_range_filtered) => {
+                        // Keep only row groups that are in both filters
+                        let intersection: Vec<usize> = byte_range_filtered
+                            .into_iter()
+                            .filter(|idx| 
predicate_filtered_row_groups.contains(idx))
+                            .collect();
+                        Some(intersection)
+                    }
+                    None => Some(predicate_filtered_row_groups),
+                };
             }
 
             if row_selection_enabled {
@@ -717,6 +741,36 @@ impl ArrowReader {
 
         Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
     }
+
+    /// Selects the row groups whose byte span overlaps `[start, start + length)`.
+    ///
+    /// Supports Iceberg's file splitting: a scan task may cover only part of a data
+    /// file, in which case only the overlapping row groups should be read.
+    ///
+    /// Row group positions are not read from the metadata. They are reconstructed by
+    /// assuming the first row group begins right after the 4-byte magic header and
+    /// that each row group occupies exactly `compressed_size()` bytes, back to back.
+    ///
+    /// Returns the indices of the selected row groups in ascending order. All offset
+    /// arithmetic saturates at `u64::MAX`, so a very large `length` cannot overflow.
+    fn filter_row_groups_by_byte_range(
+        parquet_metadata: &Arc<ParquetMetaData>,
+        start: u64,
+        length: u64,
+    ) -> Result<Vec<usize>> {
+        // A plain `start + length` panics in debug builds and wraps in release builds
+        // (silently selecting nothing) once the sum exceeds `u64::MAX`.
+        let end = start.saturating_add(length);
+
+        // Row groups are assumed to be stored sequentially after the 4-byte magic header.
+        let mut current_byte_offset = 4u64;
+        let mut selected = Vec::new();
+
+        for (idx, row_group) in parquet_metadata.row_groups().iter().enumerate() {
+            let row_group_size = row_group.compressed_size() as u64;
+            let row_group_end = current_byte_offset.saturating_add(row_group_size);
+
+            // Half-open interval overlap: [current_byte_offset, row_group_end) vs [start, end).
+            if current_byte_offset < end && start < row_group_end {
+                selected.push(idx);
+            }
+
+            current_byte_offset = row_group_end;
+        }
+
+        Ok(selected)
+    }
 }
 
 /// Build the map of parquet field id to Parquet column index in the schema.
@@ -1949,6 +2003,194 @@ message schema {
         Arc::new(SchemaDescriptor::new(Arc::new(schema)))
     }
 
+    /// Verifies that a `FileScanTask` byte range limits the read to the row groups it covers.
+    ///
+    /// Writes ids `0..300` into three 100-row row groups, then reads the file through two
+    /// tasks: one spanning only the first row group and one spanning the second and third.
+    /// Row counts and the exact id values returned by each task are checked.
+    #[tokio::test]
+    async fn test_file_splits_respect_byte_ranges() {
+        use arrow_array::Int32Array;
+        use parquet::file::reader::{FileReader, SerializedFileReader};
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_path = format!("{}/multi_row_group.parquet", &table_location);
+
+        // A row group cap of 100 combined with 100-row batches forces each batch into
+        // its own row group.
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_max_row_group_size(100)
+            .build();
+
+        let file = File::create(&file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
+        for ids in [0..100, 100..200, 200..300] {
+            let batch = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+                Int32Array::from(ids.collect::<Vec<i32>>()),
+            )])
+            .unwrap();
+            writer.write(&batch).expect("Writing batch");
+        }
+        writer.close().unwrap();
+
+        // Read the file metadata to get row group byte positions.
+        let parquet_reader = SerializedFileReader::new(File::open(&file_path).unwrap()).unwrap();
+        let metadata = parquet_reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
+
+        for (idx, row_group) in metadata.row_groups().iter().enumerate() {
+            assert_eq!(
+                row_group.num_rows(),
+                100,
+                "Row group {} should hold exactly 100 rows",
+                idx
+            );
+        }
+
+        let row_group_sizes: Vec<u64> = metadata
+            .row_groups()
+            .iter()
+            .map(|row_group| row_group.compressed_size() as u64)
+            .collect();
+
+        // Positions are reconstructed the same way the reader does it: the first row
+        // group follows the 4-byte "PAR1" magic and the others are assumed contiguous.
+        let rg0_start = 4u64;
+        let rg1_start = rg0_start + row_group_sizes[0];
+        let rg2_start = rg1_start + row_group_sizes[1];
+        let file_end = rg2_start + row_group_sizes[2];
+
+        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        // Task 1: read only the first row group
+        let task1 = FileScanTask {
+            start: rg0_start,
+            length: row_group_sizes[0],
+            record_count: Some(100),
+            data_file_path: file_path.clone(),
+            data_file_format: DataFileFormat::Parquet,
+            schema: schema.clone(),
+            project_field_ids: vec![1],
+            predicate: None,
+            deletes: vec![],
+        };
+
+        // Task 2: read the second and third row groups
+        let task2 = FileScanTask {
+            start: rg1_start,
+            length: file_end - rg1_start,
+            record_count: Some(200),
+            data_file_path: file_path.clone(),
+            data_file_format: DataFileFormat::Parquet,
+            schema: schema.clone(),
+            project_field_ids: vec![1],
+            predicate: None,
+            deletes: vec![],
+        };
+
+        let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
+        let result1 = reader
+            .clone()
+            .read(tasks1)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
+        let result2 = reader
+            .read(tasks2)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        // Flatten every returned batch so the whole output is checked, not just the
+        // first batch or the first value.
+        let collect_ids = |batches: &[RecordBatch]| -> Vec<i32> {
+            batches
+                .iter()
+                .flat_map(|batch| {
+                    batch
+                        .column(0)
+                        .as_primitive::<arrow_array::types::Int32Type>()
+                        .values()
+                        .to_vec()
+                })
+                .collect()
+        };
+
+        let ids_task1 = collect_ids(&result1);
+        let ids_task2 = collect_ids(&result2);
+
+        assert_eq!(
+            ids_task1.len(),
+            100,
+            "Task 1 (bytes {}-{}) should read only the first row group (100 rows), but got {} rows",
+            rg0_start,
+            rg1_start,
+            ids_task1.len()
+        );
+
+        assert_eq!(
+            ids_task2.len(),
+            200,
+            "Task 2 (bytes {}-{}) should read only the second+third row groups (200 rows), but got {} rows",
+            rg1_start,
+            file_end,
+            ids_task2.len()
+        );
+
+        // Verify the actual data values are correct (not just the row count).
+        assert_eq!(
+            ids_task1,
+            (0..100).collect::<Vec<i32>>(),
+            "Task 1 should return exactly ids 0 through 99 in order"
+        );
+
+        assert_eq!(
+            ids_task2,
+            (100..300).collect::<Vec<i32>>(),
+            "Task 2 should return exactly ids 100 through 299 in order, starting at id=100, not id=0"
+        );
+    }
+
     /// Test schema evolution: reading old Parquet file (with only column 'a')
     /// using a newer table schema (with columns 'a' and 'b').
     /// This tests that:

Reply via email to